J'explore le comportement de Spark en joignant une table à elle-même. J'utilise Databricks.
Mon scénario fictif est:
Lire une table externe en tant que trame de données A (les fichiers sous-jacents sont au format delta)
Définissez la trame de données B comme trame de données A avec seulement certaines colonnes sélectionnées
Joignez les cadres de données A et B sur la colonne1 et la colonne2
(Oui, cela n'a pas beaucoup de sens, j'expérimente simplement pour comprendre la mécanique sous-jacente de Spark)
a = spark.read.table("table") \
.select("column1", "column2", "column3", "column4") \
.withColumn("columnA", lower((concat(col("column4"), lit("_"), col("column5")))))
b = a.select("column1", "column2", "columnA")
c= a.join(b, how="left", on = ["column1", "column2"])
Ma première tentative a été d'exécuter le code tel quel (tentative 1). J'ai ensuite essayé de répartir et de mettre en cache (tentative 2)
a = spark.read.table("table") \
.select("column1", "column2", "column3", "column4") \
.withColumn("columnA", lower((concat(col("column4"), lit("_"), col("column5")))))
.repartition(col("column1"), col("column2")).cache()
Enfin, j'ai repartitionné, trié et mis en cache
a = spark.read.table("table") \
.select("column1", "column2", "column3", "column4") \
.withColumn("columnA", lower((concat(col("column4"), lit("_"), col("column5")))))
.repartition(col("column1"), col("column2")).sortWithinPartitions(col("column1"), col("column2")).cache()
Les dags respectifs générés sont tels que joints.
Mes questions sont:
Pourquoi, dans la tentative 1, la table semble être mise en cache même si la mise en cache n'a pas été explicitement spécifiée.
Pourquoi InMemoreTableScan est toujours suivi d'un autre nœud de ce type.
Pourquoi, dans la tentative 3, la mise en cache semble avoir lieu en deux étapes?
Pourquoi dans la tentative 3 WholeStageCodegen suit un (et un seul) InMemoreTableScan.
Réponses:
Ce que vous observez dans ces 3 plans est un mélange de runtime DataBricks et de Spark.
Tout d'abord, lors de l'exécution de DataBricks runtime 3.3+, la mise en cache est automatiquement activée pour tous les fichiers parquet. Configuration correspondante pour cela:
spark.databricks.io.cache.enabled true
Pour votre deuxième requête, InMemoryTableScan se produit deux fois car dès que la jointure a été appelée, spark a essayé de calculer le Dataset A et le Dataset B en parallèle. En supposant que différents exécuteurs ont reçu les tâches ci-dessus, les deux devront analyser la table à partir du cache (DataBricks).
Pour le troisième, InMemoryTableScan ne fait pas référence à la mise en cache en soi. Cela signifie simplement que quel que soit le catalyseur de plan formé, il fallait analyser la table en cache plusieurs fois.
PS: je ne peux pas visualiser le point 4 :)
la source