Comment puis-je convertir un RDD ( org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]
) en Dataframe org.apache.spark.sql.DataFrame
. J'ai converti un dataframe en rdd en utilisant .rdd
. Après l'avoir traité, je veux le remettre dans le dataframe. Comment puis-je faire ceci ?
scala
apache-spark
apache-spark-sql
rdd
user568109
la source
la source
Réponses:
SqlContext
a un certain nombre decreateDataFrame
méthodes qui créent unDataFrame
fichierRDD
. J'imagine que l'un d'entre eux fonctionnera pour votre contexte.Par exemple:
la source
Ce code fonctionne parfaitement à partir de Spark 2.x avec Scala 2.11
Importer les classes nécessaires
Créer un
SparkSession
objet, et c'est icispark
Faisons un
RDD
pour le faireDataFrame
Méthode 1
Utilisation
SparkSession.createDataFrame(RDD obj)
.Méthode 2
Utilisation
SparkSession.createDataFrame(RDD obj)
et spécification des noms de colonne.Méthode 3 (réponse réelle à la question)
De cette façon, l'entrée
rdd
doit être de typeRDD[Row]
.créer le schéma
Maintenant, appliquez à la fois
rowsRdd
etschema
àcreateDataFrame()
la source
En supposant que votre RDD [ligne] s'appelle rdd, vous pouvez utiliser:
la source
Remarque: cette réponse a été initialement publiée ici
Je publie cette réponse car je voudrais partager des détails supplémentaires sur les options disponibles que je n'ai pas trouvées dans les autres réponses
Pour créer un DataFrame à partir d'un RDD de lignes, il existe deux options principales:
1) Comme déjà indiqué, vous pouvez utiliser
toDF()
ce qui peut être importé parimport sqlContext.implicits._
. Cependant, cette approche ne fonctionne que pour les types de RDD suivants:RDD[Int]
RDD[Long]
RDD[String]
RDD[T <: scala.Product]
(source: Scaladoc de l'
SQLContext.implicits
objet)La dernière signature signifie en fait qu'elle peut fonctionner pour un RDD de tuples ou un RDD de classes de cas (car les tuples et les classes de cas sont des sous-classes de
scala.Product
).Donc, pour utiliser cette approche pour un
RDD[Row]
, vous devez le mapper à unRDD[T <: scala.Product]
. Cela peut être fait en mappant chaque ligne à une classe de cas personnalisée ou à un tuple, comme dans les extraits de code suivants:ou
Le principal inconvénient de cette approche (à mon avis) est que vous devez définir explicitement le schéma du DataFrame résultant dans la fonction de carte, colonne par colonne. Peut-être que cela peut être fait par programme si vous ne connaissez pas le schéma à l'avance, mais les choses peuvent devenir un peu compliquées là-bas. Donc, alternativement, il existe une autre option:
2) Vous pouvez utiliser
createDataFrame(rowRDD: RDD[Row], schema: StructType)
comme dans la réponse acceptée, qui est disponible dans l' objet SQLContext . Exemple de conversion d'un RDD d'un ancien DataFrame:Notez qu'il n'est pas nécessaire de définir explicitement une colonne de schéma. Nous réutilisons l'ancien schéma de DF, qui est de
StructType
classe et peut être facilement étendu. Cependant, cette approche n'est parfois pas possible, et dans certains cas peut être moins efficace que la première.la source
import sqlContext.implicits.
Supposons que vous ayez un
DataFrame
et que vous souhaitiez modifier les données des champs en le convertissant enRDD[Row]
.Pour reconvertir à
DataFrame
partir de,RDD
nous devons définir le type de structure duRDD
.Si le type de données était
Long
alors il deviendra commeLongType
dans la structure.Si
String
alorsStringType
dans la structure.Vous pouvez maintenant convertir le RDD en DataFrame à l'aide de la méthode createDataFrame .
la source
Voici un exemple simple de conversion de votre liste en Spark RDD, puis de conversion de ce Spark RDD en Dataframe.
Veuillez noter que j'ai utilisé le scala REPL de Spark-shell pour exécuter le code suivant, ici sc est une instance de SparkContext qui est implicitement disponible dans Spark-shell. J'espère qu'il répondra à votre question.
la source
Méthode 1: (Scala)
Méthode 2: (Scala)
Méthode 1: (Python)
Méthode 2: (Python)
Extrait la valeur de l'objet de ligne, puis applique la classe de cas pour convertir rdd en DF
la source
Sur les nouvelles versions de Spark (2.0+)
la source
En supposant que val spark est un produit d'un SparkSession.builder ...
Mêmes étapes, mais avec moins de déclarations val:
la source
J'ai essayé d'expliquer la solution en utilisant le problème du nombre de mots . 1. Lisez le fichier à l'aide de sc
Méthodes pour créer DF
Lire le fichier à l'aide de Spark
Rdd à Dataframe
val df = sc.textFile ("D: // cca175 / data /") .toDF ("t1") df.show
Méthode 1
Créer un compte de mots RDD vers Dataframe
Méthode2
Créer Dataframe à partir de Rdd
Méthode3
Définir le schéma
import org.apache.spark.sql.types._
schéma val = nouveau StructType (). add (StructField ("mot", StringType, true)). add (StructField ("count", StringType, true))
Créer RowRDD
Créer DataFrame à partir de RDD avec un schéma
val df = spark.createDataFrame (rowRdd, schéma)
df.show
la source
Pour convertir un tableau [Row] en DataFrame ou Dataset, ce qui suit fonctionne avec élégance:
Dites, schema est le StructType de la ligne, puis
la source