J'essaie de comprendre pourquoi le programme Java suivant donne un OutOfMemoryError
, tandis que le programme correspondant sans .parallel()
n'en donne pas.
System.out.println(Stream
.iterate(1, i -> i+1)
.parallel()
.flatMap(n -> Stream.iterate(n, i -> i+n))
.mapToInt(Integer::intValue)
.limit(100_000_000)
.sum()
);
J'ai deux questions:
Quelle est la sortie prévue de ce programme?
Sans
.parallel()
cela, cela semble simplementsum(1+2+3+...)
sortir, ce qui signifie qu'il "reste bloqué" au premier flux du flatMap, ce qui est logique.Avec le parallèle, je ne sais pas s'il y a un comportement attendu, mais je suppose qu'il entrelacerait en quelque sorte le premier
n
ou les deux flux, oùn
est le nombre de travailleurs parallèles. Il peut également être légèrement différent en fonction du comportement de segmentation / mise en mémoire tampon.Qu'est-ce qui fait qu'il manque de mémoire? J'essaie spécifiquement de comprendre comment ces flux sont mis en œuvre sous le capot.
Je suppose que quelque chose bloque le flux, donc il ne se termine jamais et est capable de se débarrasser des valeurs générées, mais je ne sais pas exactement dans quel ordre les choses sont évaluées et où la mise en mémoire tampon se produit.
Edit: Au cas où cela serait pertinent, j'utilise Java 11.
Editt 2: Apparemment, la même chose se produit même pour le programme simple IntStream.iterate(1,i->i+1).limit(1000_000_000).parallel().sum()
, donc cela pourrait avoir à voir avec la paresse limit
plutôt que flatMap
.
la source
Réponses:
Vous dites « mais je ne sais pas très bien dans quel ordre les choses sont évaluées et où se produit la mise en mémoire tampon », c'est précisément de cela qu'il s'agit pour les flux parallèles. L'ordre d'évaluation n'est pas spécifié.
Un aspect critique de votre exemple est le
.limit(100_000_000)
. Cela implique que l'implémentation ne peut pas simplement résumer des valeurs arbitraires, mais doit résumer les 100 000 000 premiers nombres. Notez que dans l'implémentation de référence,.unordered().limit(100_000_000)
ne change pas le résultat, ce qui indique qu'il n'y a pas d'implémentation spéciale pour le cas non ordonné, mais c'est un détail d'implémentation.Maintenant, lorsque les threads de travail traitent les éléments, ils ne peuvent pas simplement les résumer, car ils doivent savoir quels éléments ils sont autorisés à consommer, ce qui dépend du nombre d'éléments précédant leur charge de travail spécifique. Étant donné que ce flux ne connaît pas les tailles, cela ne peut être connu que lorsque les éléments préfixes ont été traités, ce qui ne se produit jamais pour les flux infinis. Ainsi, les threads de travail restent en mémoire tampon pour le moment, ces informations deviennent disponibles.
En principe, lorsqu'un thread de travail sait qu'il traite le bloc de travail le plus à gauche¹, il peut résumer les éléments immédiatement, les compter et signaler la fin lorsqu'il atteint la limite. Le Stream pourrait donc se terminer, mais cela dépend de nombreux facteurs.
Dans votre cas, un scénario plausible est que les autres threads de travail affectent plus rapidement les tampons que le travail le plus à gauche ne le compte. Dans ce scénario, des modifications subtiles de la synchronisation peuvent parfois faire revenir le flux avec une valeur.
Lorsque nous ralentissons tous les threads de travail, sauf celui qui traite le segment le plus à gauche, nous pouvons interrompre le flux (au moins dans la plupart des exécutions):
¹ Je suis une suggestion de Stuart Marks d'utiliser l'ordre de gauche à droite pour parler de l'ordre de rencontre plutôt que de l'ordre de traitement.
la source
Files.lines(…)
? Il a été considérablement amélioré dans Java 9.BufferedReader.lines()
dans certaines circonstances (pas le système de fichiers par défaut, un jeu de caractères spécial ou la taille supérieure àInteger.MAX_FILES
). Si l'un d'entre eux s'applique, une solution personnalisée pourrait vous aider. Cela vaudrait un nouveau Q & A…Integer.MAX_VALUE
, bien sûr…Ma meilleure estimation est que l' ajout
parallel()
modifie le comportement interneflatMap()
dont les problèmes déjà eu en cours d' évaluation paresseusement avant .L'
OutOfMemoryError
erreur que vous obtenez a été signalée dans [JDK-8202307] Obtention d'un java.lang.OutOfMemoryError: espace de tas Java lors de l'appel de Stream.iterator (). Next () sur un flux qui utilise un flux infini / très grand dans flatMap . Si vous regardez le ticket, c'est plus ou moins la même trace de pile que vous obtenez. Le ticket a été fermé car il ne sera pas corrigé pour la raison suivante:la source
OOME n'est pas dû au fait que le flux est infini, mais au fait qu'il ne l'est pas .
C'est à dire, si vous commentez le
.limit(...)
, il ne manquera jamais de mémoire - mais bien sûr, il ne finira jamais non plus.Une fois divisé, le flux ne peut garder une trace du nombre d'éléments que s'il est accumulé dans chaque thread (il semble que l'accumulateur réel soit
Spliterators$ArraySpliterator#array
).On dirait que vous pouvez le reproduire sans
flatMap
, exécutez simplement ce qui suit avec-Xmx128m
:Cependant, après avoir commenté le
limit()
, il devrait fonctionner correctement jusqu'à ce que vous décidiez d'épargner votre ordinateur portable.Outre les détails de mise en œuvre réels, voici ce qui se passe à mon avis:
Avec
limit
, lesum
réducteur veut que les premiers éléments X soient résumés, donc aucun thread ne peut émettre de sommes partielles. Chaque "tranche" (fil) devra accumuler des éléments et les traverser. Sans limite, il n'y a pas une telle contrainte, donc chaque "tranche" calculera simplement la somme partielle des éléments qu'elle obtient (pour toujours), en supposant qu'elle émettra le résultat par la suite.la source
parallel()
utilisera enForkJoinPool
interne pour réaliser le parallélisme. LeSpliterator
sera utilisé pour assigner du travail à chaqueForkJoin
tâche, je suppose que nous pouvons appeler l'unité de travail ici comme "fractionnée".Integer.sum()
, utilisé par leIntStream.sum
réducteur. Vous verrez que la version sans limite appelle tout le temps et que la version limitée ne l'appelle jamais avant le MOO.