Je veux utiliser un Stream
pour paralléliser le traitement d'un ensemble hétérogène de fichiers JSON stockés à distance de nombre inconnu (le nombre de fichiers n'est pas connu à l'avance). La taille des fichiers peut varier considérablement, de 1 enregistrement JSON par fichier jusqu'à 100 000 enregistrements dans certains autres fichiers. Un enregistrement JSON dans ce cas signifie un objet JSON autonome représenté comme une ligne dans le fichier.
Je veux vraiment utiliser Streams pour cela et j'ai donc implémenté ceci Spliterator
:
public abstract class JsonStreamSpliterator<METADATA, RECORD> extends AbstractSpliterator<RECORD> {
abstract protected JsonStreamSupport<METADATA> openInputStream(String path);
abstract protected RECORD parse(METADATA metadata, Map<String, Object> json);
private static final int ADDITIONAL_CHARACTERISTICS = Spliterator.IMMUTABLE | Spliterator.DISTINCT | Spliterator.NONNULL;
private static final int MAX_BUFFER = 100;
private final Iterator<String> paths;
private JsonStreamSupport<METADATA> reader = null;
public JsonStreamSpliterator(Iterator<String> paths) {
this(Long.MAX_VALUE, ADDITIONAL_CHARACTERISTICS, paths);
}
private JsonStreamSpliterator(long est, int additionalCharacteristics, Iterator<String> paths) {
super(est, additionalCharacteristics);
this.paths = paths;
}
private JsonStreamSpliterator(long est, int additionalCharacteristics, Iterator<String> paths, String nextPath) {
this(est, additionalCharacteristics, paths);
open(nextPath);
}
@Override
public boolean tryAdvance(Consumer<? super RECORD> action) {
if(reader == null) {
String path = takeNextPath();
if(path != null) {
open(path);
}
else {
return false;
}
}
Map<String, Object> json = reader.readJsonLine();
if(json != null) {
RECORD item = parse(reader.getMetadata(), json);
action.accept(item);
return true;
}
else {
reader.close();
reader = null;
return tryAdvance(action);
}
}
private void open(String path) {
reader = openInputStream(path);
}
private String takeNextPath() {
synchronized(paths) {
if(paths.hasNext()) {
return paths.next();
}
}
return null;
}
@Override
public Spliterator<RECORD> trySplit() {
String nextPath = takeNextPath();
if(nextPath != null) {
return new JsonStreamSpliterator<METADATA,RECORD>(Long.MAX_VALUE, ADDITIONAL_CHARACTERISTICS, paths, nextPath) {
@Override
protected JsonStreamSupport<METADATA> openInputStream(String path) {
return JsonStreamSpliterator.this.openInputStream(path);
}
@Override
protected RECORD parse(METADATA metaData, Map<String,Object> json) {
return JsonStreamSpliterator.this.parse(metaData, json);
}
};
}
else {
List<RECORD> records = new ArrayList<RECORD>();
while(tryAdvance(records::add) && records.size() < MAX_BUFFER) {
// loop
}
if(records.size() != 0) {
return records.spliterator();
}
else {
return null;
}
}
}
}
Le problème que j'ai, c'est que même si le Stream se parallélise magnifiquement au début, le fichier le plus volumineux est finalement traité en un seul thread. Je crois que la cause proximale est bien documentée: le séparateur est "déséquilibré".
Plus concrètement, il semble que la trySplit
méthode ne soit pas appelée après un certain point du Stream.forEach
cycle de vie de l ', de sorte que la logique supplémentaire de distribution de petits lots à la fin de trySplit
est rarement exécutée.
Remarquez comment tous les séparateurs renvoyés par trySplit partagent le même paths
itérateur. Je pensais que c'était un moyen très intelligent d'équilibrer le travail entre tous les séparateurs, mais cela n'a pas été suffisant pour atteindre un parallélisme complet.
Je voudrais que le traitement parallèle se poursuive d'abord sur les fichiers, puis lorsque quelques fichiers volumineux sont encore divisés, je veux paralléliser sur des morceaux des fichiers restants. C'était l'intention du else
bloc à la fin de trySplit
.
Existe-t-il un moyen facile / simple / canonique de contourner ce problème?
la source
Long.MAX_VALUE
provoque un fractionnement excessif et inutile, tandis que toute estimation autre que celle-ciLong.MAX_VALUE
provoque l'arrêt du fractionnement, tuant le parallélisme. Le retour d'un mélange d'estimations précises ne semble pas conduire à des optimisations intelligentes.AbstractSpliterator
mais remplaceztrySplit()
ce qui est un mauvais combo pour autre chose queLong.MAX_VALUE
, car vous n'adaptez pas l'estimation de taille danstrySplit()
. EnsuitetrySplit()
, l'estimation de la taille doit être réduite du nombre d'éléments qui ont été séparés.Réponses:
Votre
trySplit
devrait produire des divisions de taille égale, quelle que soit la taille des fichiers sous-jacents. Vous devez traiter tous les fichiers comme une seule unité et remplir àArrayList
chaque fois le séparateur avec le même nombre d'objets JSON. Le nombre d'objets doit être tel que le traitement d'un fractionnement prenne entre 1 et 10 millisecondes: inférieur à 1 ms et vous commencez à approcher les coûts de transfert du lot vers un thread de travail, plus élevés que cela et vous commencez à risquer une charge CPU inégale due à des tâches trop grossières.Le séparateur n'est pas obligé de rapporter une estimation de taille, et vous le faites déjà correctement: votre estimation est
Long.MAX_VALUE
, ce qui est une valeur spéciale signifiant "illimitée". Cependant, si vous avez de nombreux fichiers avec un seul objet JSON, ce qui entraîne des lots de taille 1, cela nuira à vos performances de deux manières: la surcharge d'ouverture-lecture-fermeture du fichier peut devenir un goulot d'étranglement et, si vous parvenez à vous échapper cela, le coût du transfert de threads peut être significatif par rapport au coût de traitement d'un article, provoquant à nouveau un goulot d'étranglement.Il y a cinq ans, je résolvais un problème similaire, vous pouvez jeter un œil à ma solution .
la source
Long.MAX_VALUE
décrivez correctement une taille inconnue, mais cela n'aide pas lorsque l'implémentation de Stream réelle fonctionne mal alors. Même en utilisant le résultat de laThreadLocalRandom.current().nextInt(100, 100_000)
taille estimée, on obtient de meilleurs résultats.ArraySpliterator
qui a une taille estimée (même une taille exacte). Ainsi, l'implémentation de Stream verra la taille du tableau par rapport àLong.MAX_VALUE
, considérez ceci comme déséquilibré et divisez le séparateur "plus grand" (en ignorant celaLong.MAX_VALUE
signifie "inconnu"), jusqu'à ce qu'il ne puisse pas se diviser davantage. Ensuite, s'il n'y a pas assez de morceaux, il divisera les séparateurs basés sur la matrice en utilisant leurs tailles connues. Oui, cela fonctionne très bien, mais ne contredit pas mon affirmation selon laquelle vous avez besoin d'une estimation de la taille, quelle que soit sa pauvreté.Long.MAX_VALUE
ferait.Après beaucoup d'expérimentation, je n'ai toujours pas pu obtenir de parallélisme supplémentaire en jouant avec les estimations de taille. Fondamentalement, toute valeur autre que celle-ci
Long.MAX_VALUE
aura tendance à entraîner le séparateur à se terminer trop tôt (et sans aucun fractionnement), tandis que d'un autre côté, uneLong.MAX_VALUE
estimation seratrySplit
appelée sans relâche jusqu'à ce qu'elle reviennenull
.La solution que j'ai trouvée est de partager en interne les ressources entre les séparateurs et de les laisser se rééquilibrer entre eux.
Code de travail:
la source