Blocage lorsque de nombreux travaux d'allumage sont planifiés simultanément

17

Utilisation de spark 2.4.4 fonctionnant en mode cluster YARN avec le planificateur spark FIFO.

Je soumets plusieurs opérations de trame de données spark (c'est-à-dire l'écriture de données dans S3) à l'aide d'un exécuteur de pool de threads avec un nombre variable de threads. Cela fonctionne très bien si j'ai ~ 10 threads, mais si j'utilise des centaines de threads, il semble y avoir un blocage, aucun travail n'étant planifié selon l'interface utilisateur Spark.

Quels facteurs contrôlent le nombre de travaux pouvant être planifiés simultanément? Ressources du pilote (par exemple mémoire / cœurs)? D'autres paramètres de configuration d'allumage?

ÉDITER:

Voici un bref synopsis de mon code

ExecutorService pool = Executors.newFixedThreadPool(nThreads);
ExecutorCompletionService<Void> ecs = new ExecutorCompletionService<>(pool);

Dataset<Row> aHugeDf = spark.read.json(hundredsOfPaths);

List<Future<Void>> futures = listOfSeveralHundredThings
  .stream()
  .map(aThing -> ecs.submit(() -> {
    df
      .filter(col("some_column").equalTo(aThing))
      .write()
      .format("org.apache.hudi")
      .options(writeOptions)
      .save(outputPathFor(aThing));
    return null;
  }))
  .collect(Collectors.toList());

IntStream.range(0, futures.size()).forEach(i -> ecs.poll(30, TimeUnit.MINUTES));
exec.shutdownNow();

À un moment donné, au fur et à nThreadsmesure que l' augmentation augmente, spark ne semble plus planifier de travaux, comme en témoignent:

  • ecs.poll(...) expirer finalement
  • L'onglet Spark UI jobs ne montrant aucun travail actif
  • L'onglet Spark UI executors ne montrant aucune tâche active pour aucun exécuteur
  • L'onglet SQL Spark UI affichant nThreadsles requêtes en cours d'exécution sans ID de travail en cours d'exécution

Mon environnement d'exécution est

  • AWS EMR 5.28.1
  • Spark 2.4.4
  • Noeud maître = m5.4xlarge
  • Nœuds principaux = 3x rd5.24xlarge
  • spark.driver.cores=24
  • spark.driver.memory=32g
  • spark.executor.memory=21g
  • spark.scheduler.mode=FIFO
Scott
la source
Y a-t-il une section spécifique qui en discute? J'ai relu ces documents plusieurs fois au cours des derniers jours et je n'ai pas trouvé la réponse que je cherchais.
Scott
2
Pouvez-vous s'il vous plaît montrer le code que vous utilisez pour soumettre des travaux Spark via un exécuteur de pool de threads? Il semble que l'impasse se produise avant la soumission du travail Spark.
Salim
1
Pouvez-vous envoyer votre code? Veuillez fournir des détails sur votre env: CPU, RAM; aussi comment créez-vous les fils: simultanément ou en petits groupes de 10?
Saheed
Désolé, comment voulez-vous dire que les travaux ne sont pas planifiés? Ils n'apparaissent pas sur l'interface utilisateur Spark, ou ils apparaissent dans la liste des travaux, mais les tâches ne sont pas exécutées? Dans les deux cas, si vous suspectez un blocage, veuillez exécuter jstack -lpour obtenir un vidage de thread avec des informations de verrouillage.
Daniel Darabos

Réponses:

0

Si possible, écrivez la sortie des travaux dans AWS Elastic MapReduce hdfs (pour tirer parti des renommages presque instantanés et des meilleures entrées-sorties de fichiers des hdfs locaux) et ajoutez une étape dstcp pour déplacer les fichiers vers S3, pour vous éviter tous les problèmes de gestion des entrailles d'un magasin d'objets essayant d'être un système de fichiers. L'écriture sur des fichiers hdfs locaux vous permettra également d'activer la spéculation pour contrôler les tâches incontrôlables sans tomber dans les interruptions de blocage associées à DirectOutputCommiter.

Si vous devez utiliser S3 comme répertoire de sortie, assurez-vous que les configurations Spark suivantes sont définies

spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2
spark.speculation false

Remarque: DirectParquetOutputCommitter est supprimé de Spark 2.0 en raison du risque de perte de données. Malheureusement, jusqu'à ce que nous ayons amélioré la cohérence de S3a, nous devons travailler avec les solutions de contournement. Les choses s'améliorent avec Hadoop 2.8

Évitez les noms clés dans l'ordre lexicographique. On pourrait utiliser des préfixes de hachage / aléatoire ou inverser la date et l'heure pour se déplacer.L'astuce consiste à nommer vos clés de manière hiérarchique, en plaçant les éléments les plus courants que vous filtrez sur le côté gauche de votre clé. Et jamais de soulignements dans les noms de compartiment en raison de problèmes DNS.

Activation fs.s3a.fast.upload uploaden parallèle de parties d'un même fichier sur Amazon S3

Reportez-vous à ces articles pour plus de détails-

Définition de spark.speculation dans Spark 2.1.0 lors de l'écriture sur s3

https://medium.com/@subhojit20_27731/apache-spark-and-amazon-s3-gotchas-and-best-practices-a767242f3d98

Devesh mehta
la source
AWS a son propre committer docs.aws.amazon.com/emr/latest/ReleaseGuide/…
mazaneicha
0

OMI, vous vous approchez probablement mal de ce problème. Sauf si vous pouvez garantir que le nombre de tâches par tâche est très faible, vous n'obtiendrez probablement pas beaucoup d'amélioration des performances en parallélisant des centaines de tâches à la fois. Votre cluster ne peut prendre en charge que 300 tâches à la fois, en supposant que vous utilisez le parallélisme par défaut de 200, soit seulement 1,5 travail. Je suggère de réécrire votre code pour limiter le nombre maximal de requêtes simultanées à 10. Je soupçonne fortement que vous avez 300 requêtes avec une seule tâche de plusieurs centaines en cours d'exécution. La plupart des systèmes de traitement de données OLTP ont intentionnellement un niveau assez faible de requêtes simultanées par rapport aux systèmes RDS plus traditionnels pour cette raison.

aussi

  1. Apache Hudi a un parallélisme par défaut de plusieurs centaines de FYI.
  2. Pourquoi ne partitionnez-vous pas simplement en fonction de votre colonne de filtre?
Andrew Long
la source