J'essaie de comprendre la relation entre le nombre de cœurs et le nombre d'exécuteurs lors de l'exécution d'un travail Spark sur YARN.
L'environnement de test est le suivant:
- Nombre de nœuds de données: 3
- Spécifications de la machine du nœud de données:
- CPU: Core i7-4790 (nombre de cœurs: 4, nombre de threads: 8)
- Mémoire vive: 32 Go (8 Go x 4)
- Disque dur: 8 To (2 To x 4)
Réseau: 1 Go
Version Spark: 1.0.0
Version Hadoop: 2.4.0 (Hortonworks HDP 2.1)
Flux de travail Spark: sc.textFile -> filtre -> carte -> filtre -> mapToPair -> reductionByKey -> carte -> saveAsTextFile
Des données d'entrée
- Type: fichier texte unique
- Taille: 165 Go
- Nombre de lignes: 454.568.833
Production
- Nombre de lignes après le deuxième filtre: 310640717
- Nombre de lignes du fichier résultat: 99848268
- Taille du fichier résultat: 41 Go
Le travail a été exécuté avec les configurations suivantes:
--master yarn-client --executor-memory 19G --executor-cores 7 --num-executors 3
(exécuteurs par nœud de données, utilisez autant que les cœurs)--master yarn-client --executor-memory 19G --executor-cores 4 --num-executors 3
(nombre de cœurs réduit)--master yarn-client --executor-memory 4G --executor-cores 2 --num-executors 12
(moins de noyau, plus d'exécuteur)
Temps écoulés:
50 min 15 s
55 min 48 s
31 min 23 s
À ma grande surprise, (3) était beaucoup plus rapide.
Je pensais que (1) serait plus rapide, car il y aurait moins de communication entre exécuteurs lors du brassage.
Bien que le nombre de cœurs de (1) soit inférieur à (3), le nombre de cœurs n'est pas le facteur clé car 2) ont bien fonctionné.
(Les éléments suivants ont été ajoutés après la réponse de pwilmot.)
Pour plus d'informations, la capture d'écran du moniteur de performances est la suivante:
- Résumé du nœud de données Ganglia pour (1) - travail commencé à 04:37.
- Résumé du nœud de données Ganglia pour (3) - travail commencé à 19:47. Veuillez ignorer le graphique avant cette heure.
Le graphique se divise approximativement en 2 sections:
- Premièrement: du début à la réductionByKey: intensif en CPU, pas d'activité réseau
- Deuxièmement: après reductionByKey: le processeur diminue, les E / S réseau sont effectuées.
Comme le montre le graphique, (1) peut utiliser autant de puissance CPU qu'il a été donné. Donc, ce n'est peut-être pas le problème du nombre de threads.
Comment expliquer ce résultat?
la source
Réponses:
L'explication a été donnée dans un article du blog de Cloudera, How-to: Tune Your Apache Spark Jobs (Part 2) .
la source
yarn.scheduler.capacity.resource-calculator
désactivé, qui est la valeur par défaut. En effet, par défaut, il planifie par mémoire et non par CPU.Lorsque vous exécutez votre application Spark sur HDFS, selon Sandy Ryza
Je pense donc que votre première configuration est plus lente que la troisième est due à un mauvais débit d'E / S HDFS
la source
Je n'ai pas joué avec ces paramètres moi-même, donc ce n'est que de la spéculation, mais si nous considérons ce problème comme des cœurs et des threads normaux dans un système distribué, vous pouvez utiliser jusqu'à 12 cœurs (machines 4 * 3) et 24 threads dans votre cluster. (8 * 3 machines). Dans vos deux premiers exemples, vous donnez à votre travail un bon nombre de cœurs (espace de calcul potentiel) mais le nombre de threads (travaux) à exécuter sur ces cœurs est si limité que vous ne pouvez pas utiliser une grande partie de la puissance de traitement allouée. et donc le travail est plus lent même s'il y a plus de ressources de calcul allouées.
vous mentionnez que votre préoccupation concernait l'étape de lecture aléatoire - s'il est bien de limiter la surcharge lors de l'étape de lecture aléatoire, il est généralement beaucoup plus important d'utiliser la parallélisation du cluster. Pensez au cas extrême - un programme à thread unique avec zéro shuffle.
la source
Je pense que la réponse ici peut être un peu plus simple que certaines des recommandations ici.
L'indice pour moi est dans le graphe du réseau de cluster. Pour l'exécution 1, l'utilisation est stable à ~ 50 M octets / s. Pour l'exécution 3, l'utilisation régulière est doublée, environ 100 M octets / s.
À partir du billet de blog cloudera partagé par DzOrd , vous pouvez voir cette citation importante:
Alors, faisons quelques calculs pour voir à quelles performances nous nous attendons si c'est vrai.
Exécuter 1:19 Go, 7 cœurs, 3 exécuteurs
Exécuter 3: 4 Go, 2 cœurs, 12 exécuteurs
Si le travail est limité à 100% par la concurrence (le nombre de threads). Nous nous attendrions à ce que l'exécution soit parfaitement corrélée inversement avec le nombre de threads.
Donc
ratio_num_threads ~= inv_ratio_runtime
, et il semble que nous sommes limités par le réseau.Ce même effet explique la différence entre Run 1 et Run 2.
Exécuter 2:19 Go, 4 cœurs, 3 exécuteurs
Comparaison du nombre de threads effectifs et du runtime:
Ce n'est pas aussi parfait que la dernière comparaison, mais nous constatons toujours une baisse similaire des performances lorsque nous perdons des threads.
Maintenant, pour le dernier bit: pourquoi est-il le cas que nous obtenons de meilleures performances avec plus de threads, esp. plus de threads que le nombre de processeurs?
Une bonne explication de la différence entre le parallélisme (ce que nous obtenons en divisant les données sur plusieurs processeurs) et la concurrence (ce que nous obtenons lorsque nous utilisons plusieurs threads pour travailler sur un seul processeur) est fournie dans cet excellent article de Rob Pike: Concurrency n'est pas le parallélisme .
La brève explication est que si un travail Spark interagit avec un système de fichiers ou un réseau, le CPU passe beaucoup de temps à attendre la communication avec ces interfaces et à ne pas passer beaucoup de temps à "faire le travail". En donnant à ces processeurs plus d'une tâche sur laquelle travailler à la fois, ils passent moins de temps à attendre et plus de temps à travailler, et vous obtenez de meilleures performances.
la source
À partir des excellentes ressources disponibles sur la page du package Sparklyr de RStudio :
la source
L'allocation dynamique Spark offre de la flexibilité et alloue les ressources de manière dynamique. Dans ce nombre d'exécuteurs min et max peuvent être donnés. Le nombre d'exécuteurs devant être lancés au démarrage de l'application peut également être indiqué.
Lisez ci-dessous sur le même:
la source
Il y a un petit problème dans les deux premières configurations, je pense. Les concepts de threads et de cœurs comme suit. Le concept de threading est que si les cœurs sont idéaux, utilisez ce noyau pour traiter les données. La mémoire n'est donc pas pleinement utilisée dans les deux premiers cas. Si vous souhaitez comparer cet exemple, choisissez les machines qui ont plus de 10 cœurs sur chaque machine. Ensuite, faites le point de repère.
Mais ne donnez pas plus de 5 cœurs par exécuteur, il y aura un goulot d'étranglement sur les performances d'E / S.
Ainsi, les meilleures machines pour effectuer ce benchmarking pourraient être des nœuds de données qui ont 10 cœurs.
Spécifications de la machine du nœud de données: CPU: Core i7-4790 (nombre de cœurs: 10, nombre de threads: 20) RAM: 32 Go (8 Go x 4) Disque dur: 8 To (2 To x 4)
la source
Je pense que l'une des principales raisons est la localité. La taille de votre fichier d'entrée est de 165G, les blocs associés au fichier sont certainement répartis sur plusieurs DataNodes, plus d'exécuteurs peuvent éviter la copie réseau.
Essayez de définir le nombre de blocs égaux à l'exécuteur, je pense que cela peut être plus rapide.
la source