Pouvez-vous rééquilibrer un séparateur déséquilibré de taille inconnue?

12

Je veux utiliser un Streampour paralléliser le traitement d'un ensemble hétérogène de fichiers JSON stockés à distance de nombre inconnu (le nombre de fichiers n'est pas connu à l'avance). La taille des fichiers peut varier considérablement, de 1 enregistrement JSON par fichier jusqu'à 100 000 enregistrements dans certains autres fichiers. Un enregistrement JSON dans ce cas signifie un objet JSON autonome représenté comme une ligne dans le fichier.

Je veux vraiment utiliser Streams pour cela et j'ai donc implémenté ceci Spliterator:

public abstract class JsonStreamSpliterator<METADATA, RECORD> extends AbstractSpliterator<RECORD> {

    abstract protected JsonStreamSupport<METADATA> openInputStream(String path);

    abstract protected RECORD parse(METADATA metadata, Map<String, Object> json);

    private static final int ADDITIONAL_CHARACTERISTICS = Spliterator.IMMUTABLE | Spliterator.DISTINCT | Spliterator.NONNULL;
    private static final int MAX_BUFFER = 100;
    private final Iterator<String> paths;
    private JsonStreamSupport<METADATA> reader = null;

    public JsonStreamSpliterator(Iterator<String> paths) {
        this(Long.MAX_VALUE, ADDITIONAL_CHARACTERISTICS, paths);
    }

    private JsonStreamSpliterator(long est, int additionalCharacteristics, Iterator<String> paths) {
        super(est, additionalCharacteristics);
        this.paths = paths;
    }

    private JsonStreamSpliterator(long est, int additionalCharacteristics, Iterator<String> paths, String nextPath) {
        this(est, additionalCharacteristics, paths);
        open(nextPath);
    }

    @Override
    public boolean tryAdvance(Consumer<? super RECORD> action) {
        if(reader == null) {
            String path = takeNextPath();
            if(path != null) {
                open(path);
            }
            else {
                return false;
            }
        }
        Map<String, Object> json = reader.readJsonLine();
        if(json != null) {
            RECORD item = parse(reader.getMetadata(), json);
            action.accept(item);
            return true;
        }
        else {
            reader.close();
            reader = null;
            return tryAdvance(action);
        }
    }

    private void open(String path) {
        reader = openInputStream(path);
    }

    private String takeNextPath() {
        synchronized(paths) {
            if(paths.hasNext()) {
                return paths.next();
            }
        }
        return null;
    }

    @Override
    public Spliterator<RECORD> trySplit() {
        String nextPath = takeNextPath();
        if(nextPath != null) {
            return new JsonStreamSpliterator<METADATA,RECORD>(Long.MAX_VALUE, ADDITIONAL_CHARACTERISTICS, paths, nextPath) {
                @Override
                protected JsonStreamSupport<METADATA> openInputStream(String path) {
                    return JsonStreamSpliterator.this.openInputStream(path);
                }
                @Override
                protected RECORD parse(METADATA metaData, Map<String,Object> json) {
                    return JsonStreamSpliterator.this.parse(metaData, json);
                }
            };              
        }
        else {
            List<RECORD> records = new ArrayList<RECORD>();
            while(tryAdvance(records::add) && records.size() < MAX_BUFFER) {
                // loop
            }
            if(records.size() != 0) {
                return records.spliterator();
            }
            else {
                return null;
            }
        }
    }
}

Le problème que j'ai, c'est que même si le Stream se parallélise magnifiquement au début, le fichier le plus volumineux est finalement traité en un seul thread. Je crois que la cause proximale est bien documentée: le séparateur est "déséquilibré".

Plus concrètement, il semble que la trySplitméthode ne soit pas appelée après un certain point du Stream.forEachcycle de vie de l ', de sorte que la logique supplémentaire de distribution de petits lots à la fin de trySplitest rarement exécutée.

Remarquez comment tous les séparateurs renvoyés par trySplit partagent le même pathsitérateur. Je pensais que c'était un moyen très intelligent d'équilibrer le travail entre tous les séparateurs, mais cela n'a pas été suffisant pour atteindre un parallélisme complet.

Je voudrais que le traitement parallèle se poursuive d'abord sur les fichiers, puis lorsque quelques fichiers volumineux sont encore divisés, je veux paralléliser sur des morceaux des fichiers restants. C'était l'intention du elsebloc à la fin de trySplit.

Existe-t-il un moyen facile / simple / canonique de contourner ce problème?

Alex R
la source
2
Vous avez besoin d'une estimation de taille. Il peut être totalement faux, tant qu'il reflète à peu près le rapport de votre répartition déséquilibrée. Sinon, le flux ne sait pas que les séparations sont déséquilibrées et s'arrêteront dès qu'un certain nombre de morceaux aura été créé.
Holger
@Holger pouvez-vous développer "s'arrêtera une fois qu'un certain nombre de morceaux aura été créé" ou me diriger vers la source JDK pour cela? Quel est le nombre de morceaux où ça s'arrête?
Alex R
Le code n'est pas pertinent, car il afficherait trop de détails d'implémentation non pertinents, qui pourraient changer à tout moment. Le point pertinent est que l'implémentation essaie d'appeler le fractionnement assez souvent, de sorte que chaque thread de travail (ajusté en fonction du nombre de cœurs CPU) a quelque chose à faire. Pour compenser les différences imprévisibles dans le temps de calcul, il produira probablement encore plus de morceaux que les threads de travail pour permettre le vol de travail et utiliser les tailles estimées comme heuristiques (par exemple pour décider quel sous-séparateur diviser davantage). Voir aussi stackoverflow.com/a/48174508/2711488
Holger
J'ai fait quelques expériences pour essayer de comprendre votre commentaire. L'heuristique semble être assez primitive. Il semble que le retour Long.MAX_VALUEprovoque un fractionnement excessif et inutile, tandis que toute estimation autre que celle-ci Long.MAX_VALUEprovoque l'arrêt du fractionnement, tuant le parallélisme. Le retour d'un mélange d'estimations précises ne semble pas conduire à des optimisations intelligentes.
Alex R
Je ne prétends pas que la stratégie de mise en œuvre était très intelligente, mais au moins, cela fonctionne pour certains scénarios avec des tailles estimées (sinon, il y avait beaucoup plus de rapports de bogues à ce sujet). Il semble donc qu'il y ait eu des erreurs de votre côté lors des expériences. Par exemple, dans le code de votre question, vous étendez AbstractSpliteratormais remplacez trySplit()ce qui est un mauvais combo pour autre chose que Long.MAX_VALUE, car vous n'adaptez pas l'estimation de taille dans trySplit(). Ensuite trySplit(), l'estimation de la taille doit être réduite du nombre d'éléments qui ont été séparés.
Holger

Réponses:

0

Votre trySplitdevrait produire des divisions de taille égale, quelle que soit la taille des fichiers sous-jacents. Vous devez traiter tous les fichiers comme une seule unité et remplir à ArrayListchaque fois le séparateur avec le même nombre d'objets JSON. Le nombre d'objets doit être tel que le traitement d'un fractionnement prenne entre 1 et 10 millisecondes: inférieur à 1 ms et vous commencez à approcher les coûts de transfert du lot vers un thread de travail, plus élevés que cela et vous commencez à risquer une charge CPU inégale due à des tâches trop grossières.

Le séparateur n'est pas obligé de rapporter une estimation de taille, et vous le faites déjà correctement: votre estimation est Long.MAX_VALUE, ce qui est une valeur spéciale signifiant "illimitée". Cependant, si vous avez de nombreux fichiers avec un seul objet JSON, ce qui entraîne des lots de taille 1, cela nuira à vos performances de deux manières: la surcharge d'ouverture-lecture-fermeture du fichier peut devenir un goulot d'étranglement et, si vous parvenez à vous échapper cela, le coût du transfert de threads peut être significatif par rapport au coût de traitement d'un article, provoquant à nouveau un goulot d'étranglement.

Il y a cinq ans, je résolvais un problème similaire, vous pouvez jeter un œil à ma solution .

Marko Topolnik
la source
Oui, vous n'êtes "pas obligé de déclarer une estimation de taille" et Long.MAX_VALUE décrivez correctement une taille inconnue, mais cela n'aide pas lorsque l'implémentation de Stream réelle fonctionne mal alors. Même en utilisant le résultat de la ThreadLocalRandom.current().nextInt(100, 100_000)taille estimée, on obtient de meilleurs résultats.
Holger
Il a bien fonctionné pour mes cas d'utilisation, où le coût de calcul de chaque article était substantiel. J'atteignais facilement 98% de l'utilisation totale du processeur et le débit était mis à l'échelle presque linéairement avec le parallélisme. Fondamentalement, il est important d'obtenir la bonne taille de lot pour que le traitement prenne entre 1 et 10 millisecondes. C'est bien au-dessus des coûts de transfert de threads et pas trop long pour provoquer des problèmes de granularité des tâches. J'ai publié des résultats de référence vers la fin de ce post .
Marko Topolnik
Votre solution se sépare d'un ArraySpliteratorqui a une taille estimée (même une taille exacte). Ainsi, l'implémentation de Stream verra la taille du tableau par rapport à Long.MAX_VALUE, considérez ceci comme déséquilibré et divisez le séparateur "plus grand" (en ignorant cela Long.MAX_VALUEsignifie "inconnu"), jusqu'à ce qu'il ne puisse pas se diviser davantage. Ensuite, s'il n'y a pas assez de morceaux, il divisera les séparateurs basés sur la matrice en utilisant leurs tailles connues. Oui, cela fonctionne très bien, mais ne contredit pas mon affirmation selon laquelle vous avez besoin d'une estimation de la taille, quelle que soit sa pauvreté.
Holger
OK, donc il semble y avoir un malentendu --- parce que vous n'avez pas besoin d'une estimation de taille sur l'entrée. Juste sur les divisions individuelles, et vous pouvez toujours l'avoir.
Marko Topolnik
Eh bien, mon premier commentaire était " Vous avez besoin d'une estimation de la taille. Elle peut être totalement fausse, tant qu'elle reflète à peu près le rapport de votre répartition déséquilibrée. " Le point clé ici était que le code de l'OP crée un autre séparateur contenant un seul élément mais signalant toujours une taille inconnue. C'est ce qui rend l'implémentation de Stream impuissante. Tout nombre estimé pour le nouveau séparateur étant significativement plus petit, le Long.MAX_VALUEferait.
Holger
0

Après beaucoup d'expérimentation, je n'ai toujours pas pu obtenir de parallélisme supplémentaire en jouant avec les estimations de taille. Fondamentalement, toute valeur autre que celle-ci Long.MAX_VALUEaura tendance à entraîner le séparateur à se terminer trop tôt (et sans aucun fractionnement), tandis que d'un autre côté, une Long.MAX_VALUEestimation sera trySplitappelée sans relâche jusqu'à ce qu'elle revienne null.

La solution que j'ai trouvée est de partager en interne les ressources entre les séparateurs et de les laisser se rééquilibrer entre eux.

Code de travail:

public class AwsS3LineSpliterator<LINE> extends AbstractSpliterator<AwsS3LineInput<LINE>> {

    public final static class AwsS3LineInput<LINE> {
        final public S3ObjectSummary s3ObjectSummary;
        final public LINE lineItem;
        public AwsS3LineInput(S3ObjectSummary s3ObjectSummary, LINE lineItem) {
            this.s3ObjectSummary = s3ObjectSummary;
            this.lineItem = lineItem;
        }
    }

    private final class InputStreamHandler {
        final S3ObjectSummary file;
        final InputStream inputStream;
        InputStreamHandler(S3ObjectSummary file, InputStream is) {
            this.file = file;
            this.inputStream = is;
        }
    }

    private final Iterator<S3ObjectSummary> incomingFiles;

    private final Function<S3ObjectSummary, InputStream> fileOpener;

    private final Function<InputStream, LINE> lineReader;

    private final Deque<S3ObjectSummary> unopenedFiles;

    private final Deque<InputStreamHandler> openedFiles;

    private final Deque<AwsS3LineInput<LINE>> sharedBuffer;

    private final int maxBuffer;

    private AwsS3LineSpliterator(Iterator<S3ObjectSummary> incomingFiles, Function<S3ObjectSummary, InputStream> fileOpener,
            Function<InputStream, LINE> lineReader,
            Deque<S3ObjectSummary> unopenedFiles, Deque<InputStreamHandler> openedFiles, Deque<AwsS3LineInput<LINE>> sharedBuffer,
            int maxBuffer) {
        super(Long.MAX_VALUE, 0);
        this.incomingFiles = incomingFiles;
        this.fileOpener = fileOpener;
        this.lineReader = lineReader;
        this.unopenedFiles = unopenedFiles;
        this.openedFiles = openedFiles;
        this.sharedBuffer = sharedBuffer;
        this.maxBuffer = maxBuffer;
    }

    public AwsS3LineSpliterator(Iterator<S3ObjectSummary> incomingFiles, Function<S3ObjectSummary, InputStream> fileOpener, Function<InputStream, LINE> lineReader, int maxBuffer) {
        this(incomingFiles, fileOpener, lineReader, new ConcurrentLinkedDeque<>(), new ConcurrentLinkedDeque<>(), new ArrayDeque<>(maxBuffer), maxBuffer);
    }

    @Override
    public boolean tryAdvance(Consumer<? super AwsS3LineInput<LINE>> action) {
        AwsS3LineInput<LINE> lineInput;
        synchronized(sharedBuffer) {
            lineInput=sharedBuffer.poll();
        }
        if(lineInput != null) {
            action.accept(lineInput);
            return true;
        }
        InputStreamHandler handle = openedFiles.poll();
        if(handle == null) {
            S3ObjectSummary unopenedFile = unopenedFiles.poll();
            if(unopenedFile == null) {
                return false;
            }
            handle = new InputStreamHandler(unopenedFile, fileOpener.apply(unopenedFile));
        }
        for(int i=0; i < maxBuffer; ++i) {
            LINE line = lineReader.apply(handle.inputStream);
            if(line != null) {
                synchronized(sharedBuffer) {
                    sharedBuffer.add(new AwsS3LineInput<LINE>(handle.file, line));
                }
            }
            else {
                return tryAdvance(action);
            }
        }
        openedFiles.addFirst(handle);
        return tryAdvance(action);
    }

    @Override
    public Spliterator<AwsS3LineInput<LINE>> trySplit() {
        synchronized(incomingFiles) {
            if (incomingFiles.hasNext()) {
                unopenedFiles.add(incomingFiles.next());
                return new AwsS3LineSpliterator<LINE>(incomingFiles, fileOpener, lineReader, unopenedFiles, openedFiles, sharedBuffer, maxBuffer);
            } else {
                return null;
            }
        }
    }
}
Alex R
la source