Comment ajouter une nouvelle colonne à un Spark DataFrame (à l'aide de PySpark)?

129

J'ai un Spark DataFrame (utilisant PySpark 1.5.1) et j'aimerais ajouter une nouvelle colonne.

J'ai essayé ce qui suit sans succès:

type(randomed_hours) # => list

# Create in Python and transform to RDD

new_col = pd.DataFrame(randomed_hours, columns=['new_col'])

spark_new_col = sqlContext.createDataFrame(new_col)

my_df_spark.withColumn("hours", spark_new_col["new_col"])

J'ai également eu une erreur en utilisant ceci:

my_df_spark.withColumn("hours",  sc.parallelize(randomed_hours))

Alors, comment ajouter une nouvelle colonne (basée sur un vecteur Python) à un DataFrame existant avec PySpark?

Boris
la source

Réponses:

208

Vous ne pouvez pas ajouter une colonne arbitraire à une DataFramedans Spark. Les nouvelles colonnes ne peuvent être créées qu'à l'aide de littéraux (d'autres types de littéraux sont décrits dans Comment ajouter une colonne constante dans un Spark DataFrame? )

from pyspark.sql.functions import lit

df = sqlContext.createDataFrame(
    [(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))

df_with_x4 = df.withColumn("x4", lit(0))
df_with_x4.show()

## +---+---+-----+---+
## | x1| x2|   x3| x4|
## +---+---+-----+---+
## |  1|  a| 23.0|  0|
## |  3|  B|-23.0|  0|
## +---+---+-----+---+

transformation d'une colonne existante:

from pyspark.sql.functions import exp

df_with_x5 = df_with_x4.withColumn("x5", exp("x3"))
df_with_x5.show()

## +---+---+-----+---+--------------------+
## | x1| x2|   x3| x4|                  x5|
## +---+---+-----+---+--------------------+
## |  1|  a| 23.0|  0| 9.744803446248903E9|
## |  3|  B|-23.0|  0|1.026187963170189...|
## +---+---+-----+---+--------------------+

inclus en utilisant join:

from pyspark.sql.functions import exp

lookup = sqlContext.createDataFrame([(1, "foo"), (2, "bar")], ("k", "v"))
df_with_x6 = (df_with_x5
    .join(lookup, col("x1") == col("k"), "leftouter")
    .drop("k")
    .withColumnRenamed("v", "x6"))

## +---+---+-----+---+--------------------+----+
## | x1| x2|   x3| x4|                  x5|  x6|
## +---+---+-----+---+--------------------+----+
## |  1|  a| 23.0|  0| 9.744803446248903E9| foo|
## |  3|  B|-23.0|  0|1.026187963170189...|null|
## +---+---+-----+---+--------------------+----+

ou généré avec la fonction / udf:

from pyspark.sql.functions import rand

df_with_x7 = df_with_x6.withColumn("x7", rand())
df_with_x7.show()

## +---+---+-----+---+--------------------+----+-------------------+
## | x1| x2|   x3| x4|                  x5|  x6|                 x7|
## +---+---+-----+---+--------------------+----+-------------------+
## |  1|  a| 23.0|  0| 9.744803446248903E9| foo|0.41930610446846617|
## |  3|  B|-23.0|  0|1.026187963170189...|null|0.37801881545497873|
## +---+---+-----+---+--------------------+----+-------------------+

Les fonctions intégrées ( pyspark.sql.functions), qui correspondent à l'expression Catalyst, sont généralement préférées aux fonctions définies par l'utilisateur Python.

Si vous souhaitez ajouter le contenu d'un RDD arbitraire en tant que colonne, vous pouvez

zéro323
la source
1
"De nouvelles colonnes ne peuvent être créées qu'en utilisant des littéraux" Que signifient exactement les littéraux dans ce contexte?
timbram
La documentation de Spark est excellente, voir df.withColumn spark.apache.org/docs/2.1.0/api/python/…
Steven Black
10
La documentation Spark n'est «géniale» que dans la mesure où elle laisse de larges pans d'utilisation à un exercice pour le lecteur avisé. Spark (et Pyspark) couvre un véritable zoo de structures de données, avec peu ou pas d'instructions sur la façon de les convertir. Exemple concret: prolifération de questions comme celle-ci.
shadowtalker
62

Pour ajouter une colonne à l'aide d'un UDF:

df = sqlContext.createDataFrame(
    [(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))

from pyspark.sql.functions import udf
from pyspark.sql.types import *

def valueToCategory(value):
   if   value == 1: return 'cat1'
   elif value == 2: return 'cat2'
   ...
   else: return 'n/a'

# NOTE: it seems that calls to udf() must be after SparkContext() is called
udfValueToCategory = udf(valueToCategory, StringType())
df_with_cat = df.withColumn("category", udfValueToCategory("x1"))
df_with_cat.show()

## +---+---+-----+---------+
## | x1| x2|   x3| category|
## +---+---+-----+---------+
## |  1|  a| 23.0|     cat1|
## |  3|  B|-23.0|      n/a|
## +---+---+-----+---------+
Mark Rajcok
la source
30

Pour Spark 2.0

# assumes schema has 'age' column 
df.select('*', (df.age + 10).alias('agePlusTen'))
Luke W
la source
1
Doit être df.select ('*', (df.age + 10) .alias ('agePlusTen'))
Frank B.
1
Merci, et si vous entrez, df = df.select('*', (df.age + 10).alias('agePlusTen'))vous ajoutez effectivement une colonne arbitraire car @ zero323 nous a avertis ci-dessus était impossible, à moins qu'il n'y ait quelque chose de mal à faire cela dans Spark, dans Pandas, c'est la méthode standard ..
cardamome
Existe-t-il une version de ceci pour pySpark?
Tagar
L'extrait de code @Tagar ci-dessus est python.
Luke W
1
@GeoffreyAnderson,df.select('*', df.age + 10, df.age + 20)
Mark Rajcok
2

Il existe plusieurs façons d'ajouter une nouvelle colonne dans pySpark.

Créons d'abord un DataFrame simple.

date = [27, 28, 29, None, 30, 31]
df = spark.createDataFrame(date, IntegerType())

Essayons maintenant de doubler la valeur de la colonne et de la stocker dans une nouvelle colonne. PFB quelques approches différentes pour atteindre la même chose.

# Approach - 1 : using withColumn function
df.withColumn("double", df.value * 2).show()

# Approach - 2 : using select with alias function.
df.select("*", (df.value * 2).alias("double")).show()

# Approach - 3 : using selectExpr function with as clause.
df.selectExpr("*", "value * 2 as double").show()

# Approach - 4 : Using as clause in SQL statement.
df.createTempView("temp")
spark.sql("select *, value * 2 as double from temp").show()

Pour plus d'exemples et d'explications sur les fonctions Spark DataFrame, vous pouvez visiter mon blog .

J'espère que ça aide.

neeraj bhadani
la source
0

Vous pouvez définir un nouveau udflors de l'ajout d'un column_name:

u_f = F.udf(lambda :yourstring,StringType())
a.select(u_f().alias('column_name')
Allen211
la source
0
from pyspark.sql.functions import udf
from pyspark.sql.types import *
func_name = udf(
    lambda val: val, # do sth to val
    StringType()
)
df.withColumn('new_col', func_name(df.old_col))
DeFOX
la source
Vous devez appeler StringType().
gberger
0

Je voudrais offrir un exemple généralisé pour un cas d'utilisation très similaire:

Cas d'utilisation: j'ai un csv composé de:

First|Third|Fifth
data|data|data
data|data|data
...billion more lines

J'ai besoin d'effectuer quelques transformations et le csv final doit ressembler à

First|Second|Third|Fourth|Fifth
data|null|data|null|data
data|null|data|null|data
...billion more lines

Je dois le faire car c'est le schéma défini par un modèle et j'ai besoin que mes données finales soient interopérables avec les insertions en masse SQL et autres.

alors:

1) J'ai lu le csv original en utilisant spark.read et je l'appelle "df".

2) Je fais quelque chose sur les données.

3) J'ajoute les colonnes nulles en utilisant ce script:

outcols = []
for column in MY_COLUMN_LIST:
    if column in df.columns:
        outcols.append(column)
    else:
        outcols.append(lit(None).cast(StringType()).alias('{0}'.format(column)))

df = df.select(outcols)

De cette façon, vous pouvez structurer votre schéma après le chargement d'un csv (cela fonctionnerait également pour réorganiser les colonnes si vous devez le faire pour de nombreuses tables).

bloodrootfc
la source
0

La manière la plus simple d'ajouter une colonne est d'utiliser "withColumn". Étant donné que le dataframe est créé à l'aide de sqlContext, vous devez spécifier le schéma ou par défaut, il peut être disponible dans l'ensemble de données. Si le schéma est spécifié, la charge de travail devient fastidieuse lors de sa modification à chaque fois.

Voici un exemple que vous pouvez considérer:

from pyspark.sql import SQLContext
from pyspark.sql.types import *
sqlContext = SQLContext(sc) # SparkContext will be sc by default 

# Read the dataset of your choice (Already loaded with schema)
Data = sqlContext.read.csv("/path", header = True/False, schema = "infer", sep = "delimiter")

# For instance the data has 30 columns from col1, col2, ... col30. If you want to add a 31st column, you can do so by the following:
Data = Data.withColumn("col31", "Code goes here")

# Check the change 
Data.printSchema()
Swaminathan Meenakshisundaram
la source
0

Nous pouvons ajouter des colonnes supplémentaires à DataFrame directement avec les étapes ci-dessous:

from pyspark.sql.functions import when
df = spark.createDataFrame([["amit", 30], ["rohit", 45], ["sameer", 50]], ["name", "age"])
df = df.withColumn("profile", when(df.age >= 40, "Senior").otherwise("Executive"))
df.show()
yogesh
la source