Je préfère Python à Scala. Mais, comme Spark est écrit nativement dans Scala, je m'attendais à ce que mon code s'exécute plus rapidement dans la version Scala que dans la version Python pour des raisons évidentes.
Avec cette hypothèse, j'ai pensé apprendre et écrire la version Scala d'un code de prétraitement très courant pour environ 1 Go de données. Les données sont tirées du concours SpringLeaf sur Kaggle . Juste pour donner un aperçu des données (il contient 1936 dimensions et 145232 lignes). Les données sont composées de différents types, par exemple int, float, string, boolean. J'utilise 6 cœurs sur 8 pour le traitement Spark; c'est pourquoi je l'ai utilisé minPartitions=6
pour que chaque noyau ait quelque chose à traiter.
Code Scala
val input = sc.textFile("train.csv", minPartitions=6)
val input2 = input.mapPartitionsWithIndex { (idx, iter) =>
if (idx == 0) iter.drop(1) else iter }
val delim1 = "\001"
def separateCols(line: String): Array[String] = {
val line2 = line.replaceAll("true", "1")
val line3 = line2.replaceAll("false", "0")
val vals: Array[String] = line3.split(",")
for((x,i) <- vals.view.zipWithIndex) {
vals(i) = "VAR_%04d".format(i) + delim1 + x
}
vals
}
val input3 = input2.flatMap(separateCols)
def toKeyVal(line: String): (String, String) = {
val vals = line.split(delim1)
(vals(0), vals(1))
}
val input4 = input3.map(toKeyVal)
def valsConcat(val1: String, val2: String): String = {
val1 + "," + val2
}
val input5 = input4.reduceByKey(valsConcat)
input5.saveAsTextFile("output")
Code Python
input = sc.textFile('train.csv', minPartitions=6)
DELIM_1 = '\001'
def drop_first_line(index, itr):
if index == 0:
return iter(list(itr)[1:])
else:
return itr
input2 = input.mapPartitionsWithIndex(drop_first_line)
def separate_cols(line):
line = line.replace('true', '1').replace('false', '0')
vals = line.split(',')
vals2 = ['VAR_%04d%s%s' %(e, DELIM_1, val.strip('\"'))
for e, val in enumerate(vals)]
return vals2
input3 = input2.flatMap(separate_cols)
def to_key_val(kv):
key, val = kv.split(DELIM_1)
return (key, val)
input4 = input3.map(to_key_val)
def vals_concat(v1, v2):
return v1 + ',' + v2
input5 = input4.reduceByKey(vals_concat)
input5.saveAsTextFile('output')
Scala Performance Stage 0 (38 min), Stage 1 (18 sec)
Python Performance Stage 0 (11 min), Stage 1 (7 sec)
Les deux produisent des graphiques de visualisation DAG différents (grâce auxquels les deux images montrent différentes fonctions de stade 0 pour Scala ( map
) et Python ( reduceByKey
))
Mais, essentiellement les deux codes essaient de transformer les données en (dimension_id, chaîne de la liste de valeurs) RDD et de les enregistrer sur le disque. La sortie sera utilisée pour calculer diverses statistiques pour chaque dimension.
En termes de performances, le code Scala pour ces données réelles comme celle-ci semble fonctionner 4 fois plus lentement que la version Python. La bonne nouvelle pour moi est que cela m'a motivé à rester avec Python. La mauvaise nouvelle est que je n'ai pas bien compris pourquoi?
la source
Réponses:
La réponse originale concernant le code se trouve ci-dessous.
Tout d'abord, vous devez faire la distinction entre différents types d'API, chacun ayant ses propres considérations de performances.
API RDD
(structures Python pures avec orchestration basée sur JVM)
C'est le composant qui sera le plus affecté par les performances du code Python et les détails de l'implémentation de PySpark. Bien qu'il soit peu probable que les performances de Python soient un problème, il y a au moins quelques facteurs à prendre en compte:
Exécuteurs basés sur des processus (Python) ou exécuteurs basés sur des threads (JVM uniques et plusieurs threads) (Scala). Chaque exécuteur Python s'exécute dans son propre processus. En tant qu'effet secondaire, il fournit une isolation plus forte que son homologue JVM et un certain contrôle sur le cycle de vie de l'exécuteur, mais une utilisation de la mémoire potentiellement beaucoup plus élevée:
Performances du code Python lui-même. De manière générale, Scala est plus rapide que Python mais cela varie d'une tâche à l'autre. De plus, vous avez plusieurs options, y compris des JIT comme Numba , des extensions C ( Cython ) ou des bibliothèques spécialisées comme Theano . Enfin,
si vous n'utilisez pas ML / MLlib (ou simplement la pile NumPy), envisagez d'utiliser PyPy comme interprète alternatif. Voir SPARK-3094 .spark.python.worker.reuse
option qui peut être utilisée pour choisir entre forger un processus Python pour chaque tâche et réutiliser un processus existant. La dernière option semble utile pour éviter un ramassage des ordures coûteux (c'est plus une impression qu'un résultat de tests systématiques), tandis que la première (par défaut) est optimale en cas de diffusion et d'importations coûteuses.MLlib
(exécution mixte Python et JVM)
Les considérations de base sont à peu près les mêmes qu'avant avec quelques problèmes supplémentaires. Alors que les structures de base utilisées avec MLlib sont des objets RDD Python simples, tous les algorithmes sont exécutés directement à l'aide de Scala.
Cela signifie un coût supplémentaire de conversion des objets Python en objets Scala et inversement, une utilisation accrue de la mémoire et certaines limitations supplémentaires que nous aborderons plus tard.
À partir de maintenant (Spark 2.x), l'API basée sur RDD est en mode maintenance et devrait être supprimée dans Spark 3.0 .
API DataFrame et Spark ML
(Exécution JVM avec code Python limité au pilote)
Ce sont probablement le meilleur choix pour les tâches de traitement de données standard. Étant donné que le code Python est principalement limité aux opérations logiques de haut niveau sur le pilote, il ne devrait y avoir aucune différence de performances entre Python et Scala.
Une seule exception est l'utilisation des UDF Python par ligne qui sont nettement moins efficaces que leurs équivalents Scala. Bien qu'il existe des possibilités d'amélioration (il y a eu un développement substantiel dans Spark 2.0.0), la plus grande limitation est un aller-retour complet entre la représentation interne (JVM) et l'interpréteur Python. Si possible, vous devez privilégier une composition d'expressions intégrées ( exemple . Le comportement Python UDF a été amélioré dans Spark 2.0.0, mais il est toujours sous-optimal par rapport à l'exécution native.
Cela
pourrait s'améliorer à l'avenirs'est considérablement amélioré avec l'introduction des UDF vectorisés (SPARK-21190 et autres extensions) , qui utilise Arrow Streaming pour un échange de données efficace avec une désérialisation sans copie. Pour la plupart des applications, leurs frais généraux secondaires peuvent être simplement ignorés.Veillez également à éviter de transmettre des données inutiles entre
DataFrames
etRDDs
. Cela nécessite une sérialisation et une désérialisation coûteuses, sans parler du transfert de données vers et depuis l'interpréteur Python.Il est à noter que les appels Py4J ont une latence assez élevée. Cela inclut des appels simples comme:
from pyspark.sql.functions import col col("foo")
Habituellement, cela ne devrait pas avoir d'importance (la surcharge est constante et ne dépend pas de la quantité de données) mais dans le cas des applications soft en temps réel, vous pouvez envisager de mettre en cache / réutiliser les wrappers Java.
Ensembles de données GraphX et Spark
Pour le moment (Spark
GraphX1.62.1) ni l'un ni l'autre ne fournit l'API PySpark, vous pouvez donc dire que PySpark est infiniment pire que Scala.Dans la pratique, le développement de GraphX s'est arrêté presque complètement et le projet est actuellement en mode maintenance avec les tickets JIRA associés fermés car cela ne résoudra pas . La bibliothèque GraphFrames fournit une bibliothèque alternative de traitement de graphes avec des liaisons Python.
Base de donnéesSubjectivement parlant, il n'y a pas beaucoup de place pour le typage statique
Datasets
en Python et même s'il y avait l'implémentation actuelle de Scala, c'est trop simpliste et n'offre pas les mêmes avantages de performances queDataFrame
.Diffusion
D'après ce que j'ai vu jusqu'à présent, je recommanderais fortement d'utiliser Scala sur Python. Cela pourrait changer à l'avenir si PySpark prend en charge les flux structurés, mais pour le moment, l'API Scala semble être beaucoup plus robuste, complète et efficace. Mon expérience est assez limitée.
Le streaming structuré dans Spark 2.x semble réduire l'écart entre les langues, mais pour l'instant, il en est encore à ses débuts. Néanmoins, l'API basée sur RDD est déjà référencée comme «flux hérité» dans la documentation Databricks (date d'accès 2017-03-03)), il est donc raisonnable de s'attendre à d'autres efforts d'unification.
Considérations de non-performance
Parité des fonctionnalitésToutes les fonctionnalités de Spark ne sont pas exposées via l'API PySpark. Assurez-vous de vérifier si les pièces dont vous avez besoin sont déjà implémentées et essayez de comprendre les limites possibles.
Cela est particulièrement important lorsque vous utilisez MLlib et des contextes mixtes similaires (voir Appel d'une fonction Java / Scala à partir d'une tâche ). Pour être honnête, certaines parties de l'API PySpark, comme
Conception d'APImllib.linalg
, fournissent un ensemble de méthodes plus complet que Scala.L'API PySpark reflète étroitement son homologue Scala et, en tant que telle, n'est pas exactement Pythonic. Cela signifie qu'il est assez facile de mapper entre les langues, mais en même temps, le code Python peut être beaucoup plus difficile à comprendre.
Architecture complexeLe flux de données PySpark est relativement complexe par rapport à une exécution JVM pure. Il est beaucoup plus difficile de raisonner sur les programmes PySpark ou sur le débogage. De plus, au moins une compréhension de base de Scala et de la JVM en général est à peu près indispensable.
Spark 2.x et au-delàLa transition en cours vers l'
Dataset
API, avec l'API RDD gelée, offre à la fois des opportunités et des défis aux utilisateurs de Python. Bien que les parties de haut niveau de l'API soient beaucoup plus faciles à exposer en Python, les fonctionnalités les plus avancées sont pratiquement impossibles à utiliser directement .De plus, les fonctions natives de Python continuent d'être des citoyens de deuxième classe dans le monde SQL. Espérons que cela s'améliorera à l'avenir avec la sérialisation d'Apache Arrow ( les efforts actuels visent les données
collection
mais le serde UDF est un objectif à long terme ).Pour les projets fortement dépendant de la base de code Python, des alternatives pures en Python (comme Dask ou Ray ) pourraient être une alternative intéressante.
Il n'est pas nécessaire que ce soit l'un contre l'autre
L'API Spark DataFrame (SQL, Dataset) fournit un moyen élégant d'intégrer du code Scala / Java dans l'application PySpark. Vous pouvez utiliser
DataFrames
pour exposer des données à un code JVM natif et relire les résultats. J'ai expliqué quelques options ailleurs et vous pouvez trouver un exemple fonctionnel de l'aller-retour Python-Scala dans Comment utiliser une classe Scala dans Pyspark .Il peut être encore amélioré en introduisant les types définis par l'utilisateur (voir Comment définir un schéma pour un type personnalisé dans Spark SQL? ).
Quel est le problème avec le code fourni dans la question
(Clause de non-responsabilité: point de vue de Pythonista. J'ai probablement manqué quelques astuces de Scala)
Tout d'abord, il y a une partie de votre code qui n'a aucun sens. Si vous avez déjà des
(key, value)
paires créées à l'aide dezipWithIndex
ouenumerate
quel est l'intérêt de créer une chaîne juste pour la diviser juste après?flatMap
ne fonctionne pas de manière récursive, vous pouvez donc simplement produire des tuples et ignorermap
quoi que ce soit.Une autre partie que je trouve problématique est
reduceByKey
. De manière générale,reduceByKey
est utile si l'application de la fonction d'agrégation peut réduire la quantité de données à mélanger. Puisque vous concaténez simplement des chaînes, il n'y a rien à gagner ici. En ignorant les éléments de bas niveau, comme le nombre de références, la quantité de données que vous devez transférer est exactement la même que pourgroupByKey
.Normalement, je ne m'attarderais pas là-dessus, mais pour autant que je sache, c'est un goulot d'étranglement dans votre code Scala. Joindre des chaînes sur JVM est une opération assez coûteuse (voir par exemple: La concaténation de chaînes dans scala est-elle aussi coûteuse qu'en Java? ). Cela signifie que quelque chose comme celui-ci
_.reduceByKey((v1: String, v2: String) => v1 + ',' + v2)
qui est équivalent àinput4.reduceByKey(valsConcat)
dans votre code n'est pas une bonne idée.Si vous voulez éviter,
groupByKey
vous pouvez essayer d'utiliseraggregateByKey
avecStringBuilder
. Quelque chose de similaire devrait faire l'affaire:rdd.aggregateByKey(new StringBuilder)( (acc, e) => { if(!acc.isEmpty) acc.append(",").append(e) else acc.append(e) }, (acc1, acc2) => { if(acc1.isEmpty | acc2.isEmpty) acc1.addString(acc2) else acc1.append(",").addString(acc2) } )
mais je doute que cela en vaille la peine.
En gardant à l'esprit ce qui précède, j'ai réécrit votre code comme suit:
Scala :
val input = sc.textFile("train.csv", 6).mapPartitionsWithIndex{ (idx, iter) => if (idx == 0) iter.drop(1) else iter } val pairs = input.flatMap(line => line.split(",").zipWithIndex.map{ case ("true", i) => (i, "1") case ("false", i) => (i, "0") case p => p.swap }) val result = pairs.groupByKey.map{ case (k, vals) => { val valsString = vals.mkString(",") s"$k,$valsString" } } result.saveAsTextFile("scalaout")
Python :
def drop_first_line(index, itr): if index == 0: return iter(list(itr)[1:]) else: return itr def separate_cols(line): line = line.replace('true', '1').replace('false', '0') vals = line.split(',') for (i, x) in enumerate(vals): yield (i, x) input = (sc .textFile('train.csv', minPartitions=6) .mapPartitionsWithIndex(drop_first_line)) pairs = input.flatMap(separate_cols) result = (pairs .groupByKey() .map(lambda kv: "{0},{1}".format(kv[0], ",".join(kv[1])))) result.saveAsTextFile("pythonout")
Résultats
En
local[6]
mode (Intel (R) Xeon (R) CPU E3-1245 V2 @ 3,40 GHz) avec 4 Go de mémoire par exécuteur, il faut (n = 3):Je suis presque sûr que la plupart de ce temps est consacré à la lecture aléatoire, à la sérialisation, à la désérialisation et à d'autres tâches secondaires. Juste pour le plaisir, voici du code naïf à un seul thread en Python qui effectue la même tâche sur cette machine en moins d'une minute:
def go(): with open("train.csv") as fr: lines = [ line.replace('true', '1').replace('false', '0').split(",") for line in fr] return zip(*lines[1:])
la source
Extension aux réponses ci-dessus -
Scala s'avère plus rapide à bien des égards par rapport à python, mais il y a des raisons valables pour lesquelles python devient plus populaire que scala, voyons quelques-unes d'entre elles -
Python pour Apache Spark est assez facile à apprendre et à utiliser. Cependant, ce n'est pas la seule raison pour laquelle Pyspark est un meilleur choix que Scala. Il y a plus.
L'API Python pour Spark peut être plus lente sur le cluster, mais à la fin, les scientifiques des données peuvent en faire beaucoup plus par rapport à Scala. La complexité de Scala est absente. L'interface est simple et complète.
Parler de la lisibilité du code, de la maintenance et de la familiarité avec l'API Python pour Apache Spark est bien meilleur que Scala.
Python est livré avec plusieurs bibliothèques liées à l'apprentissage automatique et au traitement du langage naturel. Cela facilite l'analyse des données et a également des statistiques qui sont beaucoup plus matures et éprouvées par le temps. Par exemple, numpy, pandas, scikit-learn, seaborn et matplotlib.
Remarque: la plupart des data scientists utilisent une approche hybride dans laquelle ils utilisent le meilleur des deux API.
Enfin, la communauté Scala s'avère souvent beaucoup moins utile aux programmeurs. Cela fait de Python un apprentissage très précieux. Si vous avez suffisamment d'expérience avec un langage de programmation à typage statique comme Java, vous pouvez cesser de vous soucier de ne pas utiliser Scala complètement.
la source