Performances Spark pour Scala vs Python

183

Je préfère Python à Scala. Mais, comme Spark est écrit nativement dans Scala, je m'attendais à ce que mon code s'exécute plus rapidement dans la version Scala que dans la version Python pour des raisons évidentes.

Avec cette hypothèse, j'ai pensé apprendre et écrire la version Scala d'un code de prétraitement très courant pour environ 1 Go de données. Les données sont tirées du concours SpringLeaf sur Kaggle . Juste pour donner un aperçu des données (il contient 1936 dimensions et 145232 lignes). Les données sont composées de différents types, par exemple int, float, string, boolean. J'utilise 6 cœurs sur 8 pour le traitement Spark; c'est pourquoi je l'ai utilisé minPartitions=6pour que chaque noyau ait quelque chose à traiter.

Code Scala

val input = sc.textFile("train.csv", minPartitions=6)

val input2 = input.mapPartitionsWithIndex { (idx, iter) => 
  if (idx == 0) iter.drop(1) else iter }
val delim1 = "\001"

def separateCols(line: String): Array[String] = {
  val line2 = line.replaceAll("true", "1")
  val line3 = line2.replaceAll("false", "0")
  val vals: Array[String] = line3.split(",")

  for((x,i) <- vals.view.zipWithIndex) {
    vals(i) = "VAR_%04d".format(i) + delim1 + x
  }
  vals
}

val input3 = input2.flatMap(separateCols)

def toKeyVal(line: String): (String, String) = {
  val vals = line.split(delim1)
  (vals(0), vals(1))
}

val input4 = input3.map(toKeyVal)

def valsConcat(val1: String, val2: String): String = {
  val1 + "," + val2
}

val input5 = input4.reduceByKey(valsConcat)

input5.saveAsTextFile("output")

Code Python

input = sc.textFile('train.csv', minPartitions=6)
DELIM_1 = '\001'


def drop_first_line(index, itr):
  if index == 0:
    return iter(list(itr)[1:])
  else:
    return itr

input2 = input.mapPartitionsWithIndex(drop_first_line)

def separate_cols(line):
  line = line.replace('true', '1').replace('false', '0')
  vals = line.split(',')
  vals2 = ['VAR_%04d%s%s' %(e, DELIM_1, val.strip('\"'))
           for e, val in enumerate(vals)]
  return vals2


input3 = input2.flatMap(separate_cols)

def to_key_val(kv):
  key, val = kv.split(DELIM_1)
  return (key, val)
input4 = input3.map(to_key_val)

def vals_concat(v1, v2):
  return v1 + ',' + v2

input5 = input4.reduceByKey(vals_concat)
input5.saveAsTextFile('output')

Scala Performance Stage 0 (38 min), Stage 1 (18 sec) entrez la description de l'image ici

Python Performance Stage 0 (11 min), Stage 1 (7 sec) entrez la description de l'image ici

Les deux produisent des graphiques de visualisation DAG différents (grâce auxquels les deux images montrent différentes fonctions de stade 0 pour Scala ( map) et Python ( reduceByKey))

Mais, essentiellement les deux codes essaient de transformer les données en (dimension_id, chaîne de la liste de valeurs) RDD et de les enregistrer sur le disque. La sortie sera utilisée pour calculer diverses statistiques pour chaque dimension.

En termes de performances, le code Scala pour ces données réelles comme celle-ci semble fonctionner 4 fois plus lentement que la version Python. La bonne nouvelle pour moi est que cela m'a motivé à rester avec Python. La mauvaise nouvelle est que je n'ai pas bien compris pourquoi?

Mrityunjay
la source
8
Peut-être que cela dépend du code et de l'application car j'obtiens l'autre résultat, qu'apache spark python est plus lent que scala, lors de la sommation d'un milliard de termes de la formule de Leibniz pour π
Paul
4
Question interessante! Btw, regardez aussi ici: emptypipes.org/2015/01/17/python-vs-scala-vs-spark Plus vous avez de cœurs, moins vous pouvez voir les différences entre les langues.
Markon
Avez-vous envisagé d' accepter la réponse existante?
10465355 dit Réintégrer Monica le

Réponses:

368

La réponse originale concernant le code se trouve ci-dessous.


Tout d'abord, vous devez faire la distinction entre différents types d'API, chacun ayant ses propres considérations de performances.

API RDD

(structures Python pures avec orchestration basée sur JVM)

C'est le composant qui sera le plus affecté par les performances du code Python et les détails de l'implémentation de PySpark. Bien qu'il soit peu probable que les performances de Python soient un problème, il y a au moins quelques facteurs à prendre en compte:

  • Surcharge de la communication JVM. Pratiquement toutes les données qui arrivent et qui proviennent de l'exécuteur Python doivent être passées via un socket et un worker JVM. Bien qu'il s'agisse d'une communication locale relativement efficace, elle n'est toujours pas gratuite.
  • Exécuteurs basés sur des processus (Python) ou exécuteurs basés sur des threads (JVM uniques et plusieurs threads) (Scala). Chaque exécuteur Python s'exécute dans son propre processus. En tant qu'effet secondaire, il fournit une isolation plus forte que son homologue JVM et un certain contrôle sur le cycle de vie de l'exécuteur, mais une utilisation de la mémoire potentiellement beaucoup plus élevée:

    • empreinte mémoire de l'interpréteur
    • encombrement des bibliothèques chargées
    • diffusion moins efficace (chaque processus nécessite sa propre copie d'une diffusion)
  • Performances du code Python lui-même. De manière générale, Scala est plus rapide que Python mais cela varie d'une tâche à l'autre. De plus, vous avez plusieurs options, y compris des JIT comme Numba , des extensions C ( Cython ) ou des bibliothèques spécialisées comme Theano . Enfin, si vous n'utilisez pas ML / MLlib (ou simplement la pile NumPy) , envisagez d'utiliser PyPy comme interprète alternatif. Voir SPARK-3094 .

  • La configuration de PySpark fournit l' spark.python.worker.reuseoption qui peut être utilisée pour choisir entre forger un processus Python pour chaque tâche et réutiliser un processus existant. La dernière option semble utile pour éviter un ramassage des ordures coûteux (c'est plus une impression qu'un résultat de tests systématiques), tandis que la première (par défaut) est optimale en cas de diffusion et d'importations coûteuses.
  • Le comptage de références, utilisé comme méthode de récupération de place de première ligne dans CPython, fonctionne assez bien avec les charges de travail Spark typiques (traitement de type flux, pas de cycles de référence) et réduit le risque de longues pauses GC.

MLlib

(exécution mixte Python et JVM)

Les considérations de base sont à peu près les mêmes qu'avant avec quelques problèmes supplémentaires. Alors que les structures de base utilisées avec MLlib sont des objets RDD Python simples, tous les algorithmes sont exécutés directement à l'aide de Scala.

Cela signifie un coût supplémentaire de conversion des objets Python en objets Scala et inversement, une utilisation accrue de la mémoire et certaines limitations supplémentaires que nous aborderons plus tard.

À partir de maintenant (Spark 2.x), l'API basée sur RDD est en mode maintenance et devrait être supprimée dans Spark 3.0 .

API DataFrame et Spark ML

(Exécution JVM avec code Python limité au pilote)

Ce sont probablement le meilleur choix pour les tâches de traitement de données standard. Étant donné que le code Python est principalement limité aux opérations logiques de haut niveau sur le pilote, il ne devrait y avoir aucune différence de performances entre Python et Scala.

Une seule exception est l'utilisation des UDF Python par ligne qui sont nettement moins efficaces que leurs équivalents Scala. Bien qu'il existe des possibilités d'amélioration (il y a eu un développement substantiel dans Spark 2.0.0), la plus grande limitation est un aller-retour complet entre la représentation interne (JVM) et l'interpréteur Python. Si possible, vous devez privilégier une composition d'expressions intégrées ( exemple . Le comportement Python UDF a été amélioré dans Spark 2.0.0, mais il est toujours sous-optimal par rapport à l'exécution native.

Cela pourrait s'améliorer à l'avenir s'est considérablement amélioré avec l'introduction des UDF vectorisés (SPARK-21190 et autres extensions) , qui utilise Arrow Streaming pour un échange de données efficace avec une désérialisation sans copie. Pour la plupart des applications, leurs frais généraux secondaires peuvent être simplement ignorés.

Veillez également à éviter de transmettre des données inutiles entre DataFrameset RDDs. Cela nécessite une sérialisation et une désérialisation coûteuses, sans parler du transfert de données vers et depuis l'interpréteur Python.

Il est à noter que les appels Py4J ont une latence assez élevée. Cela inclut des appels simples comme:

from pyspark.sql.functions import col

col("foo")

Habituellement, cela ne devrait pas avoir d'importance (la surcharge est constante et ne dépend pas de la quantité de données) mais dans le cas des applications soft en temps réel, vous pouvez envisager de mettre en cache / réutiliser les wrappers Java.

Ensembles de données GraphX ​​et Spark

Pour le moment (Spark 1.6 2.1) ni l'un ni l'autre ne fournit l'API PySpark, vous pouvez donc dire que PySpark est infiniment pire que Scala.

GraphX

Dans la pratique, le développement de GraphX ​​s'est arrêté presque complètement et le projet est actuellement en mode maintenance avec les tickets JIRA associés fermés car cela ne résoudra pas . La bibliothèque GraphFrames fournit une bibliothèque alternative de traitement de graphes avec des liaisons Python.

Base de données

Subjectivement parlant, il n'y a pas beaucoup de place pour le typage statique Datasetsen Python et même s'il y avait l'implémentation actuelle de Scala, c'est trop simpliste et n'offre pas les mêmes avantages de performances que DataFrame.

Diffusion

D'après ce que j'ai vu jusqu'à présent, je recommanderais fortement d'utiliser Scala sur Python. Cela pourrait changer à l'avenir si PySpark prend en charge les flux structurés, mais pour le moment, l'API Scala semble être beaucoup plus robuste, complète et efficace. Mon expérience est assez limitée.

Le streaming structuré dans Spark 2.x semble réduire l'écart entre les langues, mais pour l'instant, il en est encore à ses débuts. Néanmoins, l'API basée sur RDD est déjà référencée comme «flux hérité» dans la documentation Databricks (date d'accès 2017-03-03)), il est donc raisonnable de s'attendre à d'autres efforts d'unification.

Considérations de non-performance

Parité des fonctionnalités

Toutes les fonctionnalités de Spark ne sont pas exposées via l'API PySpark. Assurez-vous de vérifier si les pièces dont vous avez besoin sont déjà implémentées et essayez de comprendre les limites possibles.

Cela est particulièrement important lorsque vous utilisez MLlib et des contextes mixtes similaires (voir Appel d'une fonction Java / Scala à partir d'une tâche ). Pour être honnête, certaines parties de l'API PySpark, comme mllib.linalg, fournissent un ensemble de méthodes plus complet que Scala.

Conception d'API

L'API PySpark reflète étroitement son homologue Scala et, en tant que telle, n'est pas exactement Pythonic. Cela signifie qu'il est assez facile de mapper entre les langues, mais en même temps, le code Python peut être beaucoup plus difficile à comprendre.

Architecture complexe

Le flux de données PySpark est relativement complexe par rapport à une exécution JVM pure. Il est beaucoup plus difficile de raisonner sur les programmes PySpark ou sur le débogage. De plus, au moins une compréhension de base de Scala et de la JVM en général est à peu près indispensable.

Spark 2.x et au-delà

La transition en cours vers l' DatasetAPI, avec l'API RDD gelée, offre à la fois des opportunités et des défis aux utilisateurs de Python. Bien que les parties de haut niveau de l'API soient beaucoup plus faciles à exposer en Python, les fonctionnalités les plus avancées sont pratiquement impossibles à utiliser directement .

De plus, les fonctions natives de Python continuent d'être des citoyens de deuxième classe dans le monde SQL. Espérons que cela s'améliorera à l'avenir avec la sérialisation d'Apache Arrow ( les efforts actuels visent les donnéescollection mais le serde UDF est un objectif à long terme ).

Pour les projets fortement dépendant de la base de code Python, des alternatives pures en Python (comme Dask ou Ray ) pourraient être une alternative intéressante.

Il n'est pas nécessaire que ce soit l'un contre l'autre

L'API Spark DataFrame (SQL, Dataset) fournit un moyen élégant d'intégrer du code Scala / Java dans l'application PySpark. Vous pouvez utiliser DataFramespour exposer des données à un code JVM natif et relire les résultats. J'ai expliqué quelques options ailleurs et vous pouvez trouver un exemple fonctionnel de l'aller-retour Python-Scala dans Comment utiliser une classe Scala dans Pyspark .

Il peut être encore amélioré en introduisant les types définis par l'utilisateur (voir Comment définir un schéma pour un type personnalisé dans Spark SQL? ).


Quel est le problème avec le code fourni dans la question

(Clause de non-responsabilité: point de vue de Pythonista. J'ai probablement manqué quelques astuces de Scala)

Tout d'abord, il y a une partie de votre code qui n'a aucun sens. Si vous avez déjà des (key, value)paires créées à l'aide de zipWithIndexou enumeratequel est l'intérêt de créer une chaîne juste pour la diviser juste après? flatMapne fonctionne pas de manière récursive, vous pouvez donc simplement produire des tuples et ignorer mapquoi que ce soit.

Une autre partie que je trouve problématique est reduceByKey. De manière générale, reduceByKeyest utile si l'application de la fonction d'agrégation peut réduire la quantité de données à mélanger. Puisque vous concaténez simplement des chaînes, il n'y a rien à gagner ici. En ignorant les éléments de bas niveau, comme le nombre de références, la quantité de données que vous devez transférer est exactement la même que pour groupByKey.

Normalement, je ne m'attarderais pas là-dessus, mais pour autant que je sache, c'est un goulot d'étranglement dans votre code Scala. Joindre des chaînes sur JVM est une opération assez coûteuse (voir par exemple: La concaténation de chaînes dans scala est-elle aussi coûteuse qu'en Java? ). Cela signifie que quelque chose comme celui-ci _.reduceByKey((v1: String, v2: String) => v1 + ',' + v2) qui est équivalent à input4.reduceByKey(valsConcat)dans votre code n'est pas une bonne idée.

Si vous voulez éviter, groupByKeyvous pouvez essayer d'utiliser aggregateByKeyavec StringBuilder. Quelque chose de similaire devrait faire l'affaire:

rdd.aggregateByKey(new StringBuilder)(
  (acc, e) => {
    if(!acc.isEmpty) acc.append(",").append(e)
    else acc.append(e)
  },
  (acc1, acc2) => {
    if(acc1.isEmpty | acc2.isEmpty)  acc1.addString(acc2)
    else acc1.append(",").addString(acc2)
  }
)

mais je doute que cela en vaille la peine.

En gardant à l'esprit ce qui précède, j'ai réécrit votre code comme suit:

Scala :

val input = sc.textFile("train.csv", 6).mapPartitionsWithIndex{
  (idx, iter) => if (idx == 0) iter.drop(1) else iter
}

val pairs = input.flatMap(line => line.split(",").zipWithIndex.map{
  case ("true", i) => (i, "1")
  case ("false", i) => (i, "0")
  case p => p.swap
})

val result = pairs.groupByKey.map{
  case (k, vals) =>  {
    val valsString = vals.mkString(",")
    s"$k,$valsString"
  }
}

result.saveAsTextFile("scalaout")

Python :

def drop_first_line(index, itr):
    if index == 0:
        return iter(list(itr)[1:])
    else:
        return itr

def separate_cols(line):
    line = line.replace('true', '1').replace('false', '0')
    vals = line.split(',')
    for (i, x) in enumerate(vals):
        yield (i, x)

input = (sc
    .textFile('train.csv', minPartitions=6)
    .mapPartitionsWithIndex(drop_first_line))

pairs = input.flatMap(separate_cols)

result = (pairs
    .groupByKey()
    .map(lambda kv: "{0},{1}".format(kv[0], ",".join(kv[1]))))

result.saveAsTextFile("pythonout")

Résultats

En local[6]mode (Intel (R) Xeon (R) CPU E3-1245 V2 @ 3,40 GHz) avec 4 Go de mémoire par exécuteur, il faut (n = 3):

  • Scala - moyenne: 250,00 s, stdev: 12,49
  • Python - moyenne: 246,66 s, stdev: 1,15

Je suis presque sûr que la plupart de ce temps est consacré à la lecture aléatoire, à la sérialisation, à la désérialisation et à d'autres tâches secondaires. Juste pour le plaisir, voici du code naïf à un seul thread en Python qui effectue la même tâche sur cette machine en moins d'une minute:

def go():
    with open("train.csv") as fr:
        lines = [
            line.replace('true', '1').replace('false', '0').split(",")
            for line in fr]
    return zip(*lines[1:])
zéro323
la source
26
L'une des réponses les plus claires, complètes et utiles que j'ai rencontrées depuis un certain temps. Merci!
etov
Quel mec superbe tu es!
DennisLi
Est-ce la même tâche? Le dernier zip n'est-il pas paresseux et il n'y a pas d'enregistrement dans un fichier?
Dror Speiser le
-5

Extension aux réponses ci-dessus -

Scala s'avère plus rapide à bien des égards par rapport à python, mais il y a des raisons valables pour lesquelles python devient plus populaire que scala, voyons quelques-unes d'entre elles -

Python pour Apache Spark est assez facile à apprendre et à utiliser. Cependant, ce n'est pas la seule raison pour laquelle Pyspark est un meilleur choix que Scala. Il y a plus.

L'API Python pour Spark peut être plus lente sur le cluster, mais à la fin, les scientifiques des données peuvent en faire beaucoup plus par rapport à Scala. La complexité de Scala est absente. L'interface est simple et complète.

Parler de la lisibilité du code, de la maintenance et de la familiarité avec l'API Python pour Apache Spark est bien meilleur que Scala.

Python est livré avec plusieurs bibliothèques liées à l'apprentissage automatique et au traitement du langage naturel. Cela facilite l'analyse des données et a également des statistiques qui sont beaucoup plus matures et éprouvées par le temps. Par exemple, numpy, pandas, scikit-learn, seaborn et matplotlib.

Remarque: la plupart des data scientists utilisent une approche hybride dans laquelle ils utilisent le meilleur des deux API.

Enfin, la communauté Scala s'avère souvent beaucoup moins utile aux programmeurs. Cela fait de Python un apprentissage très précieux. Si vous avez suffisamment d'expérience avec un langage de programmation à typage statique comme Java, vous pouvez cesser de vous soucier de ne pas utiliser Scala complètement.


la source