J'ai commencé à utiliser Spark SQL et DataFrames dans Spark 1.4.0. Je souhaite définir un partitionneur personnalisé sur DataFrames, dans Scala, mais je ne vois pas comment faire cela.
L'une des tables de données avec lesquelles je travaille contient une liste de transactions, par compte, silimar à l'exemple suivant.
Account Date Type Amount
1001 2014-04-01 Purchase 100.00
1001 2014-04-01 Purchase 50.00
1001 2014-04-05 Purchase 70.00
1001 2014-04-01 Payment -150.00
1002 2014-04-01 Purchase 80.00
1002 2014-04-02 Purchase 22.00
1002 2014-04-04 Payment -120.00
1002 2014-04-04 Purchase 60.00
1003 2014-04-02 Purchase 210.00
1003 2014-04-03 Purchase 15.00
Au moins au début, la plupart des calculs auront lieu entre les transactions au sein d'un compte. Je voudrais donc que les données soient partitionnées afin que toutes les transactions d'un compte soient dans la même partition Spark.
Mais je ne vois pas de moyen de définir cela. La classe DataFrame a une méthode appelée 'repartition (Int)', où vous pouvez spécifier le nombre de partitions à créer. Mais je ne vois aucune méthode disponible pour définir un partitionneur personnalisé pour un DataFrame, tel que celui pouvant être spécifié pour un RDD.
Les données source sont stockées dans Parquet. J'ai vu que lors de l'écriture d'un DataFrame sur Parquet, vous pouvez spécifier une colonne à partitionner, donc je pourrais probablement dire à Parquet de partitionner ses données par la colonne 'Compte'. Mais il pourrait y avoir des millions de comptes, et si je comprends bien Parquet, cela créerait un répertoire distinct pour chaque compte, donc cela ne semblait pas être une solution raisonnable.
Existe-t-il un moyen pour Spark de partitionner ce DataFrame afin que toutes les données d'un compte se trouvent dans la même partition?
int(account/someInteger)
et ainsi obtenir un nombre raisonnable de comptes par répertoire.partitionBy(Partitioner)
méthode, mais pour DataFrames au lieu de RDD. Je vois maintenant que cepartitionBy
n'est disponible que pour les RDD Pair, je ne sais pas pourquoi.Réponses:
Étincelle> = 2,3,0
SPARK-22614 expose le partitionnement de plage.
SPARK-22389 expose le partitionnement de format externe dans l' API de source de données v2 .
Étincelle> = 1,6,0
Dans Spark> = 1.6, il est possible d'utiliser le partitionnement par colonne pour les requêtes et la mise en cache. Voir: SPARK-11410 et SPARK-4849 en utilisant la
repartition
méthode:Contrairement à
RDDs
SparkDataset
(y comprisDataset[Row]
akaDataFrame
) ne peut pas utiliser de partitionneur personnalisé comme pour le moment. Vous pouvez généralement résoudre ce problème en créant une colonne de partitionnement artificielle, mais cela ne vous donnera pas la même flexibilité.Spark <1.6.0:
Une chose que vous pouvez faire est de pré-partitionner les données d'entrée avant de créer un
DataFrame
Étant donné que la
DataFrame
création à partir d'unRDD
ne nécessite qu'une simple phase de carte, la disposition des partitions existantes doit être préservée *:De la même manière que vous pouvez repartitionner existant
DataFrame
:Il semble donc que ce n'est pas impossible. La question demeure de savoir si cela a du sens. Je dirai que la plupart du temps, ce n'est pas le cas:
Le repartitionnement est un processus coûteux. Dans un scénario typique, la plupart des données doivent être sérialisées, mélangées et désérialisées. D'un autre côté, le nombre d'opérations qui peuvent bénéficier de données pré-partitionnées est relativement petit et est encore limité si l'API interne n'est pas conçue pour tirer parti de cette propriété.
GROUP BY
- il est possible de réduire l'empreinte mémoire des tampons temporaires **, mais le coût global est beaucoup plus élevé. Plus ou moins équivalent àgroupByKey.mapValues(_.reduce)
(comportement actuel) vsreduceByKey
(pré-partitionnement). Peu susceptible d'être utile dans la pratique.SqlContext.cacheTable
. Puisqu'il semble utiliser le codage de longueur d'exécution, l'applicationOrderedRDDFunctions.repartitionAndSortWithinPartitions
pourrait améliorer le taux de compression.Les performances dépendent fortement d'une distribution des clés. S'il est biaisé, il en résultera une utilisation sous-optimale des ressources. Dans le pire des cas, il sera impossible de terminer le travail du tout.
Concepts associés
Partitionnement avec des sources JDBC :
predicates
Argument de prise en charge des sources de données JDBC . Il peut être utilisé comme suit:Il crée une seule partition JDBC par prédicat. Gardez à l'esprit que si les ensembles créés à l'aide de prédicats individuels ne sont pas disjoints, vous verrez des doublons dans la table résultante.
partitionBy
méthode enDataFrameWriter
:Spark
DataFrameWriter
fournit unepartitionBy
méthode qui peut être utilisée pour «partitionner» les données lors de l'écriture. Il sépare les données lors de l'écriture à l'aide de l'ensemble de colonnes fourniCela permet de pousser le prédicat vers le bas lors de la lecture pour les requêtes basées sur la clé:
mais ce n'est pas équivalent à
DataFrame.repartition
. En particulier des agrégations comme:nécessitera toujours
TungstenExchange
:bucketBy
méthode dansDataFrameWriter
(Spark> = 2.0):bucketBy
a des applications similairespartitionBy
mais il n'est disponible que pour les tables (saveAsTable
). Les informations de regroupement peuvent être utilisées pour optimiser les jointures:* Par disposition de partition, je veux dire seulement une distribution de données.
partitioned
RDD n'a plus de partitionneur. ** En supposant aucune projection précoce. Si l'agrégation ne couvre qu'un petit sous-ensemble de colonnes, il n'y a probablement aucun gain.la source
DataFrameWriter.partitionBy
n'est logiquement pas la même chose queDataFrame.repartition
. Former on ne mélange pas, il sépare simplement la sortie. En ce qui concerne la première question: les données sont enregistrées par partition et il n'y a pas de mélange. Vous pouvez facilement vérifier cela en lisant des fichiers individuels. Mais Spark seul n'a aucun moyen de savoir si c'est ce que vous voulez vraiment.Dans Spark <1.6 Si vous créez un
HiveContext
, pas l'ancien,SqlContext
vous pouvez utiliser HiveQLDISTRIBUTE BY colX...
(garantit que chacun des N réducteurs obtient des plages de x non superposées) &CLUSTER BY colX...
(raccourci pour Distribuer par et Trier par) par exemple;Je ne sais pas comment cela s'intègre avec l'API Spark DF. Ces mots-clés ne sont pas pris en charge dans le SqlContext normal (notez que vous n'avez pas besoin d'un meta store de ruche pour utiliser le HiveContext)
EDIT: Spark 1.6+ l'a maintenant dans l'API native DataFrame
la source
Donc, pour commencer par une sorte de réponse:) - Vous ne pouvez pas
Je ne suis pas un expert, mais pour autant que je comprends les DataFrames, ils ne sont pas égaux à rdd et DataFrame n'a rien de tel que Partitioner.
Généralement, l'idée de DataFrame est de fournir un autre niveau d'abstraction qui gère lui-même ces problèmes. Les requêtes sur DataFrame sont traduites en plan logique qui est ensuite traduit en opérations sur les RDD. Le partitionnement que vous avez suggéré sera probablement appliqué automatiquement ou du moins devrait l'être.
Si vous ne croyez pas à SparkSQL qu'il fournira une sorte de travail optimal, vous pouvez toujours transformer DataFrame en RDD [Row] comme suggéré dans les commentaires.
la source
Utilisez le DataFrame retourné par:
Il n'y a pas de moyen explicite d'utiliser
partitionBy
sur un DataFrame, uniquement sur un PairRDD, mais lorsque vous triez un DataFrame, il l'utilisera dans son LogicalPlan et cela vous aidera lorsque vous aurez besoin de faire des calculs sur chaque compte.Je suis juste tombé sur le même problème exact, avec un dataframe que je souhaite partitionner par compte. Je suppose que lorsque vous dites "voulez que les données soient partitionnées de sorte que toutes les transactions d'un compte soient dans la même partition Spark", vous le voulez pour l'échelle et les performances, mais votre code n'en dépend pas (comme l'utilisation
mapPartitions()
etc), non?la source
J'ai pu le faire en utilisant RDD. Mais je ne sais pas si c'est une solution acceptable pour vous. Une fois que vous avez le DF disponible en tant que RDD, vous pouvez postuler
repartitionAndSortWithinPartitions
pour effectuer un repartitionnement personnalisé des données.Voici un exemple que j'ai utilisé:
la source