Comment définir le partitionnement de DataFrame?

129

J'ai commencé à utiliser Spark SQL et DataFrames dans Spark 1.4.0. Je souhaite définir un partitionneur personnalisé sur DataFrames, dans Scala, mais je ne vois pas comment faire cela.

L'une des tables de données avec lesquelles je travaille contient une liste de transactions, par compte, silimar à l'exemple suivant.

Account   Date       Type       Amount
1001    2014-04-01  Purchase    100.00
1001    2014-04-01  Purchase     50.00
1001    2014-04-05  Purchase     70.00
1001    2014-04-01  Payment    -150.00
1002    2014-04-01  Purchase     80.00
1002    2014-04-02  Purchase     22.00
1002    2014-04-04  Payment    -120.00
1002    2014-04-04  Purchase     60.00
1003    2014-04-02  Purchase    210.00
1003    2014-04-03  Purchase     15.00

Au moins au début, la plupart des calculs auront lieu entre les transactions au sein d'un compte. Je voudrais donc que les données soient partitionnées afin que toutes les transactions d'un compte soient dans la même partition Spark.

Mais je ne vois pas de moyen de définir cela. La classe DataFrame a une méthode appelée 'repartition (Int)', où vous pouvez spécifier le nombre de partitions à créer. Mais je ne vois aucune méthode disponible pour définir un partitionneur personnalisé pour un DataFrame, tel que celui pouvant être spécifié pour un RDD.

Les données source sont stockées dans Parquet. J'ai vu que lors de l'écriture d'un DataFrame sur Parquet, vous pouvez spécifier une colonne à partitionner, donc je pourrais probablement dire à Parquet de partitionner ses données par la colonne 'Compte'. Mais il pourrait y avoir des millions de comptes, et si je comprends bien Parquet, cela créerait un répertoire distinct pour chaque compte, donc cela ne semblait pas être une solution raisonnable.

Existe-t-il un moyen pour Spark de partitionner ce DataFrame afin que toutes les données d'un compte se trouvent dans la même partition?

râteau
la source
consultez ce lien stackoverflow.com/questions/23127329/…
Abhishek Choudhary
Si vous pouvez dire à Parquet de partitionner par compte, vous pouvez probablement partitionner par int(account/someInteger)et ainsi obtenir un nombre raisonnable de comptes par répertoire.
Paul
1
@ABC: J'ai vu ce lien. Je cherchais l'équivalent de cette partitionBy(Partitioner)méthode, mais pour DataFrames au lieu de RDD. Je vois maintenant que ce partitionByn'est disponible que pour les RDD Pair, je ne sais pas pourquoi.
rake
@Paul: J'ai envisagé de faire ce que vous décrivez. Quelques choses m'ont retenu:
rake
continue .... (1) C'est pour "Parquet-partitioning". Je n'ai pas pu trouver de documents indiquant que le partitionnement Spark utilisera réellement le partitionnement Parquet. (2) Si je comprends la documentation Parquet, je dois définir un nouveau champ "foo", alors chaque répertoire Parquet aura un nom comme "foo = 123". Mais si je construis une requête impliquant AccountID , comment Spark / hive / parquet saurait-il qu'il existe un lien entre foo et AccountID ?
rake

Réponses:

177

Étincelle> = 2,3,0

SPARK-22614 expose le partitionnement de plage.

val partitionedByRange = df.repartitionByRange(42, $"k")

partitionedByRange.explain
// == Parsed Logical Plan ==
// 'RepartitionByExpression ['k ASC NULLS FIRST], 42
// +- AnalysisBarrier Project [_1#2 AS k#5, _2#3 AS v#6]
// 
// == Analyzed Logical Plan ==
// k: string, v: int
// RepartitionByExpression [k#5 ASC NULLS FIRST], 42
// +- Project [_1#2 AS k#5, _2#3 AS v#6]
//    +- LocalRelation [_1#2, _2#3]
// 
// == Optimized Logical Plan ==
// RepartitionByExpression [k#5 ASC NULLS FIRST], 42
// +- LocalRelation [k#5, v#6]
// 
// == Physical Plan ==
// Exchange rangepartitioning(k#5 ASC NULLS FIRST, 42)
// +- LocalTableScan [k#5, v#6]

SPARK-22389 expose le partitionnement de format externe dans l' API de source de données v2 .

Étincelle> = 1,6,0

Dans Spark> = 1.6, il est possible d'utiliser le partitionnement par colonne pour les requêtes et la mise en cache. Voir: SPARK-11410 et SPARK-4849 en utilisant la repartitionméthode:

val df = Seq(
  ("A", 1), ("B", 2), ("A", 3), ("C", 1)
).toDF("k", "v")

val partitioned = df.repartition($"k")
partitioned.explain

// scala> df.repartition($"k").explain(true)
// == Parsed Logical Plan ==
// 'RepartitionByExpression ['k], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
// 
// == Analyzed Logical Plan ==
// k: string, v: int
// RepartitionByExpression [k#7], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
// 
// == Optimized Logical Plan ==
// RepartitionByExpression [k#7], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
// 
// == Physical Plan ==
// TungstenExchange hashpartitioning(k#7,200), None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- Scan PhysicalRDD[_1#5,_2#6]

Contrairement à RDDsSpark Dataset(y compris Dataset[Row]aka DataFrame) ne peut pas utiliser de partitionneur personnalisé comme pour le moment. Vous pouvez généralement résoudre ce problème en créant une colonne de partitionnement artificielle, mais cela ne vous donnera pas la même flexibilité.

Spark <1.6.0:

Une chose que vous pouvez faire est de pré-partitionner les données d'entrée avant de créer un DataFrame

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.HashPartitioner

val schema = StructType(Seq(
  StructField("x", StringType, false),
  StructField("y", LongType, false),
  StructField("z", DoubleType, false)
))

val rdd = sc.parallelize(Seq(
  Row("foo", 1L, 0.5), Row("bar", 0L, 0.0), Row("??", -1L, 2.0),
  Row("foo", -1L, 0.0), Row("??", 3L, 0.6), Row("bar", -3L, 0.99)
))

val partitioner = new HashPartitioner(5) 

val partitioned = rdd.map(r => (r.getString(0), r))
  .partitionBy(partitioner)
  .values

val df = sqlContext.createDataFrame(partitioned, schema)

Étant donné que la DataFramecréation à partir d'un RDDne nécessite qu'une simple phase de carte, la disposition des partitions existantes doit être préservée *:

assert(df.rdd.partitions == partitioned.partitions)

De la même manière que vous pouvez repartitionner existant DataFrame:

sqlContext.createDataFrame(
  df.rdd.map(r => (r.getInt(1), r)).partitionBy(partitioner).values,
  df.schema
)

Il semble donc que ce n'est pas impossible. La question demeure de savoir si cela a du sens. Je dirai que la plupart du temps, ce n'est pas le cas:

  1. Le repartitionnement est un processus coûteux. Dans un scénario typique, la plupart des données doivent être sérialisées, mélangées et désérialisées. D'un autre côté, le nombre d'opérations qui peuvent bénéficier de données pré-partitionnées est relativement petit et est encore limité si l'API interne n'est pas conçue pour tirer parti de cette propriété.

    • rejoint certains scénarios, mais cela nécessiterait un support interne,
    • les appels de fonctions de fenêtre avec le partitionneur correspondant. Idem que ci-dessus, limité à une seule définition de fenêtre. Il est déjà partitionné en interne, donc le pré-partitionnement peut être redondant,
    • agrégations simples avec GROUP BY- il est possible de réduire l'empreinte mémoire des tampons temporaires **, mais le coût global est beaucoup plus élevé. Plus ou moins équivalent à groupByKey.mapValues(_.reduce)(comportement actuel) vs reduceByKey(pré-partitionnement). Peu susceptible d'être utile dans la pratique.
    • compression de données avec SqlContext.cacheTable. Puisqu'il semble utiliser le codage de longueur d'exécution, l'application OrderedRDDFunctions.repartitionAndSortWithinPartitionspourrait améliorer le taux de compression.
  2. Les performances dépendent fortement d'une distribution des clés. S'il est biaisé, il en résultera une utilisation sous-optimale des ressources. Dans le pire des cas, il sera impossible de terminer le travail du tout.

  3. Tout l'intérêt d'utiliser une API déclarative de haut niveau est de vous isoler des détails d'implémentation de bas niveau. Comme déjà mentionné par @dwysakowicz et @RomiKuntsman, l'optimisation est un travail de Catalyst Optimizer . C'est une bête assez sophistiquée et je doute vraiment que vous puissiez facilement améliorer cela sans plonger beaucoup plus profondément dans ses composants internes.

Concepts associés

Partitionnement avec des sources JDBC :

predicatesArgument de prise en charge des sources de données JDBC . Il peut être utilisé comme suit:

sqlContext.read.jdbc(url, table, Array("foo = 1", "foo = 3"), props)

Il crée une seule partition JDBC par prédicat. Gardez à l'esprit que si les ensembles créés à l'aide de prédicats individuels ne sont pas disjoints, vous verrez des doublons dans la table résultante.

partitionByméthode enDataFrameWriter :

Spark DataFrameWriterfournit une partitionByméthode qui peut être utilisée pour «partitionner» les données lors de l'écriture. Il sépare les données lors de l'écriture à l'aide de l'ensemble de colonnes fourni

val df = Seq(
  ("foo", 1.0), ("bar", 2.0), ("foo", 1.5), ("bar", 2.6)
).toDF("k", "v")

df.write.partitionBy("k").json("/tmp/foo.json")

Cela permet de pousser le prédicat vers le bas lors de la lecture pour les requêtes basées sur la clé:

val df1 = sqlContext.read.schema(df.schema).json("/tmp/foo.json")
df1.where($"k" === "bar")

mais ce n'est pas équivalent à DataFrame.repartition. En particulier des agrégations comme:

val cnts = df1.groupBy($"k").sum()

nécessitera toujours TungstenExchange:

cnts.explain

// == Physical Plan ==
// TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Final,isDistinct=false)], output=[k#90,sum(v)#93])
// +- TungstenExchange hashpartitioning(k#90,200), None
//    +- TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Partial,isDistinct=false)], output=[k#90,sum#99])
//       +- Scan JSONRelation[k#90,v#91] InputPaths: file:/tmp/foo.json

bucketByméthode dansDataFrameWriter (Spark> = 2.0):

bucketBya des applications similaires partitionBymais il n'est disponible que pour les tables ( saveAsTable). Les informations de regroupement peuvent être utilisées pour optimiser les jointures:

// Temporarily disable broadcast joins
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

df.write.bucketBy(42, "k").saveAsTable("df1")
val df2 = Seq(("A", -1.0), ("B", 2.0)).toDF("k", "v2")
df2.write.bucketBy(42, "k").saveAsTable("df2")

// == Physical Plan ==
// *Project [k#41, v#42, v2#47]
// +- *SortMergeJoin [k#41], [k#46], Inner
//    :- *Sort [k#41 ASC NULLS FIRST], false, 0
//    :  +- *Project [k#41, v#42]
//    :     +- *Filter isnotnull(k#41)
//    :        +- *FileScan parquet default.df1[k#41,v#42] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/spark-warehouse/df1], PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:string,v:int>
//    +- *Sort [k#46 ASC NULLS FIRST], false, 0
//       +- *Project [k#46, v2#47]
//          +- *Filter isnotnull(k#46)
//             +- *FileScan parquet default.df2[k#46,v2#47] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/spark-warehouse/df2], PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:string,v2:double>

* Par disposition de partition, je veux dire seulement une distribution de données. partitionedRDD n'a plus de partitionneur. ** En supposant aucune projection précoce. Si l'agrégation ne couvre qu'un petit sous-ensemble de colonnes, il n'y a probablement aucun gain.

zéro323
la source
@bychance Oui et non. La disposition des données sera préservée, mais AFAIK ne vous donnera pas d'avantages tels que l'élagage des partitions.
zero323
@ zero323 Merci, y a-t-il un moyen de vérifier l'allocation de partition du fichier parquet pour valider df.save.write effectivement enregistrer la mise en page? Et si je fais df.repartition ("A"), puis df.write.repartitionBy ("B"), la structure du dossier physique sera partitionnée par B, et dans chaque dossier de valeur B, gardera-t-elle toujours la partition par UNE?
hasard
2
@bychance DataFrameWriter.partitionByn'est logiquement pas la même chose que DataFrame.repartition. Former on ne mélange pas, il sépare simplement la sortie. En ce qui concerne la première question: les données sont enregistrées par partition et il n'y a pas de mélange. Vous pouvez facilement vérifier cela en lisant des fichiers individuels. Mais Spark seul n'a aucun moyen de savoir si c'est ce que vous voulez vraiment.
zero323
11

Dans Spark <1.6 Si vous créez un HiveContext, pas l'ancien, SqlContextvous pouvez utiliser HiveQL DISTRIBUTE BY colX... (garantit que chacun des N réducteurs obtient des plages de x non superposées) & CLUSTER BY colX...(raccourci pour Distribuer par et Trier par) par exemple;

df.registerTempTable("partitionMe")
hiveCtx.sql("select * from partitionMe DISTRIBUTE BY accountId SORT BY accountId, date")

Je ne sais pas comment cela s'intègre avec l'API Spark DF. Ces mots-clés ne sont pas pris en charge dans le SqlContext normal (notez que vous n'avez pas besoin d'un meta store de ruche pour utiliser le HiveContext)

EDIT: Spark 1.6+ l'a maintenant dans l'API native DataFrame

Loup de la nuit
la source
1
Les partitions sont-elles conservées lorsque la trame de données est enregistrée?
Sim
comment contrôler le nombre de partitions que vous pouvez avoir dans l'exemple hive ql? Par exemple, dans l'approche RDD par paire, vous pouvez le faire pour créer 5 partitions: val partitioner = new HashPartitioner (5)
Minnie
ok, j'ai trouvé la réponse, cela peut être fait comme ceci: sqlContext.setConf ("spark.sql.shuffle.partitions", "5") Je n'ai pas pu modifier le commentaire précédent car j'ai manqué la limite de 5 minutes
Minnie
7

Donc, pour commencer par une sorte de réponse:) - Vous ne pouvez pas

Je ne suis pas un expert, mais pour autant que je comprends les DataFrames, ils ne sont pas égaux à rdd et DataFrame n'a rien de tel que Partitioner.

Généralement, l'idée de DataFrame est de fournir un autre niveau d'abstraction qui gère lui-même ces problèmes. Les requêtes sur DataFrame sont traduites en plan logique qui est ensuite traduit en opérations sur les RDD. Le partitionnement que vous avez suggéré sera probablement appliqué automatiquement ou du moins devrait l'être.

Si vous ne croyez pas à SparkSQL qu'il fournira une sorte de travail optimal, vous pouvez toujours transformer DataFrame en RDD [Row] comme suggéré dans les commentaires.

Dawid Wysakowicz
la source
7

Utilisez le DataFrame retourné par:

yourDF.orderBy(account)

Il n'y a pas de moyen explicite d'utiliser partitionBysur un DataFrame, uniquement sur un PairRDD, mais lorsque vous triez un DataFrame, il l'utilisera dans son LogicalPlan et cela vous aidera lorsque vous aurez besoin de faire des calculs sur chaque compte.

Je suis juste tombé sur le même problème exact, avec un dataframe que je souhaite partitionner par compte. Je suppose que lorsque vous dites "voulez que les données soient partitionnées de sorte que toutes les transactions d'un compte soient dans la même partition Spark", vous le voulez pour l'échelle et les performances, mais votre code n'en dépend pas (comme l'utilisation mapPartitions()etc), non?

Romi Kuntsman
la source
3
Et si votre code en dépend parce que vous utilisez mapPartitions?
NightWolf
2
Vous pouvez convertir le DataFrame en RDD, puis le partitionner (par exemple en utilisant aggregatByKey () et passer un partitionneur personnalisé)
Romi Kuntsman
5

J'ai pu le faire en utilisant RDD. Mais je ne sais pas si c'est une solution acceptable pour vous. Une fois que vous avez le DF disponible en tant que RDD, vous pouvez postuler repartitionAndSortWithinPartitionspour effectuer un repartitionnement personnalisé des données.

Voici un exemple que j'ai utilisé:

class DatePartitioner(partitions: Int) extends Partitioner {

  override def getPartition(key: Any): Int = {
    val start_time: Long = key.asInstanceOf[Long]
    Objects.hash(Array(start_time)) % partitions
  }

  override def numPartitions: Int = partitions
}

myRDD
  .repartitionAndSortWithinPartitions(new DatePartitioner(24))
  .map { v => v._2 }
  .toDF()
  .write.mode(SaveMode.Overwrite)
Développeur
la source