Comment ajouter une colonne constante dans un Spark DataFrame?

137

Je veux ajouter une colonne dans un DataFrameavec une valeur arbitraire (c'est la même chose pour chaque ligne). J'obtiens une erreur lorsque j'utilise withColumncomme suit:

dt.withColumn('new_column', 10).head(5)
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-50-a6d0257ca2be> in <module>()
      1 dt = (messages
      2     .select(messages.fromuserid, messages.messagetype, floor(messages.datetime/(1000*60*5)).alias("dt")))
----> 3 dt.withColumn('new_column', 10).head(5)

/Users/evanzamir/spark-1.4.1/python/pyspark/sql/dataframe.pyc in withColumn(self, colName, col)
   1166         [Row(age=2, name=u'Alice', age2=4), Row(age=5, name=u'Bob', age2=7)]
   1167         """
-> 1168         return self.select('*', col.alias(colName))
   1169 
   1170     @ignore_unicode_prefix

AttributeError: 'int' object has no attribute 'alias'

Il semble que je puisse tromper la fonction pour qu'elle fonctionne comme je le souhaite en ajoutant et en soustrayant l'une des autres colonnes (afin qu'elles s'ajoutent à zéro), puis en ajoutant le nombre que je veux (10 dans ce cas):

dt.withColumn('new_column', dt.messagetype - dt.messagetype + 10).head(5)
[Row(fromuserid=425, messagetype=1, dt=4809600.0, new_column=10),
 Row(fromuserid=47019141, messagetype=1, dt=4809600.0, new_column=10),
 Row(fromuserid=49746356, messagetype=1, dt=4809600.0, new_column=10),
 Row(fromuserid=93506471, messagetype=1, dt=4809600.0, new_column=10),
 Row(fromuserid=80488242, messagetype=1, dt=4809600.0, new_column=10)]

C'est extrêmement piraté, non? Je suppose qu'il existe un moyen plus légitime de le faire?

Evan Zamir
la source

Réponses:

221

Spark 2.2+

Spark 2.2 introduit la prise typedLiten charge Seq, Mapet Tuples( SPARK-19254 ) et les appels suivants doivent être pris en charge (Scala):

import org.apache.spark.sql.functions.typedLit

df.withColumn("some_array", typedLit(Seq(1, 2, 3)))
df.withColumn("some_struct", typedLit(("foo", 1, 0.3)))
df.withColumn("some_map", typedLit(Map("key1" -> 1, "key2" -> 2)))

Spark 1.3+ ( lit), 1.4+ ( array, struct), 2.0+ ( map):

Le deuxième argument pour DataFrame.withColumndevrait être un Columndonc vous devez utiliser un littéral:

from pyspark.sql.functions import lit

df.withColumn('new_column', lit(10))

Si vous avez besoin de colonnes complexes, vous pouvez les construire en utilisant des blocs comme array:

from pyspark.sql.functions import array, create_map, struct

df.withColumn("some_array", array(lit(1), lit(2), lit(3)))
df.withColumn("some_struct", struct(lit("foo"), lit(1), lit(.3)))
df.withColumn("some_map", create_map(lit("key1"), lit(1), lit("key2"), lit(2)))

Exactement les mêmes méthodes peuvent être utilisées dans Scala.

import org.apache.spark.sql.functions.{array, lit, map, struct}

df.withColumn("new_column", lit(10))
df.withColumn("map", map(lit("key1"), lit(1), lit("key2"), lit(2)))

Pour fournir des noms à structsutiliser aliasdans chaque champ:

df.withColumn(
    "some_struct",
    struct(lit("foo").alias("x"), lit(1).alias("y"), lit(0.3).alias("z"))
 )

ou castsur l'objet entier

df.withColumn(
    "some_struct", 
    struct(lit("foo"), lit(1), lit(0.3)).cast("struct<x: string, y: integer, z: double>")
 )

Il est également possible, bien que plus lent, d'utiliser un UDF.

Remarque :

Les mêmes constructions peuvent être utilisées pour passer des arguments constants aux UDF ou aux fonctions SQL.

zéro323
la source
1
Pour les autres qui l'utilisent pour implémenter ... la méthode withColumn renvoie un nouveau DataFrame en ajoutant une colonne ou en remplaçant la colonne existante qui porte le même nom, vous devrez donc réaffecter les résultats à df ou attribuer une nouvelle variable. Par exemple, `df = df.withColumn ('new_column', lit (10)) '
Even Mien
à chaque itération, pouvons-nous changer les valeurs à l'intérieur de la colonne? J'ai déjà essayé for i in range(len(item)) : df.withColumn('new_column', lit({}).format(i)) mais cela ne fonctionne pas
Tracy
30

Dans Spark 2.2, il existe deux façons d'ajouter une valeur constante dans une colonne de DataFrame:

1) Utilisation lit

2) Utilisation typedLit.

La différence entre les deux est qu'il typedLitpeut également gérer les types de scala paramétrés, par exemple List, Seq et Map

Exemple de DataFrame:

val df = spark.createDataFrame(Seq((0,"a"),(1,"b"),(2,"c"))).toDF("id", "col1")

+---+----+
| id|col1|
+---+----+
|  0|   a|
|  1|   b|
+---+----+

1) Utilisation lit: Ajout d'une valeur de chaîne constante dans une nouvelle colonne nommée newcol:

import org.apache.spark.sql.functions.lit
val newdf = df.withColumn("newcol",lit("myval"))

Résultat:

+---+----+------+
| id|col1|newcol|
+---+----+------+
|  0|   a| myval|
|  1|   b| myval|
+---+----+------+

2) Utilisation typedLit:

import org.apache.spark.sql.functions.typedLit
df.withColumn("newcol", typedLit(("sample", 10, .044)))

Résultat:

+---+----+-----------------+
| id|col1|           newcol|
+---+----+-----------------+
|  0|   a|[sample,10,0.044]|
|  1|   b|[sample,10,0.044]|
|  2|   c|[sample,10,0.044]|
+---+----+-----------------+
Ayush Vatsyayan
la source
Pourriez-vous partager la version complète avec la déclaration d'importation
Ayush Vatsyayan
Spark version 2.2.1. L'instruction import provient de pyspark.sql.functions import typedLit. J'ai également essayé celui que vous avez partagé ci-dessus.
braj