AtomicInteger recordNumber = new AtomicInteger();
Files.lines(inputFile.toPath(), StandardCharsets.UTF_8)
.map(record -> new Record(recordNumber.incrementAndGet(), record))
.parallel()
.filter(record -> doSomeOperation())
.findFirst()
Quand j'ai écrit ceci, j'ai supposé que les threads ne seraient engendrés que par l'appel de carte car le parallèle est placé après la carte. Mais certaines lignes du fichier obtenaient des numéros d'enregistrement différents pour chaque exécution.
J'ai lu la documentation officielle des flux Java et quelques sites Web pour comprendre comment les flux fonctionnent sous le capot.
Quelques questions:
Le flux parallèle Java fonctionne sur la base de SplitIterator , qui est implémenté par chaque collection comme ArrayList, LinkedList etc. Lorsque nous construisons un flux parallèle à partir de ces collections, l'itérateur de fractionnement correspondant sera utilisé pour fractionner et itérer la collection. Cela explique pourquoi le parallélisme s'est produit au niveau de la source d'entrée d'origine (lignes de fichier) plutôt qu'au niveau du résultat de la carte (c'est-à-dire Record pojo). Ma compréhension est-elle correcte?
Dans mon cas, l'entrée est un flux d'E / S de fichiers. Quel itérateur divisé sera utilisé?
Peu importe où nous nous situons
parallel()
dans le pipeline. La source d'entrée d'origine sera toujours divisée et les opérations intermédiaires restantes seront appliquées.Dans ce cas, Java ne devrait pas permettre aux utilisateurs de placer des opérations parallèles n'importe où dans le pipeline, sauf à la source d'origine. Parce que cela donne une mauvaise compréhension à ceux qui ne savent pas comment Java Stream fonctionne en interne. Je sais que l'
parallel()
opération aurait été définie pour le type d'objet Stream et donc, cela fonctionne de cette façon. Mais, il est préférable de fournir une autre solution.Dans l'extrait de code ci-dessus, j'essaie d'ajouter un numéro de ligne à chaque enregistrement du fichier d'entrée et il doit donc être commandé. Cependant, je veux appliquer
doSomeOperation()
en parallèle car c'est une logique lourde. La seule façon d'y parvenir est d'écrire mon propre itérateur divisé personnalisé. Est-ce qu'il y a un autre moyen?
la source
parallel()
n'est rien de plus qu'une demande de modification générale qui est appliquée à l'objet de flux sous-jacent. N'oubliez pas qu'il n'y a qu'un seul flux source si vous n'appliquez pas d'opérations finales au tube, c'est-à-dire tant que rien n'est "exécuté". Cela dit, vous ne faites que remettre en question les choix de conception Java. Ce qui est basé sur l'opinion et nous ne pouvons pas vraiment aider à cela.Stream
directement dans l' interface et en raison de la mise en cascade agréable, chaque opération est redistribuéeStream
. Imaginez que quelqu'un veuille vous donnerStream
mais a déjà appliqué quelques opérations commemap
celle-ci. En tant qu'utilisateur, vous voulez toujours pouvoir décider de l'exécuter en parallèle ou non. Il doit donc être possible d'appelerparallel()
encore, bien que le flux existe déjà.flatMap
ou si vous exécutez des méthodes thread-unsafe ou similaires.Path
est sur le système de fichiers local et que vous utilisez un JDK récent, le séparateur aura une meilleure capacité de traitement parallèle que les multiples de 1024 par lots. Mais le fractionnement équilibré peut même être contre-productif dans certainsfindFirst
scénarios…Réponses:
Le flux entier est parallèle ou séquentiel. Nous ne sélectionnons pas un sous-ensemble d'opérations à exécuter séquentiellement ou en parallèle.
Comme vous le mentionnez, les flux parallèles utilisent des itérateurs fractionnés. De toute évidence, il s'agit de partitionner les données avant le démarrage des opérations.
En regardant la source, je vois qu'il utilise
java.nio.file.FileChannelLinesSpliterator
Droite. Vous pouvez même appeler
parallel()
et àsequential()
plusieurs reprises. Celui invoqué en dernier l'emportera. Lorsque nous appelonsparallel()
, nous définissons cela pour le flux renvoyé; et comme indiqué ci-dessus, toutes les opérations s'exécutent séquentiellement ou en parallèle.Cela devient une question d'opinions. Je pense que Zabuza donne une bonne raison de soutenir le choix des concepteurs JDK.
Cela dépend de vos opérations
findFirst()
c'est votre véritable opération de terminal, vous n'avez même pas à vous soucier de l'exécution parallèle, car il n'y aura pas beaucoup d'appels dedoSomething()
toute façon (findFirst()
c'est un court-circuit)..parallel()
en fait, cela peut entraîner le traitement de plusieurs éléments, alors quefindFirst()
sur un flux séquentiel, cela empêcherait cela.Si votre opération de terminal ne crée pas beaucoup de données, vous pouvez peut-être créer vos
Record
objets à l'aide d'un flux séquentiel, puis traiter le résultat en parallèle:Si votre pipeline charge beaucoup de données en mémoire (ce qui peut être la raison de votre utilisation
Files.lines()
), vous aurez peut-être besoin d'un itérateur de fractionnement personnalisé. Avant d'y aller, cependant, j'examinerais d'autres options (telles que l'enregistrement des lignes avec une colonne id pour commencer - c'est juste mon avis).J'essaierais également de traiter des enregistrements en lots plus petits, comme ceci:
Cela s'exécute
doSomeOperation()
en parallèle sans charger toutes les données en mémoire. Mais notez qu'ilbatchSize
faudra réfléchir.la source
Spliterator
implémentation personnalisée ne serait pas plus compliquée que cela, tout en permettant un traitement parallèle plus efficace…parallelStream
opérations internes a une surcharge fixe pour lancer l'opération et attendre le résultat final, tout en étant limitée à un parallélisme debatchSize
. Tout d'abord, vous avez besoin d'un multiple du nombre actuellement disponible de cœurs de processeur pour éviter les threads inactifs. Ensuite, le nombre doit être suffisamment élevé pour compenser la surcharge fixe, mais plus le nombre est élevé, plus la pause imposée par l'opération de lecture séquentielle se produit avant même le début du traitement parallèle.Stream.generate
produit un flux non ordonné, qui ne fonctionne pas avec les cas d'utilisation prévus de l'OP commefindFirst()
. En revanche, un seul flux parallèle avec un séparateur qui renvoie les morceaux dans lestrySplit
œuvres directement et permet aux threads de travail de traiter le prochain morceau sans attendre la fin du précédent.findFirst()
opération ne traitera qu'un petit nombre d'éléments. La première correspondance peut toujours se produire après le traitement de 90% de tous les éléments. De plus, lorsque vous avez dix millions de lignes, même trouver une correspondance après 10% nécessite toujours de traiter un million de lignes.La conception originale de Stream incluait l'idée de prendre en charge les étapes de pipeline suivantes avec différents paramètres d'exécution parallèle, mais cette idée a été abandonnée. L'API peut provenir de cette époque, mais d'un autre côté, une conception d'API qui oblige l'appelant à prendre une seule décision non ambiguë pour une exécution parallèle ou séquentielle serait beaucoup plus compliquée.
Le réel
Spliterator
utilisé parFiles.lines(…)
dépend de l'implémentation. En Java 8 (Oracle ou OpenJDK), vous obtenez toujours la même chose qu'avecBufferedReader.lines()
. Dans les JDK plus récents, si lePath
appartient au système de fichiers par défaut et que le jeu de caractères est l'un des pris en charge pour cette fonctionnalité, vous obtenez un Stream avec uneSpliterator
implémentation dédiée , lejava.nio.file.FileChannelLinesSpliterator
. Si les conditions préalables ne sont pas remplies, vous obtenez la même chose qu'avecBufferedReader.lines()
, qui est toujours basé sur unIterator
implémenté dansBufferedReader
et enveloppé viaSpliterators.spliteratorUnknownSize
.Votre tâche spécifique est mieux gérée avec une personnalisation
Spliterator
qui peut effectuer la numérotation des lignes directement à la source, avant le traitement parallèle, pour permettre un traitement parallèle ultérieur sans restrictions.la source
Et ce qui suit est une simple démonstration de l'application du parallèle. La sortie de peek montre clairement la différence entre les deux exemples. Remarque: l'
map
appel est simplement lancé pour ajouter une autre méthode avantparallel
.la source