Apache Spark: map vs mapPartitions?

133

Quelle est la différence entre un RDD map et une mapPartitionsméthode? Et flatMapse comporte comme mapou comme mapPartitions? Merci.

(modifier) ​​c'est-à-dire quelle est la différence (soit sémantiquement, soit en termes d'exécution) entre

  def map[A, B](rdd: RDD[A], fn: (A => B))
               (implicit a: Manifest[A], b: Manifest[B]): RDD[B] = {
    rdd.mapPartitions({ iter: Iterator[A] => for (i <- iter) yield fn(i) },
      preservesPartitioning = true)
  }

Et:

  def map[A, B](rdd: RDD[A], fn: (A => B))
               (implicit a: Manifest[A], b: Manifest[B]): RDD[B] = {
    rdd.map(fn)
  }
Nicolas White
la source
3
Après avoir lu la réponse ci-dessous, vous pouvez jeter un œil à [cette expérience] partagée par quelqu'un qui l'a réellement utilisée. ( Bzhangusc.wordpress.com/2014/06/19/… ) bzhangusc.wordpress.com/2014/06/19 /…
Abhidemon

Réponses:

121

Quelle est la différence entre la mappe d'un RDD et la méthode mapPartitions?

La méthode map convertit chaque élément du RDD source en un seul élément du RDD résultat en appliquant une fonction. mapPartitions convertit chaque partition du RDD source en plusieurs éléments du résultat (éventuellement aucun).

Et flatMap se comporte-t-il comme map ou comme mapPartitions?

Ni l'un ni l'autre, flatMap ne fonctionne sur un seul élément (as map) et produit plusieurs éléments du résultat (as mapPartitions).

Alexey Romanov
la source
3
Merci - la carte provoque-t-elle des mélanges (ou change-t-elle le nombre de partitions)? Est-ce qu'il déplace les données entre les nœuds? J'ai utilisé mapPartitions pour éviter de déplacer des données entre les nœuds, mais je ne savais pas si flapMap le ferait.
Nicholas White
Si vous regardez la source - github.com/apache/incubator-spark/blob/… et github.com/apache/incubator-spark/blob/… - les deux mapet flatMapont exactement les mêmes partitions que le parent.
Alexey Romanov
13
À titre de note, une présentation fournie par un conférencier au Sommet Spark de San Francisco 2013 (goo.gl/JZXDCR) souligne que les tâches avec une surcharge par enregistrement élevée fonctionnent mieux avec une mapPartition qu'avec une transformation de carte. Ceci est, selon la présentation, dû au coût élevé de mise en place d'une nouvelle tâche.
Mikel Urkia
1
Je vois le contraire - même avec de très petites opérations, il est plus rapide d'appeler mapPartitions et itérer que d'appeler map. Je suppose que ce n'est que la surcharge du démarrage du moteur de langage qui traitera la tâche de carte. (Je suis dans R, qui peut avoir plus de surcharge de démarrage.) Si vous effectuez plusieurs opérations, alors mapPartitions semble être un peu plus rapide - je suppose que c'est parce qu'il ne lit le RDD qu'une seule fois. Même si le RDD est mis en cache dans la RAM, cela économise beaucoup de temps système de la conversion de type.
Bob
3
mapprend essentiellement votre fonction f, et la transmet dans iter.map(f). Donc, fondamentalement, c'est une méthode pratique qui s'enroule mapPartitions. Je serais surpris s'il y avait un avantage de performance de toute façon pour un travail de transformation de style de carte pure (c'est-à-dire où la fonction est identique), si vous avez besoin de créer des objets pour le traitement, si ces objets peuvent être partagés, ce mapPartitionsserait avantageux.
NightWolf
129

Lutin. POINTE :

Chaque fois que vous avez une initialisation lourde qui devrait être effectuée une fois pour de nombreux RDDéléments plutôt qu'une fois par RDDélément, et si cette initialisation, telle que la création d'objets à partir d'une bibliothèque tierce, ne peut pas être sérialisée (afin que Spark puisse la transmettre à travers le cluster à les nœuds worker), utilisez à la mapPartitions()place de map(). mapPartitions()prévoit que l'initialisation doit être effectuée une fois par tâche de travail / thread / partition au lieu d'une fois par RDDélément de données par exemple: voir ci-dessous.

val newRd = myRdd.mapPartitions(partition => {
  val connection = new DbConnection /*creates a db connection per partition*/

  val newPartition = partition.map(record => {
    readMatchingFromDB(record, connection)
  }).toList // consumes the iterator, thus calls readMatchingFromDB 

  connection.close() // close dbconnection here
  newPartition.iterator // create a new iterator
})

Q2. ne flatMapse comporte comme une carte ou comme mapPartitions?

Oui. s'il vous plaît voir l'exemple 2 de flatmap.. son explicite.

Q1. Quelle est la différence entre un RDD mapetmapPartitions

mapexécute la fonction utilisée au niveau de chaque élément tout en l' mapPartitionsexerçant au niveau de la partition.

Exemple de scénario : si nous avons 100K éléments dans uneRDDpartitionparticulière,nous déclencherons la fonction utilisée par la transformation de mappage 100K fois lorsque nous l'utilisonsmap.

Inversement, si nous utilisons, mapPartitionsnous n'appellerons la fonction particulière qu'une seule fois, mais nous passerons tous les enregistrements de 100K et récupérerons toutes les réponses en un seul appel de fonction.

Il y aura un gain de performance puisque maptravaille sur une fonction particulière tant de fois, surtout si la fonction fait quelque chose de cher à chaque fois qu'elle n'aurait pas besoin de faire si nous passions tous les éléments à la fois (dans le cas de mappartitions).

carte

Applique une fonction de transformation à chaque élément du RDD et renvoie le résultat sous forme de nouveau RDD.

Liste des variantes

Def map [U: ClassTag] (f: T => U): RDD [U]

Exemple :

val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
 val b = a.map(_.length)
 val c = a.zip(b)
 c.collect
 res0: Array[(String, Int)] = Array((dog,3), (salmon,6), (salmon,6), (rat,3), (elephant,8)) 

mapPartitions

Il s'agit d'une carte spécialisée qui n'est appelée qu'une seule fois pour chaque partition. L'ensemble du contenu des partitions respectives est disponible sous forme de flux séquentiel de valeurs via l'argument d'entrée (Iterarator [T]). La fonction personnalisée doit renvoyer un autre Iterator [U]. Les itérateurs de résultats combinés sont automatiquement convertis en un nouveau RDD. Veuillez noter que les tuples (3,4) et (6,7) sont absents du résultat suivant en raison du partitionnement que nous avons choisi.

preservesPartitioningindique si la fonction d'entrée préserve le partitionneur, ce qui devrait l'être falsesauf s'il s'agit d'une paire RDD et que la fonction d'entrée ne modifie pas les touches.

Liste des variantes

def mapPartitions [U: ClassTag] (f: Iterator [T] => Iterator [U], préserve le partitionnement: Boolean = false): RDD [U]

Exemple 1

val a = sc.parallelize(1 to 9, 3)
 def myfunc[T](iter: Iterator[T]) : Iterator[(T, T)] = {
   var res = List[(T, T)]()
   var pre = iter.next
   while (iter.hasNext)
   {
     val cur = iter.next;
     res .::= (pre, cur)
     pre = cur;
   }
   res.iterator
 }
 a.mapPartitions(myfunc).collect
 res0: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8)) 

Exemple 2

val x = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9,10), 3)
 def myfunc(iter: Iterator[Int]) : Iterator[Int] = {
   var res = List[Int]()
   while (iter.hasNext) {
     val cur = iter.next;
     res = res ::: List.fill(scala.util.Random.nextInt(10))(cur)
   }
   res.iterator
 }
 x.mapPartitions(myfunc).collect
 // some of the number are not outputted at all. This is because the random number generated for it is zero.
 res8: Array[Int] = Array(1, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 5, 7, 7, 7, 9, 9, 10) 

Le programme ci-dessus peut également être écrit en utilisant flatMap comme suit.

Exemple 2 avec flatmap

val x  = sc.parallelize(1 to 10, 3)
 x.flatMap(List.fill(scala.util.Random.nextInt(10))(_)).collect

 res1: Array[Int] = Array(1, 2, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 4, 4, 5, 5, 6, 6, 6, 6, 6, 6, 6, 6, 7, 7, 7, 8, 8, 8, 8, 8, 8, 8, 8, 9, 9, 9, 9, 9, 10, 10, 10, 10, 10, 10, 10, 10) 

Conclusion :

mapPartitionsla transformation est plus rapide que mappuisqu'elle appelle votre fonction une fois / partition, pas une fois / élément.

Lectures complémentaires: foreach Vs foreachPartitions Quand utiliser Quoi?

Ram Ghadiyaram
la source
4
Je sais que vous pouvez utiliser mapou mapPartitionsobtenir le même résultat (voir les deux exemples dans la question); cette question est de savoir pourquoi vous choisiriez une voie plutôt qu'une autre. Les commentaires dans l'autre réponse sont vraiment utiles! De plus, vous ne l'avez pas mentionné mapet flatMappassé falseà preservesPartitioning, et quelles sont les implications de cela.
Nicholas White
2
la fonction exécutée à chaque fois par rapport à la fonction exécutée une fois pour la partition était le lien qui me manquait. Avoir accès à plus d'un enregistrement de données à la fois avec mapPartition est une chose inestimable. apprécier la réponse
Points
1
Y a-t-il un scénario où mapvaut mieux que mapPartitions? Si mapPartitionsc'est si bon, pourquoi n'est-ce pas l'implémentation de carte par défaut?
ruhong
1
@oneleggedmule: les deux sont pour des exigences différentes que nous devons utiliser à bon escient si vous instanciez des ressources comme les connexions db (comme indiqué dans l'exemple ci-dessus) qui sont coûteuses, alors les mappartitions sont la bonne approche puisqu'une connexion par partition. également saveAsTextFile mappartitions utilisées en interne voir
Ram Ghadiyaram
@oneleggedmule De mon point de vue, map () est plus facile à comprendre et à apprendre, et c'est aussi une méthode courante dans de nombreuses langues différentes. Il peut être plus facile à utiliser que mapPartitions () si quelqu'un n'est pas familier avec cette méthode spécifique à Spark au début. S'il n'y a pas de différence de performances, je préfère utiliser map ().
Raymond Chen
15

Carte :

  1. Il traite une ligne à la fois, très similaire à la méthode map () de MapReduce.
  2. Vous revenez de la transformation après chaque ligne.

MapPartitions

  1. Il traite la partition complète en une seule fois.
  2. Vous ne pouvez revenir de la fonction qu'une seule fois après avoir traité toute la partition.
  3. Tous les résultats intermédiaires doivent être conservés en mémoire jusqu'à ce que vous traitiez toute la partition.
  4. Vous fournit la fonction setup () map () et cleanup () de MapReduce

Map Vs mapPartitions http://bytepadding.com/big-data/spark/spark-map-vs-mappartitions/

Spark Map http://bytepadding.com/big-data/spark/spark-map/

Spark mapPartitions http://bytepadding.com/big-data/spark/spark-mappartitions/

KrazyGautam
la source
concernant 2 - si vous effectuez des transformations d'itérateur en itérateur et que vous ne matérialisez pas l'itérateur en une collection quelconque, vous n'aurez pas à garder la partition entière en mémoire, en fait, de cette façon, spark pourra renverser des parties de la partition sur le disque.
ilcord
4
Vous n'êtes pas obligé de conserver la partition entière en mémoire, mais le résultat. Vous ne pouvez pas retourner le résultat tant que vous n'avez pas traité toute la partition
KrazyGautam