Éviter les fuites de mémoire avec Scalaz 7 zipWithIndex / group enumeratees

106

Contexte

Comme indiqué dans cette question , j'utilise Scalaz 7 iteratees pour traiter un grand flux de données (c'est-à-dire illimité) dans un espace de tas constant.

Mon code ressemble à ceci:

type ErrorOrT[M[+_], A] = EitherT[M, Throwable, A]
type ErrorOr[A] = ErrorOrT[IO, A]

def processChunk(c: Chunk, idx: Long): Result

def process(data: EnumeratorT[Chunk, ErrorOr]): IterateeT[Vector[(Chunk, Long)], ErrorOr, Vector[Result]] =
  Iteratee.fold[Vector[(Chunk, Long)], ErrorOr, Vector[Result]](Nil) { (rs, vs) =>
    rs ++ vs map { 
      case (c, i) => processChunk(c, i) 
    }
  } &= (data.zipWithIndex mapE Iteratee.group(P))

Le problème

Je semble avoir rencontré une fuite de mémoire, mais je ne connais pas assez Scalaz / FP pour savoir si le bogue est dans Scalaz ou dans mon code. Intuitivement, je m'attends à ce que ce code ne nécessite que (de l'ordre de) P fois l' Chunkespace -size.

Remarque: j'ai trouvé une question similaire dans laquelle un a OutOfMemoryErrorété rencontré, mais mon code n'utilise pas consume.

Essai

J'ai fait quelques tests pour essayer d'isoler le problème. Pour résumer, la fuite ne semble se poser lorsque les deux zipWithIndexet groupsont utilisés.

// no zipping/grouping
scala> (i1 &= enumArrs(1 << 25, 128)).run.unsafePerformIO
res47: Long = 4294967296

// grouping only
scala> (i2 &= (enumArrs(1 << 25, 128) mapE Iteratee.group(4))).run.unsafePerformIO
res49: Long = 4294967296

// zipping and grouping
scala> (i3 &= (enumArrs(1 << 25, 128).zipWithIndex mapE Iteratee.group(4))).run.unsafePerformIO
java.lang.OutOfMemoryError: Java heap space

// zipping only
scala> (i4 &= (enumArrs(1 << 25, 128).zipWithIndex)).run.unsafePerformIO
res51: Long = 4294967296

// no zipping/grouping, larger arrays
scala> (i1 &= enumArrs(1 << 27, 128)).run.unsafePerformIO
res53: Long = 17179869184

// zipping only, larger arrays
scala> (i4 &= (enumArrs(1 << 27, 128).zipWithIndex)).run.unsafePerformIO
res54: Long = 17179869184

Code des tests:

import scalaz.iteratee._, scalaz.effect.IO, scalaz.std.vector._

// define an enumerator that produces a stream of new, zero-filled arrays
def enumArrs(sz: Int, n: Int) = 
  Iteratee.enumIterator[Array[Int], IO](
    Iterator.continually(Array.fill(sz)(0)).take(n))

// define an iteratee that consumes a stream of arrays 
// and computes its length
val i1 = Iteratee.fold[Array[Int], IO, Long](0) { 
  (c, a) => c + a.length 
}

// define an iteratee that consumes a grouped stream of arrays 
// and computes its length
val i2 = Iteratee.fold[Vector[Array[Int]], IO, Long](0) { 
  (c, as) => c + as.map(_.length).sum 
}

// define an iteratee that consumes a grouped/zipped stream of arrays
// and computes its length
val i3 = Iteratee.fold[Vector[(Array[Int], Long)], IO, Long](0) {
  (c, vs) => c + vs.map(_._1.length).sum
}

// define an iteratee that consumes a zipped stream of arrays
// and computes its length
val i4 = Iteratee.fold[(Array[Int], Long), IO, Long](0) {
  (c, v) => c + v._1.length
}

Des questions

  • Le bogue est-il dans mon code?
  • Comment puis-je faire fonctionner cela dans un espace de tas constant?
Aaron Novstrup
la source
6
J'ai fini par signaler cela comme un problème dans Scalaz .
Aaron Novstrup
1
Ce ne sera pas amusant, mais vous pouvez essayer d' -XX:+HeapDumpOnOutOfMemoryErroranalyser le vidage avec eclipse MAT eclipse.org/mat pour voir quelle ligne de code retient les tableaux.
huynhjl
10
@huynhjl FWIW, j'ai essayé d'analyser le tas avec JProfiler et MAT mais j'étais complètement incapable de parcourir toutes les références aux classes de fonctions anonymes, etc. Scala a vraiment besoin d'outils dédiés pour ce genre de chose.
Aaron Novstrup
Que se passe-t-il s'il n'y a pas de fuite et que ce que vous faites nécessite une quantité de mémoire considérablement croissante? Vous pouvez facilement répliquer le zipWithIndex sans cette construction FP particulière en maintenant simplement un varcompteur au fur et à mesure.
Ezekiel Victor
@EzekielVictor Je ne suis pas sûr de comprendre le commentaire. Vous suggérez que l'ajout d'un seul Longindex par bloc changerait l'algorithme d'un espace de tas constant à non constant? La version sans compression utilise clairement un espace de tas constant, car elle peut "traiter" autant de morceaux que vous êtes prêt à attendre.
Aaron Novstrup

Réponses:

4

Cela ne sera que peu de consolation pour quiconque est bloqué avec l'ancienne iterateeAPI, mais j'ai récemment vérifié qu'un test équivalent réussit avec l' API scalaz-stream . Il s'agit d'une API de traitement de flux plus récente destinée à remplacer iteratee.

Pour être complet, voici le code de test:

// create a stream containing `n` arrays with `sz` Ints in each one
def streamArrs(sz: Int, n: Int): Process[Task, Array[Int]] =
  (Process emit Array.fill(sz)(0)).repeat take n

(streamArrs(1 << 25, 1 << 14).zipWithIndex 
      pipe process1.chunk(4) 
      pipe process1.fold(0L) {
    (c, vs) => c + vs.map(_._1.length.toLong).sum
  }).runLast.run

Cela devrait fonctionner avec n'importe quelle valeur pour le nparamètre (à condition que vous soyez prêt à attendre suffisamment longtemps) - J'ai testé avec 2 ^ 14 tableaux de 32 Mo (soit un total d'un demi-Tio de mémoire allouée dans le temps).

Aaron Novstrup
la source