Utilisation de spark 2.4.4 fonctionnant en mode cluster YARN avec le planificateur spark FIFO.
Je soumets plusieurs opérations de trame de données spark (c'est-à-dire l'écriture de données dans S3) à l'aide d'un exécuteur de pool de threads avec un nombre variable de threads. Cela fonctionne très bien si j'ai ~ 10 threads, mais si j'utilise des centaines de threads, il semble y avoir un blocage, aucun travail n'étant planifié selon l'interface utilisateur Spark.
Quels facteurs contrôlent le nombre de travaux pouvant être planifiés simultanément? Ressources du pilote (par exemple mémoire / cœurs)? D'autres paramètres de configuration d'allumage?
ÉDITER:
Voici un bref synopsis de mon code
ExecutorService pool = Executors.newFixedThreadPool(nThreads);
ExecutorCompletionService<Void> ecs = new ExecutorCompletionService<>(pool);
Dataset<Row> aHugeDf = spark.read.json(hundredsOfPaths);
List<Future<Void>> futures = listOfSeveralHundredThings
.stream()
.map(aThing -> ecs.submit(() -> {
df
.filter(col("some_column").equalTo(aThing))
.write()
.format("org.apache.hudi")
.options(writeOptions)
.save(outputPathFor(aThing));
return null;
}))
.collect(Collectors.toList());
IntStream.range(0, futures.size()).forEach(i -> ecs.poll(30, TimeUnit.MINUTES));
exec.shutdownNow();
À un moment donné, au fur et à nThreads
mesure que l' augmentation augmente, spark ne semble plus planifier de travaux, comme en témoignent:
ecs.poll(...)
expirer finalement- L'onglet Spark UI jobs ne montrant aucun travail actif
- L'onglet Spark UI executors ne montrant aucune tâche active pour aucun exécuteur
- L'onglet SQL Spark UI affichant
nThreads
les requêtes en cours d'exécution sans ID de travail en cours d'exécution
Mon environnement d'exécution est
- AWS EMR 5.28.1
- Spark 2.4.4
- Noeud maître =
m5.4xlarge
- Nœuds principaux = 3x
rd5.24xlarge
spark.driver.cores=24
spark.driver.memory=32g
spark.executor.memory=21g
spark.scheduler.mode=FIFO
la source
jstack -l
pour obtenir un vidage de thread avec des informations de verrouillage.Réponses:
Si possible, écrivez la sortie des travaux dans AWS Elastic MapReduce hdfs (pour tirer parti des renommages presque instantanés et des meilleures entrées-sorties de fichiers des hdfs locaux) et ajoutez une étape dstcp pour déplacer les fichiers vers S3, pour vous éviter tous les problèmes de gestion des entrailles d'un magasin d'objets essayant d'être un système de fichiers. L'écriture sur des fichiers hdfs locaux vous permettra également d'activer la spéculation pour contrôler les tâches incontrôlables sans tomber dans les interruptions de blocage associées à DirectOutputCommiter.
Si vous devez utiliser S3 comme répertoire de sortie, assurez-vous que les configurations Spark suivantes sont définies
Remarque: DirectParquetOutputCommitter est supprimé de Spark 2.0 en raison du risque de perte de données. Malheureusement, jusqu'à ce que nous ayons amélioré la cohérence de S3a, nous devons travailler avec les solutions de contournement. Les choses s'améliorent avec Hadoop 2.8
Évitez les noms clés dans l'ordre lexicographique. On pourrait utiliser des préfixes de hachage / aléatoire ou inverser la date et l'heure pour se déplacer.L'astuce consiste à nommer vos clés de manière hiérarchique, en plaçant les éléments les plus courants que vous filtrez sur le côté gauche de votre clé. Et jamais de soulignements dans les noms de compartiment en raison de problèmes DNS.
Activation
fs.s3a.fast.upload upload
en parallèle de parties d'un même fichier sur Amazon S3Reportez-vous à ces articles pour plus de détails-
Définition de spark.speculation dans Spark 2.1.0 lors de l'écriture sur s3
https://medium.com/@subhojit20_27731/apache-spark-and-amazon-s3-gotchas-and-best-practices-a767242f3d98
la source
OMI, vous vous approchez probablement mal de ce problème. Sauf si vous pouvez garantir que le nombre de tâches par tâche est très faible, vous n'obtiendrez probablement pas beaucoup d'amélioration des performances en parallélisant des centaines de tâches à la fois. Votre cluster ne peut prendre en charge que 300 tâches à la fois, en supposant que vous utilisez le parallélisme par défaut de 200, soit seulement 1,5 travail. Je suggère de réécrire votre code pour limiter le nombre maximal de requêtes simultanées à 10. Je soupçonne fortement que vous avez 300 requêtes avec une seule tâche de plusieurs centaines en cours d'exécution. La plupart des systèmes de traitement de données OLTP ont intentionnellement un niveau assez faible de requêtes simultanées par rapport aux systèmes RDS plus traditionnels pour cette raison.
aussi
la source