Flux parallèle Java - ordre d'invocation de la méthode parallel () [fermé]

11
AtomicInteger recordNumber = new AtomicInteger();
Files.lines(inputFile.toPath(), StandardCharsets.UTF_8)
     .map(record -> new Record(recordNumber.incrementAndGet(), record)) 
     .parallel()           
     .filter(record -> doSomeOperation())
     .findFirst()

Quand j'ai écrit ceci, j'ai supposé que les threads ne seraient engendrés que par l'appel de carte car le parallèle est placé après la carte. Mais certaines lignes du fichier obtenaient des numéros d'enregistrement différents pour chaque exécution.

J'ai lu la documentation officielle des flux Java et quelques sites Web pour comprendre comment les flux fonctionnent sous le capot.

Quelques questions:

  • Le flux parallèle Java fonctionne sur la base de SplitIterator , qui est implémenté par chaque collection comme ArrayList, LinkedList etc. Lorsque nous construisons un flux parallèle à partir de ces collections, l'itérateur de fractionnement correspondant sera utilisé pour fractionner et itérer la collection. Cela explique pourquoi le parallélisme s'est produit au niveau de la source d'entrée d'origine (lignes de fichier) plutôt qu'au niveau du résultat de la carte (c'est-à-dire Record pojo). Ma compréhension est-elle correcte?

  • Dans mon cas, l'entrée est un flux d'E / S de fichiers. Quel itérateur divisé sera utilisé?

  • Peu importe où nous nous situons parallel()dans le pipeline. La source d'entrée d'origine sera toujours divisée et les opérations intermédiaires restantes seront appliquées.

    Dans ce cas, Java ne devrait pas permettre aux utilisateurs de placer des opérations parallèles n'importe où dans le pipeline, sauf à la source d'origine. Parce que cela donne une mauvaise compréhension à ceux qui ne savent pas comment Java Stream fonctionne en interne. Je sais que l' parallel()opération aurait été définie pour le type d'objet Stream et donc, cela fonctionne de cette façon. Mais, il est préférable de fournir une autre solution.

  • Dans l'extrait de code ci-dessus, j'essaie d'ajouter un numéro de ligne à chaque enregistrement du fichier d'entrée et il doit donc être commandé. Cependant, je veux appliquer doSomeOperation()en parallèle car c'est une logique lourde. La seule façon d'y parvenir est d'écrire mon propre itérateur divisé personnalisé. Est-ce qu'il y a un autre moyen?

explorateur
la source
2
Cela a plus à voir avec la façon dont les créateurs Java ont décidé de concevoir l'interface. Vous placez vos demandes dans le pipeline et tout ce qui n'est pas une opération finale sera collecté en premier. parallel()n'est rien de plus qu'une demande de modification générale qui est appliquée à l'objet de flux sous-jacent. N'oubliez pas qu'il n'y a qu'un seul flux source si vous n'appliquez pas d'opérations finales au tube, c'est-à-dire tant que rien n'est "exécuté". Cela dit, vous ne faites que remettre en question les choix de conception Java. Ce qui est basé sur l'opinion et nous ne pouvons pas vraiment aider à cela.
Zabuzard
1
Je comprends parfaitement votre point de vue et votre confusion, mais je ne pense pas qu'il existe de bien meilleures solutions. La méthode est proposée Streamdirectement dans l' interface et en raison de la mise en cascade agréable, chaque opération est redistribuée Stream. Imaginez que quelqu'un veuille vous donner Streammais a déjà appliqué quelques opérations comme mapcelle-ci. En tant qu'utilisateur, vous voulez toujours pouvoir décider de l'exécuter en parallèle ou non. Il doit donc être possible d'appeler parallel()encore, bien que le flux existe déjà.
Zabuzard
1
De plus, je préférerais me demander pourquoi vous voudriez exécuter une partie d'un flux séquentiellement, puis passer plus tard en parallèle. Si le flux est déjà suffisamment grand pour être qualifié pour une exécution parallèle, cela s'applique probablement également à tout ce qui se trouve avant dans le pipeline. Alors pourquoi ne pas utiliser l'exécution parallèle pour cette partie également? J'obtiens qu'il existe des cas marginaux comme si vous augmentez considérablement la taille avec flatMapou si vous exécutez des méthodes thread-unsafe ou similaires.
Zabuzard
1
@Zabuza Je ne remets pas en question le choix du design java mais je fais juste part de mes inquiétudes. Tout utilisateur de flux Java de base pourrait obtenir la même confusion à moins qu'il ne comprenne le fonctionnement du flux. Je suis tout à fait d'accord avec votre deuxième commentaire. Je viens de souligner une solution possible qui pourrait avoir son propre inconvénient, comme vous l'avez mentionné. Mais, nous pouvons voir si cela peut être résolu d'une autre manière. Concernant votre 3ème commentaire, j'ai déjà évoqué mon cas d'utilisation dans le dernier point de ma description
explorateur
1
@Eugene lorsque le Pathest sur le système de fichiers local et que vous utilisez un JDK récent, le séparateur aura une meilleure capacité de traitement parallèle que les multiples de 1024 par lots. Mais le fractionnement équilibré peut même être contre-productif dans certains findFirstscénarios…
Holger

Réponses:

8

Cela explique pourquoi le parallélisme s'est produit au niveau de la source d'entrée d'origine (lignes de fichier) plutôt qu'au niveau du résultat de la carte (c'est-à-dire Record pojo).

Le flux entier est parallèle ou séquentiel. Nous ne sélectionnons pas un sous-ensemble d'opérations à exécuter séquentiellement ou en parallèle.

Lorsque l'opération de terminal est lancée, le pipeline de flux est exécuté séquentiellement ou en parallèle selon l'orientation du flux sur lequel il est appelé. [...] Lorsque l'opération de terminal est lancée, le pipeline de flux est exécuté séquentiellement ou en parallèle selon le mode du flux sur lequel il est appelé. même source

Comme vous le mentionnez, les flux parallèles utilisent des itérateurs fractionnés. De toute évidence, il s'agit de partitionner les données avant le démarrage des opérations.


Dans mon cas, l'entrée est un flux d'E / S de fichiers. Quel itérateur divisé sera utilisé?

En regardant la source, je vois qu'il utilise java.nio.file.FileChannelLinesSpliterator


Peu importe où nous plaçons parallel () dans le pipeline. La source d'entrée d'origine sera toujours divisée et les opérations intermédiaires restantes seront appliquées.

Droite. Vous pouvez même appeler parallel()et à sequential()plusieurs reprises. Celui invoqué en dernier l'emportera. Lorsque nous appelons parallel(), nous définissons cela pour le flux renvoyé; et comme indiqué ci-dessus, toutes les opérations s'exécutent séquentiellement ou en parallèle.


Dans ce cas, Java ne devrait pas permettre aux utilisateurs de placer une opération parallèle n'importe où dans le pipeline, sauf à la source d'origine ...

Cela devient une question d'opinions. Je pense que Zabuza donne une bonne raison de soutenir le choix des concepteurs JDK.


La seule façon d'y parvenir est d'écrire mon propre itérateur divisé personnalisé. Est-ce qu'il y a un autre moyen?

Cela dépend de vos opérations

  • Si findFirst()c'est votre véritable opération de terminal, vous n'avez même pas à vous soucier de l'exécution parallèle, car il n'y aura pas beaucoup d'appels de doSomething()toute façon ( findFirst()c'est un court-circuit). .parallel()en fait, cela peut entraîner le traitement de plusieurs éléments, alors que findFirst()sur un flux séquentiel, cela empêcherait cela.
  • Si votre opération de terminal ne crée pas beaucoup de données, vous pouvez peut-être créer vos Recordobjets à l'aide d'un flux séquentiel, puis traiter le résultat en parallèle:

    List<Record> smallData = Files.lines(inputFile.toPath(), 
                                         StandardCharsets.UTF_8)
      .map(record -> new Record(recordNumber.incrementAndGet(), record)) 
      .collect(Collectors.toList())
      .parallelStream()     
      .filter(record -> doSomeOperation())
      .collect(Collectors.toList());
    
  • Si votre pipeline charge beaucoup de données en mémoire (ce qui peut être la raison de votre utilisation Files.lines()), vous aurez peut-être besoin d'un itérateur de fractionnement personnalisé. Avant d'y aller, cependant, j'examinerais d'autres options (telles que l'enregistrement des lignes avec une colonne id pour commencer - c'est juste mon avis).
    J'essaierais également de traiter des enregistrements en lots plus petits, comme ceci:

    AtomicInteger recordNumber = new AtomicInteger();
    final int batchSize = 10;
    
    try(BufferedReader reader = Files.newBufferedReader(inputFile.toPath(), 
            StandardCharsets.UTF_8);) {
        Supplier<List<Record>> batchSupplier = () -> {
            List<Record> batch = new ArrayList<>();
            for (int i = 0; i < batchSize; i++) {
                String nextLine;
                try {
                    nextLine = reader.readLine();
                } catch (IOException e) {
                    //hanlde exception
                    throw new RuntimeException(e);
                }
    
                if(null == nextLine) 
                    return batch;
                batch.add(new Record(recordNumber.getAndIncrement(), nextLine));
            }
            System.out.println("next batch");
    
            return batch;
        };
    
        Stream.generate(batchSupplier)
            .takeWhile(list -> list.size() >= batchSize)
            .map(list -> list.parallelStream()
                             .filter(record -> doSomeOperation())
                             .collect(Collectors.toList()))
            .flatMap(List::stream)
            .forEach(System.out::println);
    }
    

    Cela s'exécute doSomeOperation()en parallèle sans charger toutes les données en mémoire. Mais notez qu'il batchSizefaudra réfléchir.

ernest_k
la source
1
Merci pour la clarification. Il est bon de connaître la 3e solution que vous avez mise en évidence. Je vais jeter un œil car je n'ai pas utilisé takeWhile et Supplier.
explorateur
2
Une Spliteratorimplémentation personnalisée ne serait pas plus compliquée que cela, tout en permettant un traitement parallèle plus efficace…
Holger
1
Chacune de vos parallelStreamopérations internes a une surcharge fixe pour lancer l'opération et attendre le résultat final, tout en étant limitée à un parallélisme de batchSize. Tout d'abord, vous avez besoin d'un multiple du nombre actuellement disponible de cœurs de processeur pour éviter les threads inactifs. Ensuite, le nombre doit être suffisamment élevé pour compenser la surcharge fixe, mais plus le nombre est élevé, plus la pause imposée par l'opération de lecture séquentielle se produit avant même le début du traitement parallèle.
Holger
1
Tourner le flux externe en parallèle provoquerait de mauvaises interférences avec l'intérieur dans l'implémentation actuelle, en plus du point qui Stream.generateproduit un flux non ordonné, qui ne fonctionne pas avec les cas d'utilisation prévus de l'OP comme findFirst(). En revanche, un seul flux parallèle avec un séparateur qui renvoie les morceaux dans les trySplitœuvres directement et permet aux threads de travail de traiter le prochain morceau sans attendre la fin du précédent.
Holger
2
Il n'y a aucune raison de supposer qu'une findFirst()opération ne traitera qu'un petit nombre d'éléments. La première correspondance peut toujours se produire après le traitement de 90% de tous les éléments. De plus, lorsque vous avez dix millions de lignes, même trouver une correspondance après 10% nécessite toujours de traiter un million de lignes.
Holger
7

La conception originale de Stream incluait l'idée de prendre en charge les étapes de pipeline suivantes avec différents paramètres d'exécution parallèle, mais cette idée a été abandonnée. L'API peut provenir de cette époque, mais d'un autre côté, une conception d'API qui oblige l'appelant à prendre une seule décision non ambiguë pour une exécution parallèle ou séquentielle serait beaucoup plus compliquée.

Le réel Spliteratorutilisé par Files.lines(…)dépend de l'implémentation. En Java 8 (Oracle ou OpenJDK), vous obtenez toujours la même chose qu'avec BufferedReader.lines(). Dans les JDK plus récents, si le Pathappartient au système de fichiers par défaut et que le jeu de caractères est l'un des pris en charge pour cette fonctionnalité, vous obtenez un Stream avec une Spliteratorimplémentation dédiée , le java.nio.file.FileChannelLinesSpliterator. Si les conditions préalables ne sont pas remplies, vous obtenez la même chose qu'avec BufferedReader.lines(), qui est toujours basé sur un Iteratorimplémenté dans BufferedReaderet enveloppé via Spliterators.spliteratorUnknownSize.

Votre tâche spécifique est mieux gérée avec une personnalisation Spliteratorqui peut effectuer la numérotation des lignes directement à la source, avant le traitement parallèle, pour permettre un traitement parallèle ultérieur sans restrictions.

public static Stream<Record> records(Path p) throws IOException {
    LineNoSpliterator sp = new LineNoSpliterator(p);
    return StreamSupport.stream(sp, false).onClose(sp);
}

private static class LineNoSpliterator implements Spliterator<Record>, Runnable {
    int chunkSize = 100;
    SeekableByteChannel channel;
    LineNumberReader reader;

    LineNoSpliterator(Path path) throws IOException {
        channel = Files.newByteChannel(path, StandardOpenOption.READ);
        reader=new LineNumberReader(Channels.newReader(channel,StandardCharsets.UTF_8));
    }

    @Override
    public void run() {
        try(Closeable c1 = reader; Closeable c2 = channel) {}
        catch(IOException ex) { throw new UncheckedIOException(ex); }
        finally { reader = null; channel = null; }
    }

    @Override
    public boolean tryAdvance(Consumer<? super Record> action) {
        try {
            String line = reader.readLine();
            if(line == null) return false;
            action.accept(new Record(reader.getLineNumber(), line));
            return true;
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }

    @Override
    public Spliterator<Record> trySplit() {
        Record[] chunks = new Record[chunkSize];
        int read;
        for(read = 0; read < chunks.length; read++) {
            int pos = read;
            if(!tryAdvance(r -> chunks[pos] = r)) break;
        }
        return Spliterators.spliterator(chunks, 0, read, characteristics());
    }

    @Override
    public long estimateSize() {
        try {
            return (channel.size() - channel.position()) / 60;
        } catch (IOException ex) {
            return 0;
        }
    }

    @Override
    public int characteristics() {
        return ORDERED | NONNULL | DISTINCT;
    }
}
Holger
la source
0

Et ce qui suit est une simple démonstration de l'application du parallèle. La sortie de peek montre clairement la différence entre les deux exemples. Remarque: l' mapappel est simplement lancé pour ajouter une autre méthode avant parallel.

IntStream.rangeClosed (1,20).peek(a->System.out.print(a+" "))
        .map(a->a + 200).sum();
System.out.println();
IntStream.rangeClosed(1,20).peek(a->System.out.print(a+" "))
        .map(a->a + 200).parallel().sum();
WJS
la source