Obtenir un comportement étrange lors de l'appel d'une fonction en dehors d'une fermeture:
- quand la fonction est dans un objet tout fonctionne
- lorsque la fonction est dans une classe, obtenez:
Tâche non sérialisable: java.io.NotSerializableException: test
Le problème est que j'ai besoin de mon code dans une classe et non un objet. Une idée pourquoi cela se produit? Un objet Scala est-il sérialisé (par défaut?)?
Ceci est un exemple de code fonctionnel:
object working extends App {
val list = List(1,2,3)
val rddList = Spark.ctx.parallelize(list)
//calling function outside closure
val after = rddList.map(someFunc(_))
def someFunc(a:Int) = a+1
after.collect().map(println(_))
}
Voici l'exemple non fonctionnel:
object NOTworking extends App {
new testing().doIT
}
//adding extends Serializable wont help
class testing {
val list = List(1,2,3)
val rddList = Spark.ctx.parallelize(list)
def doIT = {
//again calling the fucntion someFunc
val after = rddList.map(someFunc(_))
//this will crash (spark lazy)
after.collect().map(println(_))
}
def someFunc(a:Int) = a+1
}
scala
serialization
apache-spark
typesafe
Nimrod007
la source
la source
Réponses:
Les RDD étendent l'interface Serialisable , ce n'est donc pas ce qui provoque l'échec de votre tâche. Cela ne signifie pas que vous pouvez sérialiser un
RDD
avec Spark et éviterNotSerializableException
Spark est un moteur informatique distribué et sa principale abstraction est un ensemble de données réparties résilient ( RDD ), qui peut être considéré comme une collection distribuée. Fondamentalement, les éléments de RDD sont partitionnés sur les nœuds du cluster, mais Spark l'abstrait de l'utilisateur, permettant à l'utilisateur d'interagir avec le RDD (collection) comme s'il s'agissait d'un local.
Sans entrer dans trop de détails, mais lorsque vous exécutez différentes transformations sur un RDD (
map
,flatMap
,filter
et autres), votre code de transformation (de fermeture) est la suivante :Vous pouvez bien sûr l'exécuter localement (comme dans votre exemple), mais toutes ces phases (à l'exception de l'expédition sur le réseau) se produisent toujours. [Cela vous permet d'attraper tous les bugs avant même le déploiement en production]
Ce qui se passe dans votre deuxième cas, c'est que vous appelez une méthode, définie en classe
testing
depuis l'intérieur de la fonction map. Spark voit cela et comme les méthodes ne peuvent pas être sérialisées par elles-mêmes, Spark essaie de sérialiser latesting
classe entière , afin que le code fonctionne toujours lorsqu'il est exécuté dans une autre JVM. Vous avez deux possibilités:Soit vous rendez les tests de classe sérialisables, donc toute la classe peut être sérialisée par Spark:
ou vous faites de la
someFunc
fonction au lieu d'une méthode (les fonctions sont des objets dans Scala), afin que Spark puisse la sérialiser:Un problème similaire, mais pas le même avec la sérialisation de classe peut vous intéresser et vous pouvez le lire dans cette présentation Spark Summit 2013 .
Comme une note de côté, vous pouvez réécrire
rddList.map(someFunc(_))
àrddList.map(someFunc)
, ils sont exactement les mêmes. Habituellement, le second est préféré car il est moins bavard et plus propre à lire.EDIT (2015-03-15): SPARK-5307 a introduit SerializationDebugger et Spark 1.3.0 est la première version à l'utiliser. Il ajoute un chemin de sérialisation à une exception NotSerializableException . Lorsqu'une exception NotSerializableException est rencontrée, le débogueur visite le graphique d'objet pour trouver le chemin vers l'objet qui ne peut pas être sérialisé et construit des informations pour aider l'utilisateur à trouver l'objet.
Dans le cas d'OP, c'est ce qui est imprimé sur stdout:
la source
val test = new Test with Serializable
La réponse de Grega est excellente pour expliquer pourquoi le code d'origine ne fonctionne pas et deux façons de résoudre le problème. Cependant, cette solution n'est pas très flexible; considérons le cas où votre fermeture inclut un appel de méthode sur une non-
Serializable
classe sur laquelle vous n'avez aucun contrôle. Vous ne pouvez ni ajouter laSerializable
balise à cette classe ni modifier l'implémentation sous-jacente pour changer la méthode en fonction.Nilesh présente une excellente solution pour cela, mais la solution peut être rendue à la fois plus concise et générale:
Ce sérialiseur de fonctions peut ensuite être utilisé pour encapsuler automatiquement les fermetures et les appels de méthode:
Cette technique a également l'avantage de ne pas nécessiter les dépendances supplémentaires de Shark pour y accéder
KryoSerializationWrapper
, puisque Twitter's Chill est déjà tiré par le noyau Sparkla source
Discours complet expliquant pleinement le problème, qui propose une excellente façon de changer de paradigme pour éviter ces problèmes de sérialisation: https://github.com/samthebest/dump/blob/master/sams-scala-tutorial/serialization-exceptions-and-memory- leaks-no-ws.md
La réponse la plus votée suggère essentiellement de jeter une fonctionnalité linguistique complète - qui n'utilise plus de méthodes et uniquement des fonctions. En effet, dans les classes de programmation fonctionnelle, les méthodes doivent être évitées, mais les transformer en fonctions ne résout pas le problème de conception ici (voir le lien ci-dessus).
Comme solution rapide dans cette situation particulière, vous pouvez simplement utiliser l'
@transient
annotation pour lui dire de ne pas essayer de sérialiser la valeur incriminée (voiciSpark.ctx
une classe personnalisée qui n'est pas celle de Spark après la dénomination de l'OP):Vous pouvez également restructurer le code pour que rddList vive ailleurs, mais c'est aussi désagréable.
L'avenir est probablement des spores
À l'avenir, Scala inclura ces choses appelées «spores» qui devraient nous permettre de contrôler le grain fin ce qui est ou n'est pas exactement entraîné par une fermeture. En outre, cela devrait transformer toutes les erreurs consistant à extraire accidentellement des types non sérialisables (ou toute valeur indésirable) en erreurs de compilation plutôt que maintenant, ce qui est d'horribles exceptions d'exécution / fuites de mémoire.
http://docs.scala-lang.org/sips/pending/spores.html
Un conseil sur la sérialisation Kryo
Lorsque vous utilisez kyro, assurez-vous que l'enregistrement est nécessaire, cela signifie que vous obtenez des erreurs au lieu de fuites de mémoire:
"Enfin, je sais que kryo a kryo.setRegistrationOptional (true) mais j'ai du mal à essayer de comprendre comment l'utiliser. Lorsque cette option est activée, kryo semble toujours lever des exceptions si je ne me suis pas enregistré. Des classes."
Stratégie d'enregistrement des cours avec kryo
Bien sûr, cela vous donne uniquement un contrôle au niveau du type et non un contrôle au niveau des valeurs.
... plus d'idées à venir.
la source
J'ai résolu ce problème en utilisant une approche différente. Il vous suffit de sérialiser les objets avant de passer par la fermeture, puis de désérialiser par la suite. Cette approche fonctionne, même si vos classes ne sont pas sérialisables, car elle utilise Kryo dans les coulisses. Tout ce dont vous avez besoin, c'est de curry. ;)
Voici un exemple de la façon dont je l'ai fait:
N'hésitez pas à rendre Blah aussi compliqué que vous le souhaitez, classe, objet compagnon, classes imbriquées, références à plusieurs bibliothèques tierces.
KryoSerializationWrapper fait référence à: https://github.com/amplab/shark/blob/master/src/main/scala/shark/execution/serialization/KryoSerializationWrapper.scala
la source
KryoSerializationWrapper
vous constaterez que cela fait penser à Spark que c'est effectivementjava.io.Serializable
- il sérialise simplement l'objet en interne en utilisant Kryo - plus rapide, plus simple. Et je ne pense pas qu'il s'agisse d'une instance statique - il désérialise simplement la valeur lorsque la valeur.apply () est appelée.J'ai rencontré un problème similaire, et ce que je comprends de la réponse de Grega est
votre méthode doIT tente de sérialiser la méthode someFunc (_) , mais comme la méthode n'est pas sérialisable, elle essaie de sérialiser le test de classe qui n'est pas sérialisable.
Donc, faites fonctionner votre code, vous devez définir someFunc dans la méthode doIT . Par exemple:
Et s'il y a plusieurs fonctions à venir dans l'image, alors toutes ces fonctions devraient être disponibles pour le contexte parent.
la source
Je ne suis pas tout à fait certain que cela s'applique à Scala mais, en Java, j'ai résolu le problème
NotSerializableException
en refactorisant mon code afin que la fermeture n'accède pas à unfinal
champ non sérialisable .la source
FileWriter
est unfinal
champ de la classe externe, vous ne pouvez pas le faire. MaisFileWriter
peut être construit à partir de aString
ou de aFile
, les deux étantSerializable
. Donc, refactorisez votre code pour construire un localFileWriter
basé sur le nom de fichier de la classe externe.Pour info dans Spark 2.4, vous serez probablement nombreux à rencontrer ce problème. La sérialisation Kryo s'est améliorée mais dans de nombreux cas, vous ne pouvez pas utiliser spark.kryo.unsafe = true ou le sérialiseur kryo naïf.
Pour une solution rapide, essayez de modifier les éléments suivants dans votre configuration Spark
OU
Je modifie les transformations RDD personnalisées que je rencontre ou écris personnellement en utilisant des variables de diffusion explicites et en utilisant la nouvelle API Twitter-Chill intégrée, en les convertissant
rdd.map(row =>
enrdd.mapPartitions(partition => {
fonctions.Exemple
Vieille (pas géniale)
Alternative (meilleure) façon
Cette nouvelle façon n'appellera la variable de diffusion qu'une seule fois par partition, ce qui est mieux. Vous devrez toujours utiliser la sérialisation Java si vous n'enregistrez pas de classes.
la source