Je voudrais dupliquer un flux Java 8 pour pouvoir le gérer deux fois. Je peux collect
comme liste et obtenir de nouveaux flux à partir de cela;
// doSomething() returns a stream
List<A> thing = doSomething().collect(toList());
thing.stream()... // do stuff
thing.stream()... // do other stuff
Mais je pense qu'il devrait y avoir un moyen plus efficace / élégant.
Existe-t-il un moyen de copier le flux sans le transformer en collection?
Je travaille en fait avec un flux de Either
s, je veux donc traiter la projection de gauche dans un sens avant de passer à la projection de droite et de traiter cette autre manière. Un peu comme ça (avec lequel, jusqu'à présent, je suis obligé d'utiliser l' toList
astuce).
List<Either<Pair<A, Throwable>, A>> results = doSomething().collect(toList());
Stream<Pair<A, Throwable>> failures = results.stream().flatMap(either -> either.left());
failures.forEach(failure -> ... );
Stream<A> successes = results.stream().flatMap(either -> either.right());
successes.forEach(success -> ... );
java
lambda
java-8
java-stream
Toby
la source
la source
Réponses:
Je pense que votre hypothèse sur l'efficacité est un peu rétrograde. Vous obtenez ce gain d'efficacité énorme si vous n'utilisez les données qu'une seule fois, car vous n'avez pas à les stocker, et les flux vous offrent de puissantes optimisations de «fusion en boucle» qui vous permettent de faire circuler efficacement toutes les données dans le pipeline.
Si vous souhaitez réutiliser les mêmes données, vous devez par définition les générer deux fois (de manière déterministe) ou les stocker. S'il se trouve déjà dans une collection, tant mieux; puis l'itérer deux fois n'est pas cher.
Nous avons expérimenté dans la conception avec des «flux fourchus». Ce que nous avons constaté, c'est que soutenir cela avait des coûts réels; il alourdissait le cas courant (utilisation unique) aux dépens du cas rare. Le gros problème était de savoir «ce qui se passe lorsque les deux pipelines ne consomment pas de données au même rythme». De toute façon, vous revenez à la mise en mémoire tampon. C'était une caractéristique qui ne portait clairement pas son poids.
Si vous souhaitez utiliser les mêmes données à plusieurs reprises, stockez-les ou structurez vos opérations en consommateurs et procédez comme suit:
Vous pouvez également regarder dans la bibliothèque RxJava, car son modèle de traitement se prête mieux à ce type de "stream forking".
la source
toList
) pour pouvoir les traiter (leEither
cas étant l'exemple)?Vous pouvez utiliser une variable locale avec un
Supplier
pour configurer les parties communes du pipeline de flux.De http://winterbe.com/posts/2014/07/31/java8-stream-tutorial-examples/ :
la source
Supplier
si leStream
est construit de manière «coûteuse», vous payez ce coût pour chaque appel àSupplier.get()
. ie si une requête de base de données ... cette requête est faite à chaque foisSet<Integer>
utilisationcollect(Collectors.toSet())
... et faire quelques opérations là-dessus. Je voulaismax()
et si une valeur spécifique était définie en deux opérations ...filter(d -> d == -1).count() == 1;
Utilisez a
Supplier
pour produire le flux pour chaque opération de terminaison.Chaque fois que vous avez besoin d'un flux de cette collection, utilisez
streamSupplier.get()
pour obtenir un nouveau flux.Exemples:
streamSupplier.get().anyMatch(predicate);
streamSupplier.get().allMatch(predicate2);
la source
Nous avons implémenté une
duplicate()
méthode pour les flux dans jOOλ , une bibliothèque Open Source que nous avons créée pour améliorer les tests d'intégration pour jOOQ . Essentiellement, vous pouvez simplement écrire:En interne, il existe un tampon stockant toutes les valeurs qui ont été consommées à partir d'un flux mais pas de l'autre. C'est probablement aussi efficace que si vos deux flux sont consommés à peu près au même rythme, et si vous pouvez vivre avec le manque de sécurité des threads .
Voici comment fonctionne l'algorithme:
Plus de code source ici
Tuple2
est probablement comme votrePair
type alorsSeq
estStream
avec quelques améliorations.la source
Tuple2<Seq<A>>, Seq<A>> t = duplicate(stream); long count = t.collect(counting()); List<A> list = t.collect(toList());
, c'est mieux de le faireTuple2<Long, List<A>> t = stream.collect(Tuple.collectors(counting(), toList()));
. L'utilisation de l'Collectors.mapping/reducing
un peut exprimer d'autres opérations de flux comme des collecteurs et des éléments de processus de manière assez différente, créant un seul tuple résultant. Donc, en général, vous pouvez faire beaucoup de choses en consommant le flux une fois sans duplication et il sera compatible avec le parallèle.offer()
/poll()
, mais celaArrayDeque
pourrait faire la même chose.Vous pouvez créer un flux d'exécutables (par exemple):
Où
failure
etsuccess
sont les opérations à appliquer. Cela créera cependant un certain nombre d'objets temporaires et ne sera peut-être pas plus efficace que de partir d'une collection et de la diffuser / itérer deux fois.la source
Une autre façon de gérer les éléments plusieurs fois consiste à utiliser Stream.peek (Consumer) :
peek(Consumer)
peut être enchaîné autant de fois que nécessaire.la source
cyclops-react , une bibliothèque à laquelle je contribue, a une méthode statique qui vous permettra de dupliquer un Stream (et renvoie un jOOλ Tuple of Streams).
Voir les commentaires, il y a une pénalité de performance qui sera encourue lors de l'utilisation de dupliquer sur un flux existant. Une alternative plus performante serait d'utiliser Streamable: -
Il existe également une classe Streamable (paresseuse) qui peut être construite à partir d'un Stream, Iterable ou Array et rejouée plusieurs fois.
AsStreamable.synchronizedFromStream (stream) - peut être utilisé pour créer un Streamable qui remplira paresseusement sa collection de sauvegarde, de manière à pouvoir être partagé entre les threads. Streamable.fromStream (stream) n'entraînera aucune surcharge de synchronisation.
la source
List<Integer> list = stream.collect(Collectors.toList()); streams = new Tuple2<>(list.stream(), list.stream())
(comme le suggère OP). Veuillez également indiquer explicitement dans la réponse que vous êtes l'auteur de cyclop-streams. Lisez ceci .Pour ce problème particulier, vous pouvez également utiliser le partitionnement. Quelque chose comme
la source
Nous pouvons utiliser Stream Builder au moment de la lecture ou de l'itération d'un flux. Voici le document de Stream Builder .
https://docs.oracle.com/javase/8/docs/api/java/util/stream/Stream.Builder.html
Cas d'utilisation
Disons que nous avons un flux d'employés et que nous devons utiliser ce flux pour écrire les données des employés dans un fichier Excel, puis mettre à jour la collection / table des employés [Ceci est juste un cas d'utilisation pour montrer l'utilisation de Stream Builder]:
la source
J'ai eu un problème similaire et je pouvais penser à trois structures intermédiaires différentes à partir desquelles créer une copie du flux: a
List
, un tableau et aStream.Builder
. J'ai écrit un petit programme de référence, qui suggérait que du point de vue de la performance, ilList
était environ 30% plus lent que les deux autres qui étaient assez similaires.Le seul inconvénient de la conversion en tableau est qu'il est délicat si votre type d'élément est un type générique (ce qui dans mon cas c'était); donc je préfère utiliser un
Stream.Builder
.J'ai fini par écrire une petite fonction qui crée un
Collector
:Je peux ensuite faire une copie de n'importe quel flux
str
en faisantstr.collect(copyCollector())
ce qui me semble tout à fait conforme à l'utilisation idiomatique des flux.la source