Java 8 Stream avec traitement par lots

95

J'ai un gros fichier qui contient une liste d'articles.

Je voudrais créer un lot d'articles, faire une requête HTTP avec ce lot (tous les éléments sont nécessaires en tant que paramètres dans la requête HTTP). Je peux le faire très facilement avec une forboucle, mais en tant qu'amoureux de Java 8, je veux essayer d'écrire ceci avec le framework Stream de Java 8 (et profiter des avantages du traitement paresseux).

Exemple:

List<String> batch = new ArrayList<>(BATCH_SIZE);
for (int i = 0; i < data.size(); i++) {
  batch.add(data.get(i));
  if (batch.size() == BATCH_SIZE) process(batch);
}

if (batch.size() > 0) process(batch);

Je veux faire quelque chose d'une longue lignée de lazyFileStream.group(500).map(processBatch).collect(toList())

Quelle serait la meilleure façon de faire cela?

Andy Dang
la source
Je ne peux pas vraiment comprendre comment effectuer le regroupement, désolé, mais Files # lines lira paresseusement le contenu du fichier.
Toby le
1
vous avez donc essentiellement besoin d'un inverse de flatMap(+ un flatMap supplémentaire pour réduire à nouveau les flux)? Je ne pense pas que quelque chose comme ça existe comme méthode pratique dans la bibliothèque standard. Soit vous devrez trouver une bibliothèque tierce, soit écrire la vôtre basée sur des séparateurs et / ou un collecteur émettant un flux de flux
the8472
3
Peut-être que vous pouvez combiner Stream.generateavec reader::readLineet limit, mais le problème est que les flux ne fonctionnent pas bien avec les exceptions. En outre, ce n'est probablement pas bien parallélisable. Je pense que la forboucle est toujours la meilleure option.
tobias_k
Je viens d'ajouter un exemple de code. Je ne pense pas que flatMap soit la solution. Je soupçonne que je pourrais avoir à écrire un Spliterator personnalisé
Andy Dang
1
J'invente le terme «abus de flux» pour des questions comme celle-ci.
kervin le

Réponses:

13

Remarque! Cette solution lit l'intégralité du fichier avant d'exécuter forEach.

Vous pouvez le faire avec jOOλ , une bibliothèque qui étend les flux Java 8 pour les cas d'utilisation de flux séquentiels à un seul thread:

Seq.seq(lazyFileStream)              // Seq<String>
   .zipWithIndex()                   // Seq<Tuple2<String, Long>>
   .groupBy(tuple -> tuple.v2 / 500) // Map<Long, List<String>>
   .forEach((index, batch) -> {
       process(batch);
   });

Dans les coulisses, zipWithIndex()c'est juste:

static <T> Seq<Tuple2<T, Long>> zipWithIndex(Stream<T> stream) {
    final Iterator<T> it = stream.iterator();

    class ZipWithIndex implements Iterator<Tuple2<T, Long>> {
        long index;

        @Override
        public boolean hasNext() {
            return it.hasNext();
        }

        @Override
        public Tuple2<T, Long> next() {
            return tuple(it.next(), index++);
        }
    }

    return seq(new ZipWithIndex());
}

... alors que l' groupBy()API est pratique pour:

default <K> Map<K, List<T>> groupBy(Function<? super T, ? extends K> classifier) {
    return collect(Collectors.groupingBy(classifier));
}

(Avertissement: je travaille pour l'entreprise derrière jOOλ)

Lukas Eder
la source
Sensationnel. C'est exactement ce que je recherche. Notre système traite normalement les flux de données en séquence, ce serait donc un bon choix pour passer à Java 8.
Andy Dang
16
Notez que cette solution stocke inutilement tout le flux d'entrée vers l'intermédiaire Map(contrairement, par exemple, à la solution Ben Manes)
Tagir Valeev
124

Pour être complet, voici une solution Guava .

Iterators.partition(stream.iterator(), batchSize).forEachRemaining(this::process);

Dans la question, la collection est disponible, donc un flux n'est pas nécessaire et il peut être écrit comme,

Iterables.partition(data, batchSize).forEach(this::process);
Ben Manes
la source
11
Lists.partitionest une autre variante que j'aurais dû mentionner.
Ben Manes
2
c'est paresseux, non? il n'appellera pas le tout Streamen mémoire avant de traiter le lot concerné
orirab
1
@orirab oui. Il est paresseux entre les lots, car il consommera des batchSizeéléments par itération.
Ben Manes
Pourriez-vous s'il vous plaît jeter un oeil à stackoverflow.com/questions/58666190/…
gstackoverflow
58

L'implémentation Pure Java-8 est également possible:

int BATCH = 500;
IntStream.range(0, (data.size()+BATCH-1)/BATCH)
         .mapToObj(i -> data.subList(i*BATCH, Math.min(data.size(), (i+1)*BATCH)))
         .forEach(batch -> process(batch));

Notez que contrairement à JOOl, il peut bien fonctionner en parallèle (à condition que vous soyez dataune liste d'accès aléatoire).

Tagir Valeev
la source
1
et si vos données sont en fait un flux? (disons des lignes dans un fichier, ou même du réseau).
Omry Yadan
6
@OmryYadan, la question portait sur la contribution de List(voir data.size(), data.get()dans la question). Je réponds à la question posée. Si vous avez une autre question, posez-la à la place (même si je pense que la question du flux a également déjà été posée).
Tagir Valeev
1
Comment traiter les lots en parallèle?
soup_boy
37

Solution Pure Java 8 :

Nous pouvons créer un collecteur personnalisé pour le faire avec élégance, qui prend en a batch sizeet a Consumerpour traiter chaque lot:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.function.*;
import java.util.stream.Collector;

import static java.util.Objects.requireNonNull;


/**
 * Collects elements in the stream and calls the supplied batch processor
 * after the configured batch size is reached.
 *
 * In case of a parallel stream, the batch processor may be called with
 * elements less than the batch size.
 *
 * The elements are not kept in memory, and the final result will be an
 * empty list.
 *
 * @param <T> Type of the elements being collected
 */
class BatchCollector<T> implements Collector<T, List<T>, List<T>> {

    private final int batchSize;
    private final Consumer<List<T>> batchProcessor;


    /**
     * Constructs the batch collector
     *
     * @param batchSize the batch size after which the batchProcessor should be called
     * @param batchProcessor the batch processor which accepts batches of records to process
     */
    BatchCollector(int batchSize, Consumer<List<T>> batchProcessor) {
        batchProcessor = requireNonNull(batchProcessor);

        this.batchSize = batchSize;
        this.batchProcessor = batchProcessor;
    }

    public Supplier<List<T>> supplier() {
        return ArrayList::new;
    }

    public BiConsumer<List<T>, T> accumulator() {
        return (ts, t) -> {
            ts.add(t);
            if (ts.size() >= batchSize) {
                batchProcessor.accept(ts);
                ts.clear();
            }
        };
    }

    public BinaryOperator<List<T>> combiner() {
        return (ts, ots) -> {
            // process each parallel list without checking for batch size
            // avoids adding all elements of one to another
            // can be modified if a strict batching mode is required
            batchProcessor.accept(ts);
            batchProcessor.accept(ots);
            return Collections.emptyList();
        };
    }

    public Function<List<T>, List<T>> finisher() {
        return ts -> {
            batchProcessor.accept(ts);
            return Collections.emptyList();
        };
    }

    public Set<Characteristics> characteristics() {
        return Collections.emptySet();
    }
}

Si vous le souhaitez, créez une classe d'utilitaire d'assistance:

import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collector;

public class StreamUtils {

    /**
     * Creates a new batch collector
     * @param batchSize the batch size after which the batchProcessor should be called
     * @param batchProcessor the batch processor which accepts batches of records to process
     * @param <T> the type of elements being processed
     * @return a batch collector instance
     */
    public static <T> Collector<T, List<T>, List<T>> batchCollector(int batchSize, Consumer<List<T>> batchProcessor) {
        return new BatchCollector<T>(batchSize, batchProcessor);
    }
}

Exemple d'utilisation:

List<Integer> input = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
List<Integer> output = new ArrayList<>();

int batchSize = 3;
Consumer<List<Integer>> batchProcessor = xs -> output.addAll(xs);

input.stream()
     .collect(StreamUtils.batchCollector(batchSize, batchProcessor));

J'ai également publié mon code sur GitHub, si quelqu'un veut y jeter un œil:

Lien vers Github

rohitvats
la source
1
C'est une bonne solution, sauf si vous ne pouvez pas insérer tous les éléments de votre flux dans la mémoire. De plus, cela ne fonctionnera pas sur des flux sans fin - la méthode de collecte est terminale, ce qui signifie qu'au lieu de produire un flux de lots, il attendra la fin du flux, puis traitera le résultat par lots.
Alex Ackerman
2
@AlexAckerman un flux infini signifiera que le finisseur ne sera jamais appelé, mais l'accumulateur sera toujours appelé afin que les éléments soient toujours traités. En outre, il ne nécessite que la taille du lot d'éléments à être en mémoire à tout moment.
Solubris
@Solubris, vous avez raison! Mon mauvais, merci de l'avoir signalé - je ne supprimerai pas le commentaire pour la référence, si quelqu'un a la même idée du fonctionnement de la méthode de collecte.
Alex Ackerman
La liste envoyée au consommateur doit être copiée pour sécuriser sa modification, par exemple: batchProcessor.accept (copyOf (ts))
Solubris
19

J'ai écrit un Spliterator personnalisé pour des scénarios comme celui-ci. Il remplira les listes d'une taille donnée à partir du flux d'entrée. L'avantage de cette approche est qu'elle effectuera un traitement paresseux et qu'elle fonctionnera avec d'autres fonctions de flux.

public static <T> Stream<List<T>> batches(Stream<T> stream, int batchSize) {
    return batchSize <= 0
        ? Stream.of(stream.collect(Collectors.toList()))
        : StreamSupport.stream(new BatchSpliterator<>(stream.spliterator(), batchSize), stream.isParallel());
}

private static class BatchSpliterator<E> implements Spliterator<List<E>> {

    private final Spliterator<E> base;
    private final int batchSize;

    public BatchSpliterator(Spliterator<E> base, int batchSize) {
        this.base = base;
        this.batchSize = batchSize;
    }

    @Override
    public boolean tryAdvance(Consumer<? super List<E>> action) {
        final List<E> batch = new ArrayList<>(batchSize);
        for (int i=0; i < batchSize && base.tryAdvance(batch::add); i++)
            ;
        if (batch.isEmpty())
            return false;
        action.accept(batch);
        return true;
    }

    @Override
    public Spliterator<List<E>> trySplit() {
        if (base.estimateSize() <= batchSize)
            return null;
        final Spliterator<E> splitBase = this.base.trySplit();
        return splitBase == null ? null
                : new BatchSpliterator<>(splitBase, batchSize);
    }

    @Override
    public long estimateSize() {
        final double baseSize = base.estimateSize();
        return baseSize == 0 ? 0
                : (long) Math.ceil(baseSize / (double) batchSize);
    }

    @Override
    public int characteristics() {
        return base.characteristics();
    }

}
Bruce Hamilton
la source
très utile. Si quelqu'un veut grouper sur certains critères personnalisés (par exemple la taille de la collection en octets), vous pouvez alors déléguer votre prédicat personnalisé et l'utiliser dans la boucle for comme condition (à mon avis, alors que la boucle sera plus lisible alors)
pls
Je ne suis pas sûr que la mise en œuvre soit correcte. Par exemple, si le flux de base est SUBSIZEDle fractionnement renvoyé par trySplitpeut avoir plus d'éléments qu'avant le fractionnement (si le fractionnement se produit au milieu du lot).
Malt
@Malt si ma compréhension de Spliteratorsest correcte, alors trySplitdoit toujours partitionner les données en deux parties à peu près égales afin que le résultat ne soit jamais plus grand que l'original?
Bruce Hamilton le
@BruceHamilton Malheureusement, selon la documentation, les parties ne peuvent pas être à peu près égales. Ils doivent être égaux:if this Spliterator is SUBSIZED, then estimateSize() for this spliterator before splitting must be equal to the sum of estimateSize() for this and the returned Spliterator after splitting.
Malt le
Oui, cela est conforme à ma compréhension du fractionnement Spliterator. Cependant, j'ai du mal à comprendre comment "les divisions renvoyées par trySplit peuvent avoir plus d'éléments qu'avant la division", pourriez-vous expliquer ce que vous voulez dire ici?
Bruce Hamilton le
13

Nous avions un problème similaire à résoudre. Nous voulions prendre un flux qui était plus grand que la mémoire système (itérer à travers tous les objets d'une base de données) et randomiser l'ordre le mieux possible - nous avons pensé qu'il serait correct de mettre en mémoire tampon 10000 éléments et de les randomiser.

La cible était une fonction qui prenait un flux.

Parmi les solutions proposées ici, il semble y avoir une gamme d'options:

  • Utilisez diverses bibliothèques supplémentaires non-java 8
  • Commencez par quelque chose qui n'est pas un flux - par exemple une liste d'accès aléatoire
  • Avoir un flux qui peut être divisé facilement dans un séparateur

Notre instinct était à l'origine d'utiliser un collecteur personnalisé, mais cela signifiait abandonner le streaming. La solution de collecteur personnalisé ci-dessus est très bonne et nous l'avons presque utilisée.

Voici une solution qui triche en utilisant le fait que Streams peut vous donner un Iteratorque vous pouvez utiliser comme trappe d'échappement pour vous permettre de faire quelque chose de plus que les flux ne prennent pas en charge. Le Iteratorest reconverti en un flux en utilisant un autre peu de StreamSupportsorcellerie Java 8 .

/**
 * An iterator which returns batches of items taken from another iterator
 */
public class BatchingIterator<T> implements Iterator<List<T>> {
    /**
     * Given a stream, convert it to a stream of batches no greater than the
     * batchSize.
     * @param originalStream to convert
     * @param batchSize maximum size of a batch
     * @param <T> type of items in the stream
     * @return a stream of batches taken sequentially from the original stream
     */
    public static <T> Stream<List<T>> batchedStreamOf(Stream<T> originalStream, int batchSize) {
        return asStream(new BatchingIterator<>(originalStream.iterator(), batchSize));
    }

    private static <T> Stream<T> asStream(Iterator<T> iterator) {
        return StreamSupport.stream(
            Spliterators.spliteratorUnknownSize(iterator,ORDERED),
            false);
    }

    private int batchSize;
    private List<T> currentBatch;
    private Iterator<T> sourceIterator;

    public BatchingIterator(Iterator<T> sourceIterator, int batchSize) {
        this.batchSize = batchSize;
        this.sourceIterator = sourceIterator;
    }

    @Override
    public boolean hasNext() {
        prepareNextBatch();
        return currentBatch!=null && !currentBatch.isEmpty();
    }

    @Override
    public List<T> next() {
        return currentBatch;
    }

    private void prepareNextBatch() {
        currentBatch = new ArrayList<>(batchSize);
        while (sourceIterator.hasNext() && currentBatch.size() < batchSize) {
            currentBatch.add(sourceIterator.next());
        }
    }
}

Un exemple simple d'utilisation de ceci ressemblerait à ceci:

@Test
public void getsBatches() {
    BatchingIterator.batchedStreamOf(Stream.of("A","B","C","D","E","F"), 3)
        .forEach(System.out::println);
}

Les impressions ci-dessus

[A, B, C]
[D, E, F]

Pour notre cas d'utilisation, nous voulions mélanger les lots, puis les conserver sous forme de flux - cela ressemblait à ceci:

@Test
public void howScramblingCouldBeDone() {
    BatchingIterator.batchedStreamOf(Stream.of("A","B","C","D","E","F"), 3)
        // the lambda in the map expression sucks a bit because Collections.shuffle acts on the list, rather than returning a shuffled one
        .map(list -> {
            Collections.shuffle(list); return list; })
        .flatMap(List::stream)
        .forEach(System.out::println);
}

Cela produit quelque chose comme (c'est aléatoire, si différent à chaque fois)

A
C
B
E
D
F

La sauce secrète ici est qu'il y a toujours un flux, vous pouvez donc soit opérer sur un flux de lots, soit faire quelque chose pour chaque lot, puis flatMaple retourner à un flux. Mieux encore, tous les passe au- dessus que la finale forEachou collectou d' autres expressions de terminaison PULL les données à travers le flux.

Il s'avère qu'il iterators'agit d'un type spécial d' opération de terminaison sur un flux et ne provoque pas l'exécution et la mise en mémoire de l'ensemble du flux! Merci aux gars de Java 8 pour un design brillant!

Ashley Frise
la source
Et il est très bon que vous itériez complètement sur chaque lot lorsqu'il est collecté et que Listvous persistiez à un - vous ne pouvez pas différer l'itération des éléments intra-lot car le consommateur voudra peut-être ignorer un lot entier, et si vous n'avez pas consommé le éléments alors ils ne sauteraient pas très loin. (J'ai implémenté l'un d'entre eux en C #, même si c'était beaucoup plus facile.)
ErikE
9

Vous pouvez également utiliser RxJava :

Observable.from(data).buffer(BATCH_SIZE).forEach((batch) -> process(batch));

ou

Observable.from(lazyFileStream).buffer(500).map((batch) -> process(batch)).toList();

ou

Observable.from(lazyFileStream).buffer(500).map(MyClass::process).toList();
fracasser
la source
8

Vous pouvez également jeter un œil à cyclops-react , je suis l'auteur de cette bibliothèque. Il implémente l'interface jOOλ (et par extension JDK 8 Streams), mais contrairement aux JDK 8 Parallel Streams, il se concentre sur les opérations asynchrones (telles que le blocage potentiel des appels d'E / S Async). JDK Parallel Streams, en revanche, se concentre sur le parallélisme des données pour les opérations liées au processeur. Il fonctionne en gérant des agrégats de tâches futures sous le capot, mais présente une API Stream étendue standard aux utilisateurs finaux.

Cet exemple de code peut vous aider à démarrer

LazyFutureStream.parallelCommonBuilder()
                .react(data)
                .grouped(BATCH_SIZE)                  
                .map(this::process)
                .run();

Il y a un tutoriel sur le batching ici

Et un tutoriel plus général ici

Pour utiliser votre propre Thread Pool (qui est probablement plus approprié pour bloquer les E / S), vous pouvez commencer le traitement avec

     LazyReact reactor = new LazyReact(40);

     reactor.react(data)
            .grouped(BATCH_SIZE)                  
            .map(this::process)
            .run();
John McClean
la source
3

Exemple pur Java 8 qui fonctionne également avec des flux parallèles.

Comment utiliser:

Stream<Integer> integerStream = IntStream.range(0, 45).parallel().boxed();
CsStreamUtil.processInBatch(integerStream, 10, batch -> System.out.println("Batch: " + batch));

La déclaration et l'implémentation de la méthode:

public static <ElementType> void processInBatch(Stream<ElementType> stream, int batchSize, Consumer<Collection<ElementType>> batchProcessor)
{
    List<ElementType> newBatch = new ArrayList<>(batchSize);

    stream.forEach(element -> {
        List<ElementType> fullBatch;

        synchronized (newBatch)
        {
            if (newBatch.size() < batchSize)
            {
                newBatch.add(element);
                return;
            }
            else
            {
                fullBatch = new ArrayList<>(newBatch);
                newBatch.clear();
                newBatch.add(element);
            }
        }

        batchProcessor.accept(fullBatch);
    });

    if (newBatch.size() > 0)
        batchProcessor.accept(new ArrayList<>(newBatch));
}
Nicolas Lacombe
la source
2

En toute honnêteté, jetez un œil à l'élégante solution Vavr :

Stream.ofAll(data).grouped(BATCH_SIZE).forEach(this::process);
Nolequen
la source
1

Exemple simple utilisant Spliterator

    // read file into stream, try-with-resources
    try (Stream<String> stream = Files.lines(Paths.get(fileName))) {
        //skip header
        Spliterator<String> split = stream.skip(1).spliterator();
        Chunker<String> chunker = new Chunker<String>();
        while(true) {              
            boolean more = split.tryAdvance(chunker::doSomething);
            if (!more) {
                break;
            }
        }           
    } catch (IOException e) {
        e.printStackTrace();
    }
}

static class Chunker<T> {
    int ct = 0;
    public void doSomething(T line) {
        System.out.println(ct++ + " " + line.toString());
        if (ct % 100 == 0) {
            System.out.println("====================chunk=====================");               
        }           
    }       
}

La réponse de Bruce est plus complète, mais je cherchais quelque chose de rapide et sale pour traiter un tas de fichiers.

rhinmass
la source
1

c'est une solution java pure qui est évaluée paresseusement.

public static <T> Stream<List<T>> partition(Stream<T> stream, int batchSize){
    List<List<T>> currentBatch = new ArrayList<List<T>>(); //just to make it mutable 
    currentBatch.add(new ArrayList<T>(batchSize));
    return Stream.concat(stream
      .sequential()                   
      .map(new Function<T, List<T>>(){
          public List<T> apply(T t){
              currentBatch.get(0).add(t);
              return currentBatch.get(0).size() == batchSize ? currentBatch.set(0,new ArrayList<>(batchSize)): null;
            }
      }), Stream.generate(()->currentBatch.get(0).isEmpty()?null:currentBatch.get(0))
                .limit(1)
    ).filter(Objects::nonNull);
}
Il je
la source
1

Vous pouvez utiliser apache.commons:

ListUtils.partition(ListOfLines, 500).stream()
                .map(partition -> processBatch(partition)
                .collect(Collectors.toList());

La partie partitionnement se fait sans paresseux, mais une fois la liste partitionnée, vous bénéficiez des avantages de travailler avec des flux (par exemple, utilisez des flux parallèles, ajoutez des filtres, etc.). D'autres réponses suggèrent des solutions plus élaborées mais parfois la lisibilité et la maintenabilité sont plus importantes (et parfois elles ne le sont pas :-))

Tal Joffe
la source
Je ne sais pas qui a voté contre mais ce serait bien de comprendre pourquoi. J'ai donné une réponse qui a complété les autres réponses pour les personnes ne pouvant pas utiliser Guava
Tal Joffe
Vous traitez une liste ici, pas un flux.
Drakemor
@Drakemor Je traite un flux de sous-listes. remarquez l'appel de la fonction stream ()
Tal Joffe
Mais d'abord, vous le transformez en une liste de sous-listes, qui ne fonctionneront pas correctement pour les vraies données diffusées en continu. Voici la référence à la partition: commons.apache.org/proper/commons-collections/apidocs/org/...
Drakemor
1
TBH Je ne comprends pas entièrement votre argument, mais je suppose que nous pouvons être d'accord pour ne pas être d'accord. J'ai modifié ma réponse pour refléter notre conversation ici. Merci pour la discussion
Tal Joffe
1

Cela pourrait être facilement fait en utilisant Reactor :

Flux.fromStream(fileReader.lines().onClose(() -> safeClose(fileReader)))
            .map(line -> someProcessingOfSingleLine(line))
            .buffer(BUFFER_SIZE)
            .subscribe(apiService::makeHttpRequest);
Alex
la source
0

Avec Java 8et com.google.common.collect.Lists, vous pouvez faire quelque chose comme:

public class BatchProcessingUtil {
    public static <T,U> List<U> process(List<T> data, int batchSize, Function<List<T>, List<U>> processFunction) {
        List<List<T>> batches = Lists.partition(data, batchSize);
        return batches.stream()
                .map(processFunction) // Send each batch to the process function
                .flatMap(Collection::stream) // flat results to gather them in 1 stream
                .collect(Collectors.toList());
    }
}

Voici Tle type des éléments dans la liste d'entrée et Ule type des éléments dans la liste de sortie

Et vous pouvez l'utiliser comme ceci:

List<String> userKeys = [... list of user keys]
List<Users> users = BatchProcessingUtil.process(
    userKeys,
    10, // Batch Size
    partialKeys -> service.getUsers(partialKeys)
);
Josebui
la source