Comment les étapes sont-elles divisées en tâches dans Spark?

143

Supposons pour ce qui suit qu'un seul travail Spark est en cours d'exécution à chaque instant.

Ce que j'obtiens si loin

Voici ce que je comprends de ce qui se passe dans Spark:

  1. Lors de la création d'un SparkContext, chaque nœud de travail démarre un exécuteur. Les exécuteurs sont des processus séparés (JVM), qui se reconnectent au programme pilote. Chaque exécuteur a le pot du programme pilote. Quitter un pilote, arrête les exécuteurs. Chaque exécuteur peut contenir des partitions.
  2. Lorsqu'un travail est exécuté, un plan d'exécution est créé selon le graphe de lignage.
  3. Le travail d'exécution est divisé en étapes, où des étapes contenant autant de transformations et d'actions voisines (dans le graphe de lignage), mais pas de mélange. Ainsi, les étapes sont séparées par des mélanges.

image 1

je comprends que

  • Une tâche est une commande envoyée du pilote à un exécuteur en sérialisant l'objet Function.
  • L'exécuteur désérialise (avec le pilote jar) la commande (tâche) et l'exécute sur une partition.

mais

Des questions)

Comment diviser la scène en ces tâches?

Plus précisément:

  1. Les tâches sont-elles déterminées par les transformations et les actions ou peuvent-elles être plusieurs transformations / actions dans une tâche?
  2. Les tâches sont-elles déterminées par la partition (par exemple, une tâche par étape par partition).
  3. Les tâches sont-elles déterminées par les nœuds (par exemple, une tâche par étape par nœud)?

Ce que je pense (réponse partielle seulement, même si c'est juste)

Dans https://0x0fff.com/spark-architecture-shuffle , le shuffle est expliqué avec l'image

entrez la description de l'image ici

et j'ai l'impression que la règle est

chaque étape est divisée en tâches # number-of-partitions, sans tenir compte du nombre de nœuds

Pour ma première image, je dirais que j'aurais 3 tâches de carte et 3 tâches de réduction.

Pour l'image de 0x0fff, je dirais qu'il y a 8 tâches de carte et 3 tâches de réduction (en supposant qu'il n'y a que trois fichiers orange et trois fichiers vert foncé).

Questions ouvertes dans tous les cas

Est-ce exact? Mais même si cela est correct, mes questions ci-dessus ne sont pas toutes répondues, car il est toujours ouvert, si plusieurs opérations (par exemple plusieurs cartes) sont dans une tâche ou sont séparées en une tâche par opération.

Ce que disent les autres

Qu'est-ce qu'une tâche dans Spark? Comment le worker Spark exécute-t-il le fichier jar? et Comment le planificateur Apache Spark divise-t-il les fichiers en tâches? sont similaires, mais je n'ai pas l'impression que ma question y trouve une réponse claire.

Marque42
la source

Réponses:

52

Vous avez un joli aperçu ici. Pour répondre à tes questions

  • Un séparé task ne doivent être lancées pour chaque partition de données pour chaque stage. Considérez que chaque partition résidera probablement sur des emplacements physiques distincts - par exemple des blocs dans HDFS ou des répertoires / volumes pour un système de fichiers local.

Notez que la soumission de Stages est pilotée par le DAG Scheduler. Cela signifie que les étapes qui ne sont pas interdépendantes peuvent être soumises au cluster pour une exécution en parallèle: cela maximise la capacité de parallélisation sur le cluster. Donc, si les opérations dans notre flux de données peuvent se produire simultanément, nous nous attendons à voir plusieurs étapes lancées.

Nous pouvons le voir en action dans l'exemple de jouet suivant dans lequel nous effectuons les types d'opérations suivants:

  • charger deux sources de données
  • effectuer des opérations cartographiques sur les deux sources de données séparément
  • rejoins-les
  • effectuer des opérations de cartographie et de filtrage sur le résultat
  • enregistrer le résultat

Alors, avec combien d'étapes allons-nous finir?

  • 1 étape chacun pour charger les deux sources de données en parallèle = 2 étapes
  • Un troisième étage représentant le joinqui dépend des deux autres étages
  • Remarque: toutes les opérations de suivi travaillant sur les données jointes peuvent être effectuées dans la même étape car elles doivent se produire de manière séquentielle. Il n'y a aucun avantage à lancer des étapes supplémentaires car elles ne peuvent pas commencer à travailler tant que l'opération précédente n'est pas terminée.

Voici ce programme de jouets

val sfi  = sc.textFile("/data/blah/input").map{ x => val xi = x.toInt; (xi,xi*xi) }
val sp = sc.parallelize{ (0 until 1000).map{ x => (x,x * x+1) }}
val spj = sfi.join(sp)
val sm = spj.mapPartitions{ iter => iter.map{ case (k,(v1,v2)) => (k, v1+v2) }}
val sf = sm.filter{ case (k,v) => v % 10 == 0 }
sf.saveAsTextFile("/data/blah/out")

Et voici le DAG du résultat

entrez la description de l'image ici

Maintenant: combien de tâches ? Le nombre de tâches doit être égal à

Somme de ( Stage* #Partitions in the stage)

Javadba
la source
2
Merci! Veuillez préciser votre réponse concernant mon texte: 1) Ma définition des étapes n'est-elle pas exhaustive? Il semble que j'ai manqué l'exigence selon laquelle une scène ne peut pas contenir des opérations qui pourraient être en parallèle. Ou est-ce que ma description implique déjà strictement cela? 2) Le nombre de tâches à exécuter pour le travail est déterminé par le nombre de partitions, mais pas par le nombre de processeurs ou de nœuds, tandis que le nombre de tâches pouvant être exécutées en même temps dépend du nombre de processeurs, non? 3) Une tâche peut contenir plusieurs opérations?
Make42
1
4) Que vouliez-vous dire par votre dernière phrase? Après tout, le nombre de partitions peut varier d'une étape à l'autre. Voulez-vous dire que c'est ainsi que vous avez configuré votre travail pour toutes les étapes?
Make42
@ Make42 Bien sûr, le nombre de partitions peut varier d'une étape à l'autre - vous avez raison. C'était mon intention en disant sum(..)de prendre en compte cette variation.
javadba
wow, votre réponse était tout à fait correcte mais malheureusement, la dernière phrase est définitivement un faux concept. Cela ne signifie pas que le nombre de partitions dans une étape est égal au nombre de processeurs, cependant, vous pouvez définir le nombre de partitions pour un RDD en fonction du nombre de cœurs présentés sur votre machine.
epcpu
@epcpu C'était un cas spécial - mais je suis d'accord que ce serait trompeur, alors je le supprime.
javadba
26

Cela pourrait vous aider à mieux comprendre différentes pièces:

  • Stage: est un ensemble de tâches. Même processus exécuté sur différents sous-ensembles de données (partitions).
  • Tâche: représente une unité de travail sur une partition d'un ensemble de données distribué. Donc, à chaque étape, nombre de tâches = nombre de partitions, ou comme vous l'avez dit "une tâche par étape par partition".
  • Chaque exécuteur fonctionne sur un conteneur de fil, et chaque conteneur réside sur un nœud.
  • Chaque étape utilise plusieurs exécuteurs, chaque exécuteur se voit attribuer plusieurs vcores.
  • Chaque vcore peut exécuter exactement une tâche à la fois
  • Ainsi, à tout moment, plusieurs tâches pourraient être exécutées en parallèle. nombre de tâches en cours d'exécution = nombre de vcores utilisés.
Pedram Bashiri
la source
2
Ceci est une lecture vraiment utile sur l'architecture Spark
pedram bashiri
Je n'ai pas obtenu votre point numéro 3. Autant que je sache, chaque nœud peut avoir plusieurs exécuteurs, donc selon le point 3: Il ne devrait y avoir qu'un seul exécuteur par nœud. Pouvez-vous clarifier ce point?
Rituparno Behera le
@RituparnoBehera chaque nœud peut avoir plusieurs conteneurs et donc plusieurs exécuteurs Spark. Consultez ce lien. docs.cloudera.com/runtime/7.0.2/running-spark-applications/…
pedram bashiri
15

Si je comprends bien, il y a 2 choses (liées) qui vous déroutent:

1) Qu'est-ce qui détermine le contenu d'une tâche?

2) Qu'est-ce qui détermine le nombre de tâches à exécuter?

Le moteur de Spark "colle" des opérations simples sur des rdds consécutifs, par exemple:

rdd1 = sc.textFile( ... )
rdd2 = rdd1.filter( ... )
rdd3 = rdd2.map( ... )
rdd3RowCount = rdd3.count

Ainsi, lorsque rdd3 est (paresseusement) calculé, spark générera une tâche par partition de rdd1 et chaque tâche exécutera à la fois le filtre et la carte par ligne pour aboutir à rdd3.

Le nombre de tâches est déterminé par le nombre de partitions. Chaque RDD a un nombre défini de partitions. Pour un RDD source lu à partir de HDFS (en utilisant sc.textFile (...) par exemple), le nombre de partitions est le nombre de divisions générées par le format d'entrée. Certaines opérations sur RDD (s) peuvent aboutir à un RDD avec un nombre différent de partitions:

rdd2 = rdd1.repartition( 1000 ) will result in rdd2 having 1000 partitions ( regardless of how many partitions rdd1 had ).

Un autre exemple est les jointures:

rdd3 = rdd1.join( rdd2  , numPartitions = 1000 ) will result in rdd3 having 1000 partitions ( regardless of partitions number of rdd1 and rdd2 ).

(La plupart) des opérations qui changent le nombre de partitions impliquent un shuffle, quand on fait par exemple:

rdd2 = rdd1.repartition( 1000 ) 

ce qui se passe réellement, c'est que la tâche sur chaque partition de rdd1 doit produire une sortie de fin qui peut être lue par l'étape suivante afin que rdd2 ait exactement 1000 partitions (comment faire? Hash ou tri ). Les tâches de ce côté sont parfois appelées «tâches de mappage (côté)». Une tâche qui s'exécutera plus tard sur rdd2 agira sur une partition (de rdd2!) Et devra trouver comment lire / combiner les sorties côté carte pertinentes pour cette partition. Les tâches de ce côté sont parfois appelées "Réduire les tâches (secondaires)".

Les 2 questions sont liées: le nombre de tâches dans une étape est le nombre de partitions (commun aux rdds consécutifs "collés" ensemble) et le nombre de partitions d'un rdd peut changer entre les étapes (en spécifiant le nombre de partitions à certains shuffle provoquant l'opération par exemple).

Une fois que l'exécution d'une étape a commencé, ses tâches peuvent occuper des emplacements de tâches. Le nombre d'emplacements de tâches simultanés est de numExecutors * ExecutorCores. En général, ceux-ci peuvent être occupés par des tâches d'étapes différentes et non dépendantes.

Harel Gliksman
la source