J'ai un gros fichier qui contient une liste d'articles.
Je voudrais créer un lot d'articles, faire une requête HTTP avec ce lot (tous les éléments sont nécessaires en tant que paramètres dans la requête HTTP). Je peux le faire très facilement avec une for
boucle, mais en tant qu'amoureux de Java 8, je veux essayer d'écrire ceci avec le framework Stream de Java 8 (et profiter des avantages du traitement paresseux).
Exemple:
List<String> batch = new ArrayList<>(BATCH_SIZE);
for (int i = 0; i < data.size(); i++) {
batch.add(data.get(i));
if (batch.size() == BATCH_SIZE) process(batch);
}
if (batch.size() > 0) process(batch);
Je veux faire quelque chose d'une longue lignée de
lazyFileStream.group(500).map(processBatch).collect(toList())
Quelle serait la meilleure façon de faire cela?
java
java-8
batch-processing
java-stream
Andy Dang
la source
la source
flatMap
(+ un flatMap supplémentaire pour réduire à nouveau les flux)? Je ne pense pas que quelque chose comme ça existe comme méthode pratique dans la bibliothèque standard. Soit vous devrez trouver une bibliothèque tierce, soit écrire la vôtre basée sur des séparateurs et / ou un collecteur émettant un flux de fluxStream.generate
avecreader::readLine
etlimit
, mais le problème est que les flux ne fonctionnent pas bien avec les exceptions. En outre, ce n'est probablement pas bien parallélisable. Je pense que lafor
boucle est toujours la meilleure option.Réponses:
Remarque! Cette solution lit l'intégralité du fichier avant d'exécuter forEach.
Vous pouvez le faire avec jOOλ , une bibliothèque qui étend les flux Java 8 pour les cas d'utilisation de flux séquentiels à un seul thread:
Dans les coulisses,
zipWithIndex()
c'est juste:... alors que l'
groupBy()
API est pratique pour:(Avertissement: je travaille pour l'entreprise derrière jOOλ)
la source
Map
(contrairement, par exemple, à la solution Ben Manes)Pour être complet, voici une solution Guava .
Dans la question, la collection est disponible, donc un flux n'est pas nécessaire et il peut être écrit comme,
la source
Lists.partition
est une autre variante que j'aurais dû mentionner.Stream
en mémoire avant de traiter le lot concernébatchSize
éléments par itération.L'implémentation Pure Java-8 est également possible:
Notez que contrairement à JOOl, il peut bien fonctionner en parallèle (à condition que vous soyez
data
une liste d'accès aléatoire).la source
List
(voirdata.size()
,data.get()
dans la question). Je réponds à la question posée. Si vous avez une autre question, posez-la à la place (même si je pense que la question du flux a également déjà été posée).Solution Pure Java 8 :
Nous pouvons créer un collecteur personnalisé pour le faire avec élégance, qui prend en a
batch size
et aConsumer
pour traiter chaque lot:Si vous le souhaitez, créez une classe d'utilitaire d'assistance:
Exemple d'utilisation:
J'ai également publié mon code sur GitHub, si quelqu'un veut y jeter un œil:
Lien vers Github
la source
J'ai écrit un Spliterator personnalisé pour des scénarios comme celui-ci. Il remplira les listes d'une taille donnée à partir du flux d'entrée. L'avantage de cette approche est qu'elle effectuera un traitement paresseux et qu'elle fonctionnera avec d'autres fonctions de flux.
la source
SUBSIZED
le fractionnement renvoyé partrySplit
peut avoir plus d'éléments qu'avant le fractionnement (si le fractionnement se produit au milieu du lot).Spliterators
est correcte, alorstrySplit
doit toujours partitionner les données en deux parties à peu près égales afin que le résultat ne soit jamais plus grand que l'original?if this Spliterator is SUBSIZED, then estimateSize() for this spliterator before splitting must be equal to the sum of estimateSize() for this and the returned Spliterator after splitting.
Nous avions un problème similaire à résoudre. Nous voulions prendre un flux qui était plus grand que la mémoire système (itérer à travers tous les objets d'une base de données) et randomiser l'ordre le mieux possible - nous avons pensé qu'il serait correct de mettre en mémoire tampon 10000 éléments et de les randomiser.
La cible était une fonction qui prenait un flux.
Parmi les solutions proposées ici, il semble y avoir une gamme d'options:
Notre instinct était à l'origine d'utiliser un collecteur personnalisé, mais cela signifiait abandonner le streaming. La solution de collecteur personnalisé ci-dessus est très bonne et nous l'avons presque utilisée.
Voici une solution qui triche en utilisant le fait que
Stream
s peut vous donner unIterator
que vous pouvez utiliser comme trappe d'échappement pour vous permettre de faire quelque chose de plus que les flux ne prennent pas en charge. LeIterator
est reconverti en un flux en utilisant un autre peu deStreamSupport
sorcellerie Java 8 .Un exemple simple d'utilisation de ceci ressemblerait à ceci:
Les impressions ci-dessus
Pour notre cas d'utilisation, nous voulions mélanger les lots, puis les conserver sous forme de flux - cela ressemblait à ceci:
Cela produit quelque chose comme (c'est aléatoire, si différent à chaque fois)
La sauce secrète ici est qu'il y a toujours un flux, vous pouvez donc soit opérer sur un flux de lots, soit faire quelque chose pour chaque lot, puis
flatMap
le retourner à un flux. Mieux encore, tous les passe au- dessus que la finaleforEach
oucollect
ou d' autres expressions de terminaison PULL les données à travers le flux.Il s'avère qu'il
iterator
s'agit d'un type spécial d' opération de terminaison sur un flux et ne provoque pas l'exécution et la mise en mémoire de l'ensemble du flux! Merci aux gars de Java 8 pour un design brillant!la source
List
vous persistiez à un - vous ne pouvez pas différer l'itération des éléments intra-lot car le consommateur voudra peut-être ignorer un lot entier, et si vous n'avez pas consommé le éléments alors ils ne sauteraient pas très loin. (J'ai implémenté l'un d'entre eux en C #, même si c'était beaucoup plus facile.)Vous pouvez également utiliser RxJava :
ou
ou
la source
Vous pouvez également jeter un œil à cyclops-react , je suis l'auteur de cette bibliothèque. Il implémente l'interface jOOλ (et par extension JDK 8 Streams), mais contrairement aux JDK 8 Parallel Streams, il se concentre sur les opérations asynchrones (telles que le blocage potentiel des appels d'E / S Async). JDK Parallel Streams, en revanche, se concentre sur le parallélisme des données pour les opérations liées au processeur. Il fonctionne en gérant des agrégats de tâches futures sous le capot, mais présente une API Stream étendue standard aux utilisateurs finaux.
Cet exemple de code peut vous aider à démarrer
Il y a un tutoriel sur le batching ici
Et un tutoriel plus général ici
Pour utiliser votre propre Thread Pool (qui est probablement plus approprié pour bloquer les E / S), vous pouvez commencer le traitement avec
la source
Exemple pur Java 8 qui fonctionne également avec des flux parallèles.
Comment utiliser:
La déclaration et l'implémentation de la méthode:
la source
En toute honnêteté, jetez un œil à l'élégante solution Vavr :
la source
Exemple simple utilisant Spliterator
La réponse de Bruce est plus complète, mais je cherchais quelque chose de rapide et sale pour traiter un tas de fichiers.
la source
c'est une solution java pure qui est évaluée paresseusement.
la source
Vous pouvez utiliser apache.commons:
La partie partitionnement se fait sans paresseux, mais une fois la liste partitionnée, vous bénéficiez des avantages de travailler avec des flux (par exemple, utilisez des flux parallèles, ajoutez des filtres, etc.). D'autres réponses suggèrent des solutions plus élaborées mais parfois la lisibilité et la maintenabilité sont plus importantes (et parfois elles ne le sont pas :-))
la source
Cela pourrait être facilement fait en utilisant Reactor :
la source
Avec
Java 8
etcom.google.common.collect.Lists
, vous pouvez faire quelque chose comme:Voici
T
le type des éléments dans la liste d'entrée etU
le type des éléments dans la liste de sortieEt vous pouvez l'utiliser comme ceci:
la source