Les flux Java infinis parallèles manquent de mémoire

16

J'essaie de comprendre pourquoi le programme Java suivant donne un OutOfMemoryError, tandis que le programme correspondant sans .parallel()n'en donne pas.

System.out.println(Stream
    .iterate(1, i -> i+1)
    .parallel()
    .flatMap(n -> Stream.iterate(n, i -> i+n))
    .mapToInt(Integer::intValue)
    .limit(100_000_000)
    .sum()
);

J'ai deux questions:

  1. Quelle est la sortie prévue de ce programme?

    Sans .parallel()cela, cela semble simplement sum(1+2+3+...)sortir, ce qui signifie qu'il "reste bloqué" au premier flux du flatMap, ce qui est logique.

    Avec le parallèle, je ne sais pas s'il y a un comportement attendu, mais je suppose qu'il entrelacerait en quelque sorte le premier nou les deux flux, où nest le nombre de travailleurs parallèles. Il peut également être légèrement différent en fonction du comportement de segmentation / mise en mémoire tampon.

  2. Qu'est-ce qui fait qu'il manque de mémoire? J'essaie spécifiquement de comprendre comment ces flux sont mis en œuvre sous le capot.

    Je suppose que quelque chose bloque le flux, donc il ne se termine jamais et est capable de se débarrasser des valeurs générées, mais je ne sais pas exactement dans quel ordre les choses sont évaluées et où la mise en mémoire tampon se produit.

Edit: Au cas où cela serait pertinent, j'utilise Java 11.

Editt 2: Apparemment, la même chose se produit même pour le programme simple IntStream.iterate(1,i->i+1).limit(1000_000_000).parallel().sum(), donc cela pourrait avoir à voir avec la paresse limitplutôt que flatMap.

Thomas Ahle
la source
parallel () utilise en interne ForkJoinPool. Je suppose que ForkJoin Framework est en Java à partir de Java 7
aravind

Réponses:

9

Vous dites « mais je ne sais pas très bien dans quel ordre les choses sont évaluées et où se produit la mise en mémoire tampon », c'est précisément de cela qu'il s'agit pour les flux parallèles. L'ordre d'évaluation n'est pas spécifié.

Un aspect critique de votre exemple est le .limit(100_000_000). Cela implique que l'implémentation ne peut pas simplement résumer des valeurs arbitraires, mais doit résumer les 100 000 000 premiers nombres. Notez que dans l'implémentation de référence, .unordered().limit(100_000_000)ne change pas le résultat, ce qui indique qu'il n'y a pas d'implémentation spéciale pour le cas non ordonné, mais c'est un détail d'implémentation.

Maintenant, lorsque les threads de travail traitent les éléments, ils ne peuvent pas simplement les résumer, car ils doivent savoir quels éléments ils sont autorisés à consommer, ce qui dépend du nombre d'éléments précédant leur charge de travail spécifique. Étant donné que ce flux ne connaît pas les tailles, cela ne peut être connu que lorsque les éléments préfixes ont été traités, ce qui ne se produit jamais pour les flux infinis. Ainsi, les threads de travail restent en mémoire tampon pour le moment, ces informations deviennent disponibles.

En principe, lorsqu'un thread de travail sait qu'il traite le bloc de travail le plus à gauche¹, il peut résumer les éléments immédiatement, les compter et signaler la fin lorsqu'il atteint la limite. Le Stream pourrait donc se terminer, mais cela dépend de nombreux facteurs.

Dans votre cas, un scénario plausible est que les autres threads de travail affectent plus rapidement les tampons que le travail le plus à gauche ne le compte. Dans ce scénario, des modifications subtiles de la synchronisation peuvent parfois faire revenir le flux avec une valeur.

Lorsque nous ralentissons tous les threads de travail, sauf celui qui traite le segment le plus à gauche, nous pouvons interrompre le flux (au moins dans la plupart des exécutions):

System.out.println(IntStream
    .iterate(1, i -> i+1)
    .parallel()
    .peek(i -> { if(i != 1) LockSupport.parkNanos(1_000_000_000); })
    .flatMap(n -> IntStream.iterate(n, i -> i+n))
    .limit(100_000_000)
    .sum()
);

¹ Je suis une suggestion de Stuart Marks d'utiliser l'ordre de gauche à droite pour parler de l'ordre de rencontre plutôt que de l'ordre de traitement.

Holger
la source
Très belle réponse! Je me demande s'il y a même un risque que tous les threads commencent à exécuter les opérations flatMap, et aucun ne soit alloué pour vider réellement les tampons (sommation)? Dans mon cas d'utilisation réel, les flux infinis sont plutôt des fichiers trop volumineux pour être conservés en mémoire. Je me demande comment je peux réécrire le flux pour limiter l'utilisation de la mémoire?
Thomas Ahle
1
Utilisez-vous Files.lines(…)? Il a été considérablement amélioré dans Java 9.
Holger
1
C'est ce qu'il fait dans Java 8. Dans les JRE plus récents, il restera BufferedReader.lines()dans certaines circonstances (pas le système de fichiers par défaut, un jeu de caractères spécial ou la taille supérieure à Integer.MAX_FILES). Si l'un d'entre eux s'applique, une solution personnalisée pourrait vous aider. Cela vaudrait un nouveau Q & A…
Holger
1
Integer.MAX_VALUE, bien sûr…
Holger
1
Qu'est-ce que le flux externe, un flux de fichiers? At-il une taille prévisible?
Holger
5

Ma meilleure estimation est que l' ajout parallel()modifie le comportement interne flatMap()dont les problèmes déjà eu en cours d' évaluation paresseusement avant .

L' OutOfMemoryErrorerreur que vous obtenez a été signalée dans [JDK-8202307] Obtention d'un java.lang.OutOfMemoryError: espace de tas Java lors de l'appel de Stream.iterator (). Next () sur un flux qui utilise un flux infini / très grand dans flatMap . Si vous regardez le ticket, c'est plus ou moins la même trace de pile que vous obtenez. Le ticket a été fermé car il ne sera pas corrigé pour la raison suivante:

Les méthodes iterator()et spliterator()sont des "hachures d'échappement" à utiliser lorsqu'il n'est pas possible d'utiliser d'autres opérations. Ils ont certaines limites car ils transforment ce qui est un modèle push de l'implémentation de flux en un modèle pull. Une telle transition nécessite une mise en mémoire tampon dans certains cas, par exemple lorsqu'un élément est (plat) mappé sur deux éléments ou plus . Cela compliquerait considérablement la mise en œuvre du flux, probablement au détriment des cas courants, pour soutenir une notion de contre-pression pour communiquer le nombre d'éléments à tirer à travers les couches imbriquées de production d'éléments.

Karol Dowbecki
la source
C'est très intéressant! Il est logique que la transition push / pull nécessite une mise en mémoire tampon qui peut utiliser la mémoire. Cependant, dans mon cas, il semble que l'utilisation de simplement pousser devrait fonctionner correctement et simplement jeter les éléments restants tels qu'ils apparaissent? Ou peut-être vous dites que flapmap provoque la création d'un itérateur?
Thomas Ahle
3

OOME n'est pas dû au fait que le flux est infini, mais au fait qu'il ne l'est pas .

C'est à dire, si vous commentez le .limit(...), il ne manquera jamais de mémoire - mais bien sûr, il ne finira jamais non plus.

Une fois divisé, le flux ne peut garder une trace du nombre d'éléments que s'il est accumulé dans chaque thread (il semble que l'accumulateur réel soit Spliterators$ArraySpliterator#array ).

On dirait que vous pouvez le reproduire sans flatMap, exécutez simplement ce qui suit avec -Xmx128m:

    System.out.println(Stream
            .iterate(1, i -> i + 1)
            .parallel()
      //    .flatMap(n -> Stream.iterate(n, i -> i+n))
            .mapToInt(Integer::intValue)
            .limit(100_000_000)
            .sum()
    );

Cependant, après avoir commenté le limit(), il devrait fonctionner correctement jusqu'à ce que vous décidiez d'épargner votre ordinateur portable.

Outre les détails de mise en œuvre réels, voici ce qui se passe à mon avis:

Avec limit, le sumréducteur veut que les premiers éléments X soient résumés, donc aucun thread ne peut émettre de sommes partielles. Chaque "tranche" (fil) devra accumuler des éléments et les traverser. Sans limite, il n'y a pas une telle contrainte, donc chaque "tranche" calculera simplement la somme partielle des éléments qu'elle obtient (pour toujours), en supposant qu'elle émettra le résultat par la suite.

Costi Ciudatu
la source
Que voulez-vous dire "une fois qu'il est divisé"? Est-ce que la limite la divise en quelque sorte?
Thomas Ahle
@ThomasAhle parallel()utilisera en ForkJoinPoolinterne pour réaliser le parallélisme. Le Spliteratorsera utilisé pour assigner du travail à chaque ForkJointâche, je suppose que nous pouvons appeler l'unité de travail ici comme "fractionnée".
Karol Dowbecki
Mais pourquoi cela ne se produit-il qu'avec une limite?
Thomas Ahle
@ThomasAhle J'ai édité la réponse avec mes deux cents.
Costi Ciudatu
1
@ThomasAhle a défini un point d'arrêt dans Integer.sum(), utilisé par le IntStream.sumréducteur. Vous verrez que la version sans limite appelle tout le temps et que la version limitée ne l'appelle jamais avant le MOO.
Costi Ciudatu