J'ai une trame de données avec le code suivant:
def test(lat: Double, lon: Double) = {
println(s"testing ${lat / lon}")
Map("one" -> "one", "two" -> "two")
}
val testUDF = udf(test _)
df.withColumn("test", testUDF(col("lat"), col("lon")))
.withColumn("test1", col("test.one"))
.withColumn("test2", col("test.two"))
Maintenant, en vérifiant les journaux, j'ai découvert que pour chaque ligne, l'UDF est exécuté 3 fois. Si j'ajoute le "test3" d'une colonne "test.three" alors l'UDF est exécuté une fois de plus.
Quelqu'un peut-il m'expliquer pourquoi?
Cela peut-il être évité correctement (sans mettre en cache la trame de données après l'ajout de "test", même si cela fonctionne)?
scala
apache-spark
apache-spark-sql
Rolintocour
la source
la source
Map
et non un Struct. Maintenant, au lieu de renvoyer une carte, si l'UDF renvoie une classe de cas comme Test (une chaîne, deux: chaîne), iltest
y a bien un Struct mais il y a toujours autant d'exécutions de l'UDF.Réponses:
Si vous souhaitez éviter plusieurs appels à un udf (ce qui est utile surtout si l'udf est un goulot d'étranglement dans votre travail), vous pouvez le faire comme suit:
Fondamentalement, vous dites à Spark que votre fonction n'est pas déterministe et maintenant Spark s'assure qu'elle n'est appelée qu'une seule fois car il n'est pas sûr de l'appeler plusieurs fois (chaque appel peut éventuellement retourner un résultat différent).
Sachez également que cette astuce n'est pas gratuite, ce faisant, vous mettez des contraintes sur l'optimiseur, un effet secondaire de cela est par exemple que l'optimiseur Spark ne pousse pas les filtres à travers des expressions qui ne sont pas déterministes, vous devenez donc responsable de l'optimisation position des filtres dans votre requête.
la source
asNondeterministic
force l'UDF à ne s'exécuter qu'une seule fois. Avec laexplode(array(myUdf($"id")))
solution, il est toujours exécuté deux fois.