Spark: UDF exécuté plusieurs fois

9

J'ai une trame de données avec le code suivant:

def test(lat: Double, lon: Double) = {
  println(s"testing ${lat / lon}")
  Map("one" -> "one", "two" -> "two")
}

val testUDF = udf(test _)

df.withColumn("test", testUDF(col("lat"), col("lon")))
  .withColumn("test1", col("test.one"))
  .withColumn("test2", col("test.two"))

Maintenant, en vérifiant les journaux, j'ai découvert que pour chaque ligne, l'UDF est exécuté 3 fois. Si j'ajoute le "test3" d'une colonne "test.three" alors l'UDF est exécuté une fois de plus.

Quelqu'un peut-il m'expliquer pourquoi?

Cela peut-il être évité correctement (sans mettre en cache la trame de données après l'ajout de "test", même si cela fonctionne)?

Rolintocour
la source
Que voulez-vous dire? Vous appelez la fonction de test trois fois. C'est pourquoi il est exécuté trois fois. Je ne sais pas pourquoi vous en faites un UDF. Pourquoi ne pas simplement faire de la carte un val?
user4601931
Ceci est juste un exemple pour montrer le comportement de l'étincelle. Pour moi, "test" est une nouvelle colonne qui contient une structure, puis accéder à n'importe quelle partie de la structure ne devrait pas exécuter à nouveau l'UDF. Comment je me trompe?
Rolintocour
J'ai essayé d'imprimer le schéma, le DataType de "test" est Mapet non un Struct. Maintenant, au lieu de renvoyer une carte, si l'UDF renvoie une classe de cas comme Test (une chaîne, deux: chaîne), il testy a bien un Struct mais il y a toujours autant d'exécutions de l'UDF.
Rolintocour
en relation: stackoverflow.com/questions/40320563/…
Raphael Roth
la mise en cache devrait fonctionner selon cette réponse: stackoverflow.com/a/40962714/1138523
Raphael Roth

Réponses:

5

Si vous souhaitez éviter plusieurs appels à un udf (ce qui est utile surtout si l'udf est un goulot d'étranglement dans votre travail), vous pouvez le faire comme suit:

val testUDF = udf(test _).asNondeterministic()

Fondamentalement, vous dites à Spark que votre fonction n'est pas déterministe et maintenant Spark s'assure qu'elle n'est appelée qu'une seule fois car il n'est pas sûr de l'appeler plusieurs fois (chaque appel peut éventuellement retourner un résultat différent).

Sachez également que cette astuce n'est pas gratuite, ce faisant, vous mettez des contraintes sur l'optimiseur, un effet secondaire de cela est par exemple que l'optimiseur Spark ne pousse pas les filtres à travers des expressions qui ne sont pas déterministes, vous devenez donc responsable de l'optimisation position des filtres dans votre requête.

David Vrba
la source
agréable! cette réponse appartient également ici: stackoverflow.com/questions/40320563/…
Raphael Roth
Dans mon cas, asNondeterministicforce l'UDF à ne s'exécuter qu'une seule fois. Avec la explode(array(myUdf($"id")))solution, il est toujours exécuté deux fois.
Rolintocour