Spark: Pourquoi Python surpasse-t-il significativement Scala dans mon cas d'utilisation?

16

Pour comparer les performances de Spark lors de l'utilisation de Python et Scala, j'ai créé le même travail dans les deux langues et comparé le runtime. Je m'attendais à ce que les deux travaux prennent à peu près la même quantité de temps, mais le travail Python a pris seulement 27min, tandis que le travail Scala a pris 37min(presque 40% de plus!). J'ai également implémenté le même travail en Java et cela a 37minuteségalement pris . Comment est-il possible que Python soit tellement plus rapide?

Exemple minimal vérifiable:

Travail Python:

# Configuration
conf = pyspark.SparkConf()
conf.set("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider")
conf.set("spark.executor.instances", "4")
conf.set("spark.executor.cores", "8")
sc = pyspark.SparkContext(conf=conf)

# 960 Files from a public dataset in 2 batches
input_files = "s3a://commoncrawl/crawl-data/CC-MAIN-2019-35/segments/1566027312025.20/warc/CC-MAIN-20190817203056-20190817225056-00[0-5]*"
input_files2 = "s3a://commoncrawl/crawl-data/CC-MAIN-2019-35/segments/1566027312128.3/warc/CC-MAIN-20190817102624-20190817124624-00[0-3]*"

# Count occurances of a certain string
logData = sc.textFile(input_files)
logData2 = sc.textFile(input_files2)
a = logData.filter(lambda value: value.startswith('WARC-Type: response')).count()
b = logData2.filter(lambda value: value.startswith('WARC-Type: response')).count()

print(a, b)

Emploi Scala:

// Configuration
config.set("spark.executor.instances", "4")
config.set("spark.executor.cores", "8")
val sc = new SparkContext(config)
sc.setLogLevel("WARN")
sc.hadoopConfiguration.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider")

// 960 Files from a public dataset in 2 batches 
val input_files = "s3a://commoncrawl/crawl-data/CC-MAIN-2019-35/segments/1566027312025.20/warc/CC-MAIN-20190817203056-20190817225056-00[0-5]*"
val input_files2 = "s3a://commoncrawl/crawl-data/CC-MAIN-2019-35/segments/1566027312128.3/warc/CC-MAIN-20190817102624-20190817124624-00[0-3]*"

// Count occurances of a certain string
val logData1 = sc.textFile(input_files)
val logData2 = sc.textFile(input_files2)
val num1 = logData1.filter(line => line.startsWith("WARC-Type: response")).count()
val num2 = logData2.filter(line => line.startsWith("WARC-Type: response")).count()

println(s"Lines with a: $num1, Lines with b: $num2")

Rien qu'en regardant le code, ils semblent identiques. J'ai regardé les DAG et ils n'ont fourni aucun aperçu (ou du moins je n'ai pas le savoir-faire pour trouver une explication basée sur eux).

J'apprécierais vraiment tous les conseils.

maestromusica
la source
Les commentaires ne sont pas pour une discussion approfondie; cette conversation a été déplacée vers le chat .
Samuel Liew
1
J'aurais commencé l'analyse, avant de demander quoi que ce soit, en chronométrant les blocs et les instructions correspondants pour voir s'il y avait un endroit particulier où la version python est plus rapide. Ensuite, vous avez peut-être pu affiner la question «pourquoi cette instruction python est-elle plus rapide».
Terry Jan Reedy

Réponses:

11

Votre hypothèse de base, que Scala ou Java devrait être plus rapide pour cette tâche spécifique, est tout simplement incorrecte. Vous pouvez facilement le vérifier avec un minimum d'applications locales. Scala one:

import scala.io.Source
import java.time.{Duration, Instant}

object App {
  def main(args: Array[String]) {
    val Array(filename, string) = args

    val start = Instant.now()

    Source
      .fromFile(filename)
      .getLines
      .filter(line => line.startsWith(string))
      .length

    val stop = Instant.now()
    val duration = Duration.between(start, stop).toMillis
    println(s"${start},${stop},${duration}")
  }
}

Python un

import datetime
import sys

if __name__ == "__main__":
    _, filename, string = sys.argv
    start = datetime.datetime.now()
    with open(filename) as fr:
        # Not idiomatic or the most efficient but that's what
        # PySpark will use
        sum(1 for _ in filter(lambda line: line.startswith(string), fr))

    end = datetime.datetime.now()
    duration = round((end - start).total_seconds() * 1000)
    print(f"{start},{end},{duration}")

Résultats (300 répétitions chacun, Python 3.7.6, Scala 2.11.12), à Posts.xmlpartir du vidage de données hermeneutics.stackexchange.com avec un mélange de modèles correspondants et non correspondants:

boîtes à moustaches de durartion en millis pour les programmes ci-dessus

  • Python 273,50 (258,84, 288,16)
  • Scala 634.13 (533.81, 734.45)

Comme vous le voyez, Python est non seulement systématiquement plus rapide, mais également plus cohérent (propagation plus faible).

Le message à retenir est - ne croyez pas FUD non corroboré - les langues peuvent être plus rapides ou plus lentes sur des tâches spécifiques ou avec des environnements spécifiques (par exemple ici Scala peut être touché par le démarrage de la JVM et / ou GC et / ou JIT), mais si vous le prétendez comme "XYZ est X4 plus rapide" ou "XYZ est lent par rapport à ZYX (..) Environ 10 fois plus lent", cela signifie généralement que quelqu'un a écrit un très mauvais code pour tester les choses.

Modifier :

Pour répondre à certaines préoccupations soulevées dans les commentaires:

  • Dans le code OP, les données sont transmises principalement dans une seule direction (JVM -> Python) et aucune sérialisation réelle n'est requise (ce chemin spécifique passe juste par bytestring tel quel et décode sur UTF-8 de l'autre côté). C'est aussi bon marché que quand il s'agit de «sérialisation».
  • Ce qui est renvoyé n'est qu'un seul entier par partition, donc dans ce sens, l'impact est négligeable.
  • La communication se fait sur des sockets locaux (toutes les communications sur le travailleur au-delà de la connexion initiale et de l'authentification sont effectuées à l'aide du descripteur de fichier renvoyé par local_connect_and_auth, et ce n'est rien d'autre que le fichier associé au socket ). Encore une fois, aussi bon marché que possible en ce qui concerne la communication entre les processus.
  • Compte tenu de la différence de performances brutes indiquée ci-dessus (beaucoup plus élevée que ce que vous voyez dans votre programme), il y a beaucoup de marge pour les frais généraux répertoriés ci-dessus.
  • Ce cas est complètement différent des cas où des objets simples ou complexes doivent être passés vers et depuis l'interpréteur Python sous une forme accessible aux deux parties en tant que vidages compatibles avec les cornichons (les exemples les plus notables incluent les UDF à l'ancienne, certaines parties de l'ancien -style MLLib).

Modifier 2 :

Puisque jasper-m était préoccupé par le coût de démarrage ici, on peut facilement prouver que Python a encore un avantage significatif sur Scala même si la taille d'entrée est considérablement augmentée.

Voici les résultats pour 2003360 lignes / 5.6G (la même entrée, juste dupliquée plusieurs fois, 30 répétitions), ce qui dépasse tout ce que vous pouvez attendre dans une seule tâche Spark.

entrez la description de l'image ici

  • Python 22809.57 (21466.26, 24152.87)
  • Scala 27315.28 (24367.24, 30263.31)

Veuillez noter les intervalles de confiance qui ne se chevauchent pas.

Modifier 3 :

Pour répondre à un autre commentaire de Jasper-M:

La majeure partie du traitement se déroule toujours à l'intérieur d'une machine virtuelle Java dans le boîtier Spark.

C'est tout simplement incorrect dans ce cas particulier:

  • Le travail en question est un travail de mappage avec une réduction globale unique à l'aide de RDD PySpark.
  • PySpark RDD (contrairement à, disons DataFrame) implémente nativement des fonctionnalités natives en Python, avec des exceptions d'entrée, de sortie et de communication entre les nœuds.
  • Étant donné qu'il s'agit d'un travail en une seule étape et que la sortie finale est suffisamment petite pour être ignorée, la principale responsabilité de la JVM (si l'on devait faire un nitpick, cela est implémenté principalement en Java et non Scala) est d'invoquer le format d'entrée Hadoop et de pousser les données via le socket fichier en Python.
  • La partie lue est identique pour la JVM et l'API Python, elle peut donc être considérée comme une surcharge constante. Il n'est pas non plus considéré comme la majeure partie du traitement , même pour un travail aussi simple que celui-ci.
user10938362
la source
3
excellente approche du problème. Merci de partager cela
Alexandros Biratsis
1
@egordoe Alexandros a dit "il n'y a pas d'UDF invoqué ici" pas que "Python n'est pas invoqué" - cela fait toute la différence. La surcharge de sérialisation est importante lorsque les données sont échangées entre les systèmes (c'est-à-dire lorsque vous souhaitez transmettre des données à un UDF et inversement).
user10938362
1
@egordoe Vous confondez clairement deux choses - les frais généraux de sérialisation, ce qui est un problème lorsque des objets non triviaux sont transmis d'avant en arrière. Et les frais généraux de communication. Il y a peu ou pas de surcharge de sérialisation ici, car vous passez et décodez simplement des bytestrings, et cela se produit principalement dans la direction, car en retour, vous obtenez un seul entier par partition. La communication est quelque peu préoccupante, mais le passage de données via des sockets locaux est efficace car il obtient vraiment en ce qui concerne la communication interprocessus. Si ce n'est pas clair, je vous recommande de lire la source - ce n'est pas difficile et sera instructif.
user10938362
1
De plus, les méthodes de sérialisation ne sont tout simplement pas égales. Comme le cas Spark montre que de bonnes méthodes de sérialisation peuvent réduire les coûts au niveau où cela n'est plus le cas (voir Pandas UDF avec Arrow) et quand cela se produit, d'autres facteurs peuvent dominer (voir par exemple les comparaisons de performances entre les fonctions de la fenêtre Scala et leurs équivalents avec Pandas UDF - Python y gagne par une marge beaucoup plus élevée que dans cette question).
user10938362
1
Et votre point est @ Jasper-M? Les tâches Spark individuelles sont généralement suffisamment petites pour avoir une charge de travail comparable à celle-ci. Ne me prenez pas dans le mauvais sens, mais si vous avez un contre-exemple réel qui invalide cette question ou la question entière, veuillez la poster. J'ai déjà noté que les actions secondaires contribuent dans une certaine mesure à cette valeur, mais elles ne dominent pas le coût. Nous sommes tous des ingénieurs (en quelque sorte) ici - parlons de chiffres et de code, pas de croyances, d'accord?
user10938362
4

Le travail Scala prend plus de temps car il a une mauvaise configuration et, par conséquent, les travaux Python et Scala ont été fournis avec des ressources inégales.

Il y a deux erreurs dans le code:

val sc = new SparkContext(config) // LINE #1
sc.setLogLevel("WARN")
sc.hadoopConfiguration.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider")
sc.hadoopConfiguration.set("spark.executor.instances", "4") // LINE #4
sc.hadoopConfiguration.set("spark.executor.cores", "8") // LINE #5
  1. LIGNE 1. Une fois la ligne exécutée, la configuration des ressources du travail Spark est déjà établie et fixée. À partir de ce moment, aucun moyen de régler quoi que ce soit. Ni le nombre d'exécuteurs ni le nombre de cœurs par exécuteur.
  2. LIGNE 4-5. sc.hadoopConfigurationest un mauvais endroit pour définir une configuration Spark. Il doit être défini dans l' configinstance à laquelle vous passez new SparkContext(config).

[AJOUTÉ] Compte tenu de ce qui précède, je proposerais de changer le code du travail Scala en

config.set("spark.executor.instances", "4")
config.set("spark.executor.cores", "8")
val sc = new SparkContext(config) // LINE #1
sc.setLogLevel("WARN")
sc.hadoopConfiguration.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider")

et le tester à nouveau. Je parie que la version Scala va être X fois plus rapide maintenant.

egordoe
la source
J'ai vérifié que les deux travaux exécutent 32 tâches en parallèle, donc je ne pense pas que ce soit le coupable?
maestromusica
merci pour la modification, je vais essayer de le tester dès maintenant
maestromusica
salut @ maestromusica, cela doit être quelque chose dans la configuration des ressources car, intrinsèquement, Python peut ne pas surpasser Scala dans ce cas d'utilisation particulier. Une autre raison peut être certains facteurs aléatoires non corrélés, à savoir la charge de la grappe au moment particulier et similaire. Btw, quel mode utilisez-vous? autonome, local, fil?
egordoe
Oui, j'ai vérifié que cette réponse est incorrecte. Le runtime est le même. J'ai également imprimé la configuration dans les deux cas et elle est identique.
maestromusica
1
Je pense que vous pourriez avoir raison. J'ai posé cette question pour enquêter sur toutes les autres possibilités telles qu'une erreur dans le code ou peut-être que j'ai mal compris quelque chose. Merci pour votre contribution.
maestromusica