Contexte
Comme indiqué dans cette question , j'utilise Scalaz 7 iteratees pour traiter un grand flux de données (c'est-à-dire illimité) dans un espace de tas constant.
Mon code ressemble à ceci:
type ErrorOrT[M[+_], A] = EitherT[M, Throwable, A]
type ErrorOr[A] = ErrorOrT[IO, A]
def processChunk(c: Chunk, idx: Long): Result
def process(data: EnumeratorT[Chunk, ErrorOr]): IterateeT[Vector[(Chunk, Long)], ErrorOr, Vector[Result]] =
Iteratee.fold[Vector[(Chunk, Long)], ErrorOr, Vector[Result]](Nil) { (rs, vs) =>
rs ++ vs map {
case (c, i) => processChunk(c, i)
}
} &= (data.zipWithIndex mapE Iteratee.group(P))
Le problème
Je semble avoir rencontré une fuite de mémoire, mais je ne connais pas assez Scalaz / FP pour savoir si le bogue est dans Scalaz ou dans mon code. Intuitivement, je m'attends à ce que ce code ne nécessite que (de l'ordre de) P fois l' Chunk
espace -size.
Remarque: j'ai trouvé une question similaire dans laquelle un a OutOfMemoryError
été rencontré, mais mon code n'utilise pas consume
.
Essai
J'ai fait quelques tests pour essayer d'isoler le problème. Pour résumer, la fuite ne semble se poser lorsque les deux zipWithIndex
et group
sont utilisés.
// no zipping/grouping
scala> (i1 &= enumArrs(1 << 25, 128)).run.unsafePerformIO
res47: Long = 4294967296
// grouping only
scala> (i2 &= (enumArrs(1 << 25, 128) mapE Iteratee.group(4))).run.unsafePerformIO
res49: Long = 4294967296
// zipping and grouping
scala> (i3 &= (enumArrs(1 << 25, 128).zipWithIndex mapE Iteratee.group(4))).run.unsafePerformIO
java.lang.OutOfMemoryError: Java heap space
// zipping only
scala> (i4 &= (enumArrs(1 << 25, 128).zipWithIndex)).run.unsafePerformIO
res51: Long = 4294967296
// no zipping/grouping, larger arrays
scala> (i1 &= enumArrs(1 << 27, 128)).run.unsafePerformIO
res53: Long = 17179869184
// zipping only, larger arrays
scala> (i4 &= (enumArrs(1 << 27, 128).zipWithIndex)).run.unsafePerformIO
res54: Long = 17179869184
Code des tests:
import scalaz.iteratee._, scalaz.effect.IO, scalaz.std.vector._
// define an enumerator that produces a stream of new, zero-filled arrays
def enumArrs(sz: Int, n: Int) =
Iteratee.enumIterator[Array[Int], IO](
Iterator.continually(Array.fill(sz)(0)).take(n))
// define an iteratee that consumes a stream of arrays
// and computes its length
val i1 = Iteratee.fold[Array[Int], IO, Long](0) {
(c, a) => c + a.length
}
// define an iteratee that consumes a grouped stream of arrays
// and computes its length
val i2 = Iteratee.fold[Vector[Array[Int]], IO, Long](0) {
(c, as) => c + as.map(_.length).sum
}
// define an iteratee that consumes a grouped/zipped stream of arrays
// and computes its length
val i3 = Iteratee.fold[Vector[(Array[Int], Long)], IO, Long](0) {
(c, vs) => c + vs.map(_._1.length).sum
}
// define an iteratee that consumes a zipped stream of arrays
// and computes its length
val i4 = Iteratee.fold[(Array[Int], Long), IO, Long](0) {
(c, v) => c + v._1.length
}
Des questions
- Le bogue est-il dans mon code?
- Comment puis-je faire fonctionner cela dans un espace de tas constant?
-XX:+HeapDumpOnOutOfMemoryError
analyser le vidage avec eclipse MAT eclipse.org/mat pour voir quelle ligne de code retient les tableaux.var
compteur au fur et à mesure.Long
index par bloc changerait l'algorithme d'un espace de tas constant à non constant? La version sans compression utilise clairement un espace de tas constant, car elle peut "traiter" autant de morceaux que vous êtes prêt à attendre.Réponses:
Cela ne sera que peu de consolation pour quiconque est bloqué avec l'ancienne
iteratee
API, mais j'ai récemment vérifié qu'un test équivalent réussit avec l' API scalaz-stream . Il s'agit d'une API de traitement de flux plus récente destinée à remplaceriteratee
.Pour être complet, voici le code de test:
Cela devrait fonctionner avec n'importe quelle valeur pour le
n
paramètre (à condition que vous soyez prêt à attendre suffisamment longtemps) - J'ai testé avec 2 ^ 14 tableaux de 32 Mo (soit un total d'un demi-Tio de mémoire allouée dans le temps).la source