Renommer les noms de colonne d'un DataFrame dans Spark Scala

93

J'essaie de convertir tous les en-têtes / noms de colonnes d'un DataFramedans Spark-Scala. à partir de maintenant, je propose le code suivant qui ne remplace qu'un seul nom de colonne.

for( i <- 0 to origCols.length - 1) {
  df.withColumnRenamed(
    df.columns(i), 
    df.columns(i).toLowerCase
  );
}
Sam
la source

Réponses:

237

Si la structure est plate:

val df = Seq((1L, "a", "foo", 3.0)).toDF
df.printSchema
// root
//  |-- _1: long (nullable = false)
//  |-- _2: string (nullable = true)
//  |-- _3: string (nullable = true)
//  |-- _4: double (nullable = false)

la chose la plus simple que vous puissiez faire est d'utiliser la toDFméthode:

val newNames = Seq("id", "x1", "x2", "x3")
val dfRenamed = df.toDF(newNames: _*)

dfRenamed.printSchema
// root
// |-- id: long (nullable = false)
// |-- x1: string (nullable = true)
// |-- x2: string (nullable = true)
// |-- x3: double (nullable = false)

Si vous souhaitez renommer des colonnes individuelles, vous pouvez utiliser soit selectavec alias:

df.select($"_1".alias("x1"))

qui peut être facilement généralisée à plusieurs colonnes:

val lookup = Map("_1" -> "foo", "_3" -> "bar")

df.select(df.columns.map(c => col(c).as(lookup.getOrElse(c, c))): _*)

ou withColumnRenamed:

df.withColumnRenamed("_1", "x1")

qui utilisent avec foldLeftpour renommer plusieurs colonnes:

lookup.foldLeft(df)((acc, ca) => acc.withColumnRenamed(ca._1, ca._2))

Avec les structures imbriquées ( structs), une option possible est de renommer en sélectionnant une structure entière:

val nested = spark.read.json(sc.parallelize(Seq(
    """{"foobar": {"foo": {"bar": {"first": 1.0, "second": 2.0}}}, "id": 1}"""
)))

nested.printSchema
// root
//  |-- foobar: struct (nullable = true)
//  |    |-- foo: struct (nullable = true)
//  |    |    |-- bar: struct (nullable = true)
//  |    |    |    |-- first: double (nullable = true)
//  |    |    |    |-- second: double (nullable = true)
//  |-- id: long (nullable = true)

@transient val foobarRenamed = struct(
  struct(
    struct(
      $"foobar.foo.bar.first".as("x"), $"foobar.foo.bar.first".as("y")
    ).alias("point")
  ).alias("location")
).alias("record")

nested.select(foobarRenamed, $"id").printSchema
// root
//  |-- record: struct (nullable = false)
//  |    |-- location: struct (nullable = false)
//  |    |    |-- point: struct (nullable = false)
//  |    |    |    |-- x: double (nullable = true)
//  |    |    |    |-- y: double (nullable = true)
//  |-- id: long (nullable = true)

Notez que cela peut affecter les nullabilitymétadonnées. Une autre possibilité est de renommer en castant:

nested.select($"foobar".cast(
  "struct<location:struct<point:struct<x:double,y:double>>>"
).alias("record")).printSchema

// root
//  |-- record: struct (nullable = true)
//  |    |-- location: struct (nullable = true)
//  |    |    |-- point: struct (nullable = true)
//  |    |    |    |-- x: double (nullable = true)
//  |    |    |    |-- y: double (nullable = true)

ou:

import org.apache.spark.sql.types._

nested.select($"foobar".cast(
  StructType(Seq(
    StructField("location", StructType(Seq(
      StructField("point", StructType(Seq(
        StructField("x", DoubleType), StructField("y", DoubleType)))))))))
).alias("record")).printSchema

// root
//  |-- record: struct (nullable = true)
//  |    |-- location: struct (nullable = true)
//  |    |    |-- point: struct (nullable = true)
//  |    |    |    |-- x: double (nullable = true)
//  |    |    |    |-- y: double (nullable = true)
zéro323
la source
Salut @ zero323 Lorsque j'utilise withColumnRenamed, je reçois AnalysisException ne peut pas résoudre 'CC8. 1 'colonnes d'entrée données ... Il échoue même si CC8.1 est disponible dans DataFrame, veuillez vous guider.
unk1102
@ u449355 Il n'est pas clair pour moi s'il s'agit d'une colonne imbriquée ou d'une colonne contenant des points. Dans ce dernier cas, les backticks devraient fonctionner (au moins dans certains cas de base).
zero323
1
qu'est-ce que cela : _*)signifie dansdf.select(df.columns.map(c => col(c).as(lookup.getOrElse(c, c))): _*)
Anton Kim
1
Pour répondre à la question d'Anton Kim: le : _*est l'opérateur scala dit "splat". Il explose fondamentalement un élément de type tableau en une liste non contenue, ce qui est utile lorsque vous voulez passer le tableau à une fonction qui prend un nombre arbitraire d'arguments, mais n'a pas de version qui prend un List[]. Si vous êtes familier avec Perl, c'est la différence entre some_function(@my_array) # "splatted"et some_function(\@my_array) # not splatted ... in perl the backslash "\" operator returns a reference to a thing.
Mylo Stone
1
Cette affirmation m'est vraiment obscure df.select(df.columns.map(c => col(c).as(lookup.getOrElse(c, c))): _*). Pourriez-vous la décomposer s'il vous plaît? surtout la lookup.getOrElse(c,c)partie.
Aetos
18

Pour ceux d'entre vous intéressés par la version PySpark (en fait c'est la même chose dans Scala - voir le commentaire ci-dessous):

    merchants_df_renamed = merchants_df.toDF(
        'merchant_id', 'category', 'subcategory', 'merchant')

    merchants_df_renamed.printSchema()

Résultat:

root
| - marchand_id: entier (nullable = true)
| - catégorie: string (nullable = true)
| - sous-catégorie: string (nullable = true)
| - marchand: string (nullable = true)

Tagar
la source
1
Avec l'utilisation toDF()pour renommer les colonnes dans DataFrame doit être prudent. Cette méthode fonctionne beaucoup plus lentement que d'autres. J'ai DataFrame contient 100 millions d'enregistrements et une requête de comptage simple prend ~ 3s, alors que la même requête avec la toDF()méthode prend ~ 16s. Mais lorsque j'utilise une select col AS col_newméthode pour renommer, j'obtiens à nouveau ~ 3s. Plus de 5 fois plus vite! Spark 2.3.2.3
Ihor Konovalenko
6
def aliasAllColumns(t: DataFrame, p: String = "", s: String = ""): DataFrame =
{
  t.select( t.columns.map { c => t.col(c).as( p + c + s) } : _* )
}

Si ce n'est pas évident, cela ajoute un préfixe et un suffixe à chacun des noms de colonne actuels. Cela peut être utile lorsque vous avez deux tables avec une ou plusieurs colonnes ayant le même nom et que vous souhaitez les joindre tout en étant capable de lever l'ambiguïté des colonnes dans la table résultante. Ce serait bien s'il y avait une manière similaire de faire cela en SQL "normal".

Pierre de Mylo
la source
aime bien sûr, gentil et élégant
thebluephantom
1

Supposons que le dataframe df ait 3 colonnes id1, name1, price1 et que vous souhaitiez les renommer en id2, name2, price2

val list = List("id2", "name2", "price2")
import spark.implicits._
val df2 = df.toDF(list:_*)
df2.columns.foreach(println)

J'ai trouvé cette approche utile dans de nombreux cas.

Jagadeesh Verri
la source
0

la jointure de table de remorquage ne renomme pas la clé jointe

// method 1: create a new DF
day1 = day1.toDF(day1.columns.map(x => if (x.equals(key)) x else s"${x}_d1"): _*)

// method 2: use withColumnRenamed
for ((x, y) <- day1.columns.filter(!_.equals(key)).map(x => (x, s"${x}_d1"))) {
    day1 = day1.withColumnRenamed(x, y)
}

travaux!

Colin Wang
la source