Fonctions de la fenêtre :
Quelque chose comme ça devrait faire l'affaire:
import org.apache.spark.sql.functions.{row_number, max, broadcast}
import org.apache.spark.sql.expressions.Window
val df = sc.parallelize(Seq(
(0,"cat26",30.9), (0,"cat13",22.1), (0,"cat95",19.6), (0,"cat105",1.3),
(1,"cat67",28.5), (1,"cat4",26.8), (1,"cat13",12.6), (1,"cat23",5.3),
(2,"cat56",39.6), (2,"cat40",29.7), (2,"cat187",27.9), (2,"cat68",9.8),
(3,"cat8",35.6))).toDF("Hour", "Category", "TotalValue")
val w = Window.partitionBy($"hour").orderBy($"TotalValue".desc)
val dfTop = df.withColumn("rn", row_number.over(w)).where($"rn" === 1).drop("rn")
dfTop.show
// +----+--------+----------+
// |Hour|Category|TotalValue|
// +----+--------+----------+
// | 0| cat26| 30.9|
// | 1| cat67| 28.5|
// | 2| cat56| 39.6|
// | 3| cat8| 35.6|
// +----+--------+----------+
Cette méthode sera inefficace en cas de biais significatif des données.
Agrégation SQL simple suivie dejoin
:
Vous pouvez également vous joindre avec un bloc de données agrégé:
val dfMax = df.groupBy($"hour".as("max_hour")).agg(max($"TotalValue").as("max_value"))
val dfTopByJoin = df.join(broadcast(dfMax),
($"hour" === $"max_hour") && ($"TotalValue" === $"max_value"))
.drop("max_hour")
.drop("max_value")
dfTopByJoin.show
// +----+--------+----------+
// |Hour|Category|TotalValue|
// +----+--------+----------+
// | 0| cat26| 30.9|
// | 1| cat67| 28.5|
// | 2| cat56| 39.6|
// | 3| cat8| 35.6|
// +----+--------+----------+
Il conservera les valeurs en double (s'il y a plus d'une catégorie par heure avec la même valeur totale). Vous pouvez les supprimer comme suit:
dfTopByJoin
.groupBy($"hour")
.agg(
first("category").alias("category"),
first("TotalValue").alias("TotalValue"))
Utilisation de la commande surstructs
:
Astuce, bien que pas très bien testée, qui ne nécessite ni jointures ni fonctions de fenêtre:
val dfTop = df.select($"Hour", struct($"TotalValue", $"Category").alias("vs"))
.groupBy($"hour")
.agg(max("vs").alias("vs"))
.select($"Hour", $"vs.Category", $"vs.TotalValue")
dfTop.show
// +----+--------+----------+
// |Hour|Category|TotalValue|
// +----+--------+----------+
// | 0| cat26| 30.9|
// | 1| cat67| 28.5|
// | 2| cat56| 39.6|
// | 3| cat8| 35.6|
// +----+--------+----------+
Avec l'API DataSet (Spark 1.6+, 2.0+):
Spark 1.6 :
case class Record(Hour: Integer, Category: String, TotalValue: Double)
df.as[Record]
.groupBy($"hour")
.reduce((x, y) => if (x.TotalValue > y.TotalValue) x else y)
.show
// +---+--------------+
// | _1| _2|
// +---+--------------+
// |[0]|[0,cat26,30.9]|
// |[1]|[1,cat67,28.5]|
// |[2]|[2,cat56,39.6]|
// |[3]| [3,cat8,35.6]|
// +---+--------------+
Spark 2.0 ou version ultérieure :
df.as[Record]
.groupByKey(_.Hour)
.reduceGroups((x, y) => if (x.TotalValue > y.TotalValue) x else y)
Les deux dernières méthodes peuvent tirer parti de la combinaison côté carte et ne nécessitent pas de mélange complet, donc la plupart du temps, elles devraient présenter de meilleures performances par rapport aux fonctions de fenêtre et aux jointures. Ces cannes peuvent également être utilisées avec le streaming structuré en completed
mode sortie.
N'utilisez pas :
df.orderBy(...).groupBy(...).agg(first(...), ...)
Cela peut sembler fonctionner (en particulier dans le local
mode) mais ce n'est pas fiable (voir SPARK-16207 , merci à Tzach Zohar pour avoir lié le problème JIRA pertinent et SPARK-30335 ).
La même remarque s'applique à
df.orderBy(...).dropDuplicates(...)
qui utilise en interne un plan d'exécution équivalent.
Pour Spark 2.0.2 avec regroupement par plusieurs colonnes:
la source
C'est exactement la même chose que la réponse de zero323 , mais en mode requête SQL.
En supposant que le dataframe est créé et enregistré comme
Fonction de fenêtre:
Agrégation SQL simple suivie d'une jointure:
Utilisation de la commande sur les structures:
DataSets façon et ne font pas les mêmes que dans la réponse originale
la source
Le motif est groupé par clés => faire quelque chose pour chaque groupe, par exemple réduire => retourner au dataframe
Je pensais que l'abstraction Dataframe était un peu encombrante dans ce cas, j'ai donc utilisé la fonctionnalité RDD
la source
La solution ci-dessous ne fait qu'un groupBy et extrait les lignes de votre dataframe qui contiennent la maxValue en un seul coup. Pas besoin de jointures supplémentaires ou de Windows.
la source
Une bonne façon de faire cela avec l'API dataframe consiste à utiliser la logique argmax comme ceci
la source
Ici, vous pouvez faire comme ça -
la source
Nous pouvons utiliser la fonction de fenêtre rank () (où vous choisiriez le rang = 1) rank ajoute simplement un nombre pour chaque ligne d'un groupe (dans ce cas, ce serait l'heure)
voici un exemple. (de https://github.com/jaceklaskowski/mastering-apache-spark-book/blob/master/spark-sql-functions.adoc#rank )
la source