Tâche non sérialisable: java.io.NotSerializableException lors de l'appel de la fonction en dehors de la fermeture uniquement sur les classes et non les objets

224

Obtenir un comportement étrange lors de l'appel d'une fonction en dehors d'une fermeture:

  • quand la fonction est dans un objet tout fonctionne
  • lorsque la fonction est dans une classe, obtenez:

Tâche non sérialisable: java.io.NotSerializableException: test

Le problème est que j'ai besoin de mon code dans une classe et non un objet. Une idée pourquoi cela se produit? Un objet Scala est-il sérialisé (par défaut?)?

Ceci est un exemple de code fonctionnel:

object working extends App {
    val list = List(1,2,3)

    val rddList = Spark.ctx.parallelize(list)
    //calling function outside closure 
    val after = rddList.map(someFunc(_))

    def someFunc(a:Int)  = a+1

    after.collect().map(println(_))
}

Voici l'exemple non fonctionnel:

object NOTworking extends App {
  new testing().doIT
}

//adding extends Serializable wont help
class testing {  
  val list = List(1,2,3)  
  val rddList = Spark.ctx.parallelize(list)

  def doIT =  {
    //again calling the fucntion someFunc 
    val after = rddList.map(someFunc(_))
    //this will crash (spark lazy)
    after.collect().map(println(_))
  }

  def someFunc(a:Int) = a+1
}
Nimrod007
la source
Qu'est-ce que Spark.ctx? Il n'y a pas d'objet Spark avec la méthode ctx AFAICT
javadba

Réponses:

334

Les RDD étendent l'interface Serialisable , ce n'est donc pas ce qui provoque l'échec de votre tâche. Cela ne signifie pas que vous pouvez sérialiser un RDDavec Spark et éviterNotSerializableException

Spark est un moteur informatique distribué et sa principale abstraction est un ensemble de données réparties résilient ( RDD ), qui peut être considéré comme une collection distribuée. Fondamentalement, les éléments de RDD sont partitionnés sur les nœuds du cluster, mais Spark l'abstrait de l'utilisateur, permettant à l'utilisateur d'interagir avec le RDD (collection) comme s'il s'agissait d'un local.

Sans entrer dans trop de détails, mais lorsque vous exécutez différentes transformations sur un RDD ( map, flatMap, filteret autres), votre code de transformation (de fermeture) est la suivante :

  1. sérialisé sur le nœud du pilote,
  2. expédiés aux nœuds appropriés du cluster,
  3. désérialisé,
  4. et enfin exécuté sur les nœuds

Vous pouvez bien sûr l'exécuter localement (comme dans votre exemple), mais toutes ces phases (à l'exception de l'expédition sur le réseau) se produisent toujours. [Cela vous permet d'attraper tous les bugs avant même le déploiement en production]

Ce qui se passe dans votre deuxième cas, c'est que vous appelez une méthode, définie en classe testingdepuis l'intérieur de la fonction map. Spark voit cela et comme les méthodes ne peuvent pas être sérialisées par elles-mêmes, Spark essaie de sérialiser la testing classe entière , afin que le code fonctionne toujours lorsqu'il est exécuté dans une autre JVM. Vous avez deux possibilités:

Soit vous rendez les tests de classe sérialisables, donc toute la classe peut être sérialisée par Spark:

import org.apache.spark.{SparkContext,SparkConf}

object Spark {
  val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
}

object NOTworking extends App {
  new Test().doIT
}

class Test extends java.io.Serializable {
  val rddList = Spark.ctx.parallelize(List(1,2,3))

  def doIT() =  {
    val after = rddList.map(someFunc)
    after.collect().foreach(println)
  }

  def someFunc(a: Int) = a + 1
}

ou vous faites de la someFuncfonction au lieu d'une méthode (les fonctions sont des objets dans Scala), afin que Spark puisse la sérialiser:

import org.apache.spark.{SparkContext,SparkConf}

object Spark {
  val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
}

object NOTworking extends App {
  new Test().doIT
}

class Test {
  val rddList = Spark.ctx.parallelize(List(1,2,3))

  def doIT() =  {
    val after = rddList.map(someFunc)
    after.collect().foreach(println)
  }

  val someFunc = (a: Int) => a + 1
}

Un problème similaire, mais pas le même avec la sérialisation de classe peut vous intéresser et vous pouvez le lire dans cette présentation Spark Summit 2013 .

Comme une note de côté, vous pouvez réécrire rddList.map(someFunc(_))à rddList.map(someFunc), ils sont exactement les mêmes. Habituellement, le second est préféré car il est moins bavard et plus propre à lire.

EDIT (2015-03-15): SPARK-5307 a introduit SerializationDebugger et Spark 1.3.0 est la première version à l'utiliser. Il ajoute un chemin de sérialisation à une exception NotSerializableException . Lorsqu'une exception NotSerializableException est rencontrée, le débogueur visite le graphique d'objet pour trouver le chemin vers l'objet qui ne peut pas être sérialisé et construit des informations pour aider l'utilisateur à trouver l'objet.

Dans le cas d'OP, c'est ce qui est imprimé sur stdout:

Serialization stack:
    - object not serializable (class: testing, value: testing@2dfe2f00)
    - field (class: testing$$anonfun$1, name: $outer, type: class testing)
    - object (class testing$$anonfun$1, <function1>)
Grega Kešpret
la source
1
Hmm, ce que vous avez expliqué est certainement logique et explique pourquoi toute la classe est sérialisée (quelque chose que je n'ai pas bien compris). Néanmoins, je maintiendrai toujours que les rdd ne sont pas sérialisables (eh bien ils étendent Serializable, mais cela ne signifie pas qu'ils ne provoquent pas NotSerializableException, essayez-le). C'est pourquoi si vous les mettez en dehors des classes, cela corrige l'erreur. Je vais modifier ma réponse un peu pour être plus précis sur ce que je veux dire - c'est-à-dire qu'ils provoquent l'exception, pas qu'ils étendent l'interface.
samthebest
35
Dans le cas où vous n'avez pas de contrôle sur la classe, vous devez être sérialisable ... si vous utilisez Scala, vous pouvez simplement l'instancier avec Serializable:val test = new Test with Serializable
Mark S
4
"rddList.map (someFunc (_)) à rddList.map (someFunc), ils sont exactement les mêmes" Non, ils ne sont pas exactement les mêmes, et en fait, l'utilisation de ce dernier peut provoquer des exceptions de sérialisation, contrairement au premier.
samthebest
1
@samthebest pourriez-vous expliquer pourquoi map (someFunc (_)) ne provoquerait pas d'exceptions de sérialisation alors que map (someFunc) le ferait?
Alon
31

La réponse de Grega est excellente pour expliquer pourquoi le code d'origine ne fonctionne pas et deux façons de résoudre le problème. Cependant, cette solution n'est pas très flexible; considérons le cas où votre fermeture inclut un appel de méthode sur une non- Serializableclasse sur laquelle vous n'avez aucun contrôle. Vous ne pouvez ni ajouter la Serializablebalise à cette classe ni modifier l'implémentation sous-jacente pour changer la méthode en fonction.

Nilesh présente une excellente solution pour cela, mais la solution peut être rendue à la fois plus concise et générale:

def genMapper[A, B](f: A => B): A => B = {
  val locker = com.twitter.chill.MeatLocker(f)
  x => locker.get.apply(x)
}

Ce sérialiseur de fonctions peut ensuite être utilisé pour encapsuler automatiquement les fermetures et les appels de méthode:

rdd map genMapper(someFunc)

Cette technique a également l'avantage de ne pas nécessiter les dépendances supplémentaires de Shark pour y accéder KryoSerializationWrapper, puisque Twitter's Chill est déjà tiré par le noyau Spark

Ben Sidhom
la source
Bonjour, je me demande si je dois enregistrer quelque chose si j'utilise votre code? J'ai essayé d'obtenir une exception de classe find impossible de kryo. THX
G_cy
25

Discours complet expliquant pleinement le problème, qui propose une excellente façon de changer de paradigme pour éviter ces problèmes de sérialisation: https://github.com/samthebest/dump/blob/master/sams-scala-tutorial/serialization-exceptions-and-memory- leaks-no-ws.md

La réponse la plus votée suggère essentiellement de jeter une fonctionnalité linguistique complète - qui n'utilise plus de méthodes et uniquement des fonctions. En effet, dans les classes de programmation fonctionnelle, les méthodes doivent être évitées, mais les transformer en fonctions ne résout pas le problème de conception ici (voir le lien ci-dessus).

Comme solution rapide dans cette situation particulière, vous pouvez simplement utiliser l' @transientannotation pour lui dire de ne pas essayer de sérialiser la valeur incriminée (voici Spark.ctxune classe personnalisée qui n'est pas celle de Spark après la dénomination de l'OP):

@transient
val rddList = Spark.ctx.parallelize(list)

Vous pouvez également restructurer le code pour que rddList vive ailleurs, mais c'est aussi désagréable.

L'avenir est probablement des spores

À l'avenir, Scala inclura ces choses appelées «spores» qui devraient nous permettre de contrôler le grain fin ce qui est ou n'est pas exactement entraîné par une fermeture. En outre, cela devrait transformer toutes les erreurs consistant à extraire accidentellement des types non sérialisables (ou toute valeur indésirable) en erreurs de compilation plutôt que maintenant, ce qui est d'horribles exceptions d'exécution / fuites de mémoire.

http://docs.scala-lang.org/sips/pending/spores.html

Un conseil sur la sérialisation Kryo

Lorsque vous utilisez kyro, assurez-vous que l'enregistrement est nécessaire, cela signifie que vous obtenez des erreurs au lieu de fuites de mémoire:

"Enfin, je sais que kryo a kryo.setRegistrationOptional (true) mais j'ai du mal à essayer de comprendre comment l'utiliser. Lorsque cette option est activée, kryo semble toujours lever des exceptions si je ne me suis pas enregistré. Des classes."

Stratégie d'enregistrement des cours avec kryo

Bien sûr, cela vous donne uniquement un contrôle au niveau du type et non un contrôle au niveau des valeurs.

... plus d'idées à venir.

samthebest
la source
9

J'ai résolu ce problème en utilisant une approche différente. Il vous suffit de sérialiser les objets avant de passer par la fermeture, puis de désérialiser par la suite. Cette approche fonctionne, même si vos classes ne sont pas sérialisables, car elle utilise Kryo dans les coulisses. Tout ce dont vous avez besoin, c'est de curry. ;)

Voici un exemple de la façon dont je l'ai fait:

def genMapper(kryoWrapper: KryoSerializationWrapper[(Foo => Bar)])
               (foo: Foo) : Bar = {
    kryoWrapper.value.apply(foo)
}
val mapper = genMapper(KryoSerializationWrapper(new Blah(abc))) _
rdd.flatMap(mapper).collectAsMap()

object Blah(abc: ABC) extends (Foo => Bar) {
    def apply(foo: Foo) : Bar = { //This is the real function }
}

N'hésitez pas à rendre Blah aussi compliqué que vous le souhaitez, classe, objet compagnon, classes imbriquées, références à plusieurs bibliothèques tierces.

KryoSerializationWrapper fait référence à: https://github.com/amplab/shark/blob/master/src/main/scala/shark/execution/serialization/KryoSerializationWrapper.scala

Nilesh
la source
Est-ce que cela sérialise réellement l'instance ou crée une instance statique et sérialise une référence (voir ma réponse).
samthebest
2
@samthebest pourriez-vous élaborer? Si vous étudiez, KryoSerializationWrappervous constaterez que cela fait penser à Spark que c'est effectivement java.io.Serializable- il sérialise simplement l'objet en interne en utilisant Kryo - plus rapide, plus simple. Et je ne pense pas qu'il s'agisse d'une instance statique - il désérialise simplement la valeur lorsque la valeur.apply () est appelée.
Nilesh
8

J'ai rencontré un problème similaire, et ce que je comprends de la réponse de Grega est

object NOTworking extends App {
 new testing().doIT
}
//adding extends Serializable wont help
class testing {

val list = List(1,2,3)

val rddList = Spark.ctx.parallelize(list)

def doIT =  {
  //again calling the fucntion someFunc 
  val after = rddList.map(someFunc(_))
  //this will crash (spark lazy)
  after.collect().map(println(_))
}

def someFunc(a:Int) = a+1

}

votre méthode doIT tente de sérialiser la méthode someFunc (_) , mais comme la méthode n'est pas sérialisable, elle essaie de sérialiser le test de classe qui n'est pas sérialisable.

Donc, faites fonctionner votre code, vous devez définir someFunc dans la méthode doIT . Par exemple:

def doIT =  {
 def someFunc(a:Int) = a+1
  //function definition
 }
 val after = rddList.map(someFunc(_))
 after.collect().map(println(_))
}

Et s'il y a plusieurs fonctions à venir dans l'image, alors toutes ces fonctions devraient être disponibles pour le contexte parent.

Tarang Bhalodia
la source
7

Je ne suis pas tout à fait certain que cela s'applique à Scala mais, en Java, j'ai résolu le problème NotSerializableExceptionen refactorisant mon code afin que la fermeture n'accède pas à un finalchamp non sérialisable .

Trebor Rude
la source
je fais face au même problème en Java, j'essaie d'utiliser la classe FileWriter du package Java IO à l'intérieur de la méthode RDD foreach. Pouvez-vous s'il vous plaît laissez-moi savoir comment nous pouvons résoudre ce problème.
Shankar
1
Eh bien @Shankar, si le FileWriterest un finalchamp de la classe externe, vous ne pouvez pas le faire. Mais FileWriterpeut être construit à partir de a Stringou de a File, les deux étant Serializable. Donc, refactorisez votre code pour construire un local FileWriterbasé sur le nom de fichier de la classe externe.
Trebor Rude
0

Pour info dans Spark 2.4, vous serez probablement nombreux à rencontrer ce problème. La sérialisation Kryo s'est améliorée mais dans de nombreux cas, vous ne pouvez pas utiliser spark.kryo.unsafe = true ou le sérialiseur kryo naïf.

Pour une solution rapide, essayez de modifier les éléments suivants dans votre configuration Spark

spark.kryo.unsafe="false"

OU

spark.serializer="org.apache.spark.serializer.JavaSerializer"

Je modifie les transformations RDD personnalisées que je rencontre ou écris personnellement en utilisant des variables de diffusion explicites et en utilisant la nouvelle API Twitter-Chill intégrée, en les convertissant rdd.map(row =>en rdd.mapPartitions(partition => {fonctions.

Exemple

Vieille (pas géniale)

val sampleMap = Map("index1" -> 1234, "index2" -> 2345)
val outputRDD = rdd.map(row => {
    val value = sampleMap.get(row._1)
    value
})

Alternative (meilleure) façon

import com.twitter.chill.MeatLocker
val sampleMap = Map("index1" -> 1234, "index2" -> 2345)
val brdSerSampleMap = spark.sparkContext.broadcast(MeatLocker(sampleMap))

rdd.mapPartitions(partition => {
    val deSerSampleMap = brdSerSampleMap.value.get
    partition.map(row => {
        val value = sampleMap.get(row._1)
        value
    }).toIterator
})

Cette nouvelle façon n'appellera la variable de diffusion qu'une seule fois par partition, ce qui est mieux. Vous devrez toujours utiliser la sérialisation Java si vous n'enregistrez pas de classes.

Église de Gabe
la source