Vous ne pouvez pas ajouter une colonne arbitraire à une DataFrame
dans Spark. Les nouvelles colonnes ne peuvent être créées qu'à l'aide de littéraux (d'autres types de littéraux sont décrits dans Comment ajouter une colonne constante dans un Spark DataFrame? )
from pyspark.sql.functions import lit
df = sqlContext.createDataFrame(
[(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))
df_with_x4 = df.withColumn("x4", lit(0))
df_with_x4.show()
## +---+---+-----+---+
## | x1| x2| x3| x4|
## +---+---+-----+---+
## | 1| a| 23.0| 0|
## | 3| B|-23.0| 0|
## +---+---+-----+---+
transformation d'une colonne existante:
from pyspark.sql.functions import exp
df_with_x5 = df_with_x4.withColumn("x5", exp("x3"))
df_with_x5.show()
## +---+---+-----+---+--------------------+
## | x1| x2| x3| x4| x5|
## +---+---+-----+---+--------------------+
## | 1| a| 23.0| 0| 9.744803446248903E9|
## | 3| B|-23.0| 0|1.026187963170189...|
## +---+---+-----+---+--------------------+
inclus en utilisant join
:
from pyspark.sql.functions import exp
lookup = sqlContext.createDataFrame([(1, "foo"), (2, "bar")], ("k", "v"))
df_with_x6 = (df_with_x5
.join(lookup, col("x1") == col("k"), "leftouter")
.drop("k")
.withColumnRenamed("v", "x6"))
## +---+---+-----+---+--------------------+----+
## | x1| x2| x3| x4| x5| x6|
## +---+---+-----+---+--------------------+----+
## | 1| a| 23.0| 0| 9.744803446248903E9| foo|
## | 3| B|-23.0| 0|1.026187963170189...|null|
## +---+---+-----+---+--------------------+----+
ou généré avec la fonction / udf:
from pyspark.sql.functions import rand
df_with_x7 = df_with_x6.withColumn("x7", rand())
df_with_x7.show()
## +---+---+-----+---+--------------------+----+-------------------+
## | x1| x2| x3| x4| x5| x6| x7|
## +---+---+-----+---+--------------------+----+-------------------+
## | 1| a| 23.0| 0| 9.744803446248903E9| foo|0.41930610446846617|
## | 3| B|-23.0| 0|1.026187963170189...|null|0.37801881545497873|
## +---+---+-----+---+--------------------+----+-------------------+
Les fonctions intégrées ( pyspark.sql.functions
), qui correspondent à l'expression Catalyst, sont généralement préférées aux fonctions définies par l'utilisateur Python.
Si vous souhaitez ajouter le contenu d'un RDD arbitraire en tant que colonne, vous pouvez
Pour ajouter une colonne à l'aide d'un UDF:
la source
Pour Spark 2.0
la source
df = df.select('*', (df.age + 10).alias('agePlusTen'))
vous ajoutez effectivement une colonne arbitraire car @ zero323 nous a avertis ci-dessus était impossible, à moins qu'il n'y ait quelque chose de mal à faire cela dans Spark, dans Pandas, c'est la méthode standard ..df.select('*', df.age + 10, df.age + 20)
Il existe plusieurs façons d'ajouter une nouvelle colonne dans pySpark.
Créons d'abord un DataFrame simple.
Essayons maintenant de doubler la valeur de la colonne et de la stocker dans une nouvelle colonne. PFB quelques approches différentes pour atteindre la même chose.
Pour plus d'exemples et d'explications sur les fonctions Spark DataFrame, vous pouvez visiter mon blog .
J'espère que ça aide.
la source
Vous pouvez définir un nouveau
udf
lors de l'ajout d'uncolumn_name
:la source
la source
StringType()
.Je voudrais offrir un exemple généralisé pour un cas d'utilisation très similaire:
Cas d'utilisation: j'ai un csv composé de:
J'ai besoin d'effectuer quelques transformations et le csv final doit ressembler à
Je dois le faire car c'est le schéma défini par un modèle et j'ai besoin que mes données finales soient interopérables avec les insertions en masse SQL et autres.
alors:
1) J'ai lu le csv original en utilisant spark.read et je l'appelle "df".
2) Je fais quelque chose sur les données.
3) J'ajoute les colonnes nulles en utilisant ce script:
De cette façon, vous pouvez structurer votre schéma après le chargement d'un csv (cela fonctionnerait également pour réorganiser les colonnes si vous devez le faire pour de nombreuses tables).
la source
La manière la plus simple d'ajouter une colonne est d'utiliser "withColumn". Étant donné que le dataframe est créé à l'aide de sqlContext, vous devez spécifier le schéma ou par défaut, il peut être disponible dans l'ensemble de données. Si le schéma est spécifié, la charge de travail devient fastidieuse lors de sa modification à chaque fois.
Voici un exemple que vous pouvez considérer:
la source
Nous pouvons ajouter des colonnes supplémentaires à DataFrame directement avec les étapes ci-dessous:
la source