Spark java.lang.OutOfMemoryError: espace de tas Java

228

Mon cluster: 1 maître, 11 esclaves, chaque nœud a 6 Go de mémoire.

Mes paramètres:

spark.executor.memory=4g, Dspark.akka.frameSize=512

Voici le problème:

Tout d'abord , j'ai lu certaines données (2,19 Go) de HDFS vers RDD:

val imageBundleRDD = sc.newAPIHadoopFile(...)

Deuxièmement , faites quelque chose sur ce RDD:

val res = imageBundleRDD.map(data => {
                               val desPoints = threeDReconstruction(data._2, bg)
                                 (data._1, desPoints)
                             })

Enfin , sortie vers HDFS:

res.saveAsNewAPIHadoopFile(...)

Lorsque j'exécute mon programme, cela montre:

.....
14/01/15 21:42:27 INFO cluster.ClusterTaskSetManager: Starting task 1.0:24 as TID 33 on executor 9: Salve7.Hadoop (NODE_LOCAL)
14/01/15 21:42:27 INFO cluster.ClusterTaskSetManager: Serialized task 1.0:24 as 30618515 bytes in 210 ms
14/01/15 21:42:27 INFO cluster.ClusterTaskSetManager: Starting task 1.0:36 as TID 34 on executor 2: Salve11.Hadoop (NODE_LOCAL)
14/01/15 21:42:28 INFO cluster.ClusterTaskSetManager: Serialized task 1.0:36 as 30618515 bytes in 449 ms
14/01/15 21:42:28 INFO cluster.ClusterTaskSetManager: Starting task 1.0:32 as TID 35 on executor 7: Salve4.Hadoop (NODE_LOCAL)
Uncaught error from thread [spark-akka.actor.default-dispatcher-3] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[spark]
java.lang.OutOfMemoryError: Java heap space

Il y a trop de tâches?

PS : Tout va bien lorsque les données d'entrée sont d'environ 225 Mo.

Comment puis-je résoudre ce problème?

hequn8128
la source
comment courir étincelle? est-ce de la console? ou quels scripts de déploiement utilisez-vous?
Tombart
J'utilise sbt pour compiler et exécuter mon application. paquet sbt puis exécution de sbt. J'ai implémenté le même programme sur hadoop il y a un mois et j'ai rencontré le même problème d'OutOfMemoryError, mais dans hadoop il peut être facilement résolu en augmentant la valeur de mapred.child.java.opts de Xmx200m à Xmx400m. Est-ce que spark a un paramètre jvm pour ses tâches? Je me demande si spark.executor.memory a la même signification que mapred.child.java.opts dans hadoop. Dans mon programme, spark.executor.memory a déjà été réglé sur 4g beaucoup plus grand que Xmx400m dans hadoop. Merci ~
hequn8128
Les trois étapes que vous mentionnez sont-elles les seules que vous effectuez? Quelle est la taille des données générées par (data._1, desPoints) - cela devrait tenir en mémoire, surtout si ces données sont ensuite mélangées à une autre étape
Arnon Rotem-Gal-Oz
1
Quelle est la configuration de la mémoire pour le pilote? Vérifiez quel serveur obtient l'erreur de mémoire insuffisante. Est-ce le pilote ou l'un des exécuteurs.
RanP
Voir ici toutes les propriétés des configurations: spark.apache.org/docs/2.1.0/configuration.html
Naramsim

Réponses:

364

J'ai quelques suggestions:

  • Si vos noeuds sont configurés pour avoir un maximum 6g pour Spark (et sont en laissant un peu pour d' autres processus), puis utilisez 6g plutôt que 4 g, spark.executor.memory=6g. Assurez-vous que vous utilisez autant de mémoire que possible en vérifiant l'interface utilisateur (elle indiquera la quantité de mem que vous utilisez)
  • Essayez d'utiliser plus de partitions, vous devriez avoir 2 à 4 par CPU. IME augmenter le nombre de partitions est souvent le moyen le plus simple de rendre un programme plus stable (et souvent plus rapide). Pour d'énormes quantités de données dont vous pourriez avoir besoin de plus de 4 par CPU, j'ai dû utiliser 8000 partitions dans certains cas!
  • Diminuez la fraction de mémoire réservée à la mise en cache , à l'aide de spark.storage.memoryFraction. Si vous n'utilisez pas cache()ou persistdans votre code, cela pourrait aussi bien être 0. Sa valeur par défaut est de 0,6, ce qui signifie que vous n'obtenez que 0,4 * 4 g de mémoire pour votre tas. IME réduisant le mem frac fait souvent disparaître les MOO. MISE À JOUR: Depuis spark 1.6 apparemment nous n'aurons plus besoin de jouer avec ces valeurs, spark les déterminera automatiquement.
  • Similaire à la fraction de mémoire ci-dessus mais aléatoire . Si votre travail n'a pas besoin de beaucoup de mémoire aléatoire, définissez-le sur une valeur inférieure (cela pourrait entraîner le débordement de vos mélanges sur le disque, ce qui peut avoir un impact catastrophique sur la vitesse). Parfois, lorsque c'est une opération de lecture aléatoire qui est OOM, vous devez faire le contraire, c'est-à-dire le définir sur quelque chose de grand, comme 0,8, ou assurez-vous de permettre à vos shuffles de se répandre sur le disque (c'est la valeur par défaut depuis 1.0.0).
  • Attention aux fuites de mémoire , elles sont souvent causées par la fermeture accidentelle d'objets dont vous n'avez pas besoin dans vos lambdas. La façon de diagnostiquer est de rechercher la «tâche sérialisée en XXX octets» dans les journaux, si XXX est plus grand que quelques k ou plus qu'un Mo, vous pouvez avoir une fuite de mémoire. Voir https://stackoverflow.com/a/25270600/1586965
  • Relatif à ci-dessus; utilisez des variables de diffusion si vous avez vraiment besoin de gros objets.
  • Si vous mettez en cache de gros RDD et que vous pouvez perdre un certain temps d'accès, envisagez de sérialiser le RDD http://spark.apache.org/docs/latest/tuning.html#serialized-rdd-storage . Ou même les mettre en cache sur le disque (ce qui n'est parfois pas si mal si vous utilisez des SSD).
  • ( Avancé ) Lié aux Stringstructures ci-dessus, éviter et fortement imbriquées ( Mapclasses de cas similaires et imbriquées). Si possible, essayez de n'utiliser que des types primitifs et d'indexer toutes les non primitives, surtout si vous vous attendez à beaucoup de doublons. Choisissez WrappedArrayautant que possible les structures imbriquées. Ou même déployez votre propre sérialisation - VOUS aurez le plus d'informations sur la façon de sauvegarder efficacement vos données en octets, UTILISEZ-LES !
  • ( bit hacky ) Encore une fois lors de la mise en cache, envisagez d'utiliser un Datasetpour mettre en cache votre structure car il utilisera une sérialisation plus efficace. Cela devrait être considéré comme un hack par rapport au point précédent. L'intégration de vos connaissances de domaine dans votre algo / sérialisation peut réduire l'espace mémoire / cache de 100x ou 1000x, alors que tout ce que vous Datasetobtiendrez est probablement 2x - 5x en mémoire et 10x compressé (parquet) sur le disque.

http://spark.apache.org/docs/1.2.1/configuration.html

EDIT: (donc je peux me google plus facilement) Ce qui suit est également indicatif de ce problème:

java.lang.OutOfMemoryError : GC overhead limit exceeded
samthebest
la source
Merci pour vos suggestions ~ Si je mets spark.executor.memory = 6g, spark aura le problème: "vérifiez l'interface utilisateur de votre cluster pour vous assurer que les employés sont enregistrés et ont suffisamment de mémoire". La définition de spark.storage.memoryFraction sur 0,1 ne peut pas non plus résoudre le problème. Peut-être que le problème réside dans mon code. Merci!
hequn8128
2
@samthebest C'est une réponse fantastique. J'apprécie vraiment l'aide à la journalisation pour trouver les fuites de mémoire.
Myles Baker, le
1
Salut @samthebest comment avez-vous spécifié 8000 partitions? Étant donné que j'utilise Spark sql, je ne peux spécifier que la partition en utilisant spark.sql.shuffle.partitions, la valeur par défaut est 200 si je la mets à plus J'ai essayé de la mettre à 1000 mais ne aidant pas à obtenir le MOO, savez-vous ce qui devrait être optimal valeur de partition J'ai 1 To de données asymétriques à traiter et cela implique des requêtes de regroupement par ruche. Veuillez guider.
Umesh K
2
Salut @ user449355 s'il vous plaît pourriez-vous poser une nouvelle question? De peur de commencer un long fil de commentaires :) Si vous rencontrez des problèmes, d'autres personnes le sont probablement, et une question le rendrait plus facile à trouver pour tous.
samthebest
1
Pour votre premier point, @samthebest, vous ne devez pas utiliser TOUTE la mémoire spark.executor.memorycar vous avez certainement besoin d'une certaine quantité de mémoire pour la surcharge d'E / S. Si vous utilisez tout cela, cela ralentira votre programme. L'exception à cela pourrait être Unix, auquel cas vous avez un espace de swap.
Hunle
58

Pour ajouter un cas d'utilisation à cela qui n'est souvent pas abordé, je proposerai une solution lors de la soumission d'une Sparkapplication via spark-submiten mode local .

Selon le gitbook Mastering Apache Spark de Jacek Laskowski :

Vous pouvez exécuter Spark en mode local. Dans ce mode de déploiement JVM unique non distribué, Spark génère tous les composants d'exécution - pilote, exécuteur, backend et maître - dans la même JVM. Il s'agit du seul mode où un pilote est utilisé pour l'exécution.

Ainsi, si vous rencontrez des OOMerreurs avec le heap, il suffit de régler le driver-memoryplutôt que le executor-memory.

Voici un exemple:

spark-1.6.1/bin/spark-submit
  --class "MyClass"
  --driver-memory 12g
  --master local[*] 
  target/scala-2.10/simple-project_2.10-1.0.jar 
Brian
la source
Quel pourcentage nous devrions considérer pour la mémoire du pilote en mode autonome.
Yashwanth Kambala
@Brian, En mode local, la mémoire du pilote doit-elle être supérieure à la taille des données d'entrée? Est-il possible de spécifier le nombre de partitions pour le jeu de données d'entrée, afin que le travail Spark puisse traiter un jeu de données beaucoup plus grand que la RAM disponible?
fuyi
19

Vous devez configurer les paramètres de mémoire offHeap comme indiqué ci-dessous:

val spark = SparkSession
     .builder()
     .master("local[*]")
     .config("spark.executor.memory", "70g")
     .config("spark.driver.memory", "50g")
     .config("spark.memory.offHeap.enabled",true)
     .config("spark.memory.offHeap.size","16g")   
     .appName("sampleCodeForReference")
     .getOrCreate()

Donnez à la mémoire du pilote et de l'exécuteur la disponibilité de la RAM de votre machine. Vous pouvez augmenter la taille offHeap si vous êtes toujours confronté au problème OutofMemory .

pavan.vn101
la source
Ajout du paramètre offHeap aidé
kennyut
2
définir la mémoire du pilote dans votre code ne fonctionnera pas, lisez la documentation de spark pour cela: les propriétés Spark peuvent principalement être divisées en deux types: l'une est liée au déploiement, comme "spark.driver.memory", "spark.executor.instances", ce type de propriétés peut ne pas être affecté lors de la définition par programme via SparkConf lors de l'exécution, ou le comportement dépend du gestionnaire de cluster et du mode de déploiement que vous choisissez, il est donc suggéré de définir via le fichier de configuration ou les options de ligne de commande spark-submit.
Abdulhafeth Sartawi le
1
LA MEILLEURE RÉPONSE! Mon problème était que Spark n'était pas installé sur le nœud maître, je viens d'utiliser PySpark pour me connecter à HDFS et j'ai eu la même erreur. L'utilisation a configrésolu le problème.
Mikhail_Sam
Je viens d'ajouter les configurations à l'aide de la commande spark-submit pour résoudre le problème de taille de segment de mémoire. Merci.
Pritam Sadhukhan
16

Vous devez augmenter la mémoire du pilote. Dans votre dossier $ SPARK_HOME / conf, vous devriez trouver le fichier spark-defaults.conf, le modifier et le définir en spark.driver.memory 4000mfonction de la mémoire de votre maître, je pense. C'est ce qui a résolu le problème pour moi et tout se passe bien

blueskin
la source
Quel pourcentage de mem à allouer, en stand-alone
Yashwanth Kambala
14

Jetez un œil aux scripts de démarrage, une taille de tas Java y est définie, il semble que vous ne la définissiez pas avant d'exécuter Spark Worker.

# Set SPARK_MEM if it isn't already set since we also use it for this process
SPARK_MEM=${SPARK_MEM:-512m}
export SPARK_MEM

# Set JAVA_OPTS to be able to load native libraries and to set heap size
JAVA_OPTS="$OUR_JAVA_OPTS"
JAVA_OPTS="$JAVA_OPTS -Djava.library.path=$SPARK_LIBRARY_PATH"
JAVA_OPTS="$JAVA_OPTS -Xms$SPARK_MEM -Xmx$SPARK_MEM"

Vous pouvez trouver la documentation pour déployer des scripts ici .

Tombart
la source
Merci ~ J'essaierai plus tard. De spark ui, cela montre que la mémoire de chaque exécuteur est 4096. Le paramètre a donc été activé, non?
hequn8128
J'ai vu votre réponse alors que je fais face à un problème similaire ( stackoverflow.com/questions/34762432/… ). En regardant le lien que vous avez fourni, il semble que la configuration de Xms / Xmx ne soit plus là, pouvez-vous dire pourquoi?
Seffy
Le contenu du script lié à par start up scriptsa malheureusement changé. Aucune option de ce type n'existait au 2019-12-19
David
7

J'ai beaucoup souffert de ce problème, nous utilisons l'allocation dynamique des ressources et j'ai pensé qu'il utiliserait mes ressources de cluster pour s'adapter au mieux à l'application.

Mais la vérité est que l'allocation dynamique des ressources ne définit pas la mémoire du pilote et la maintient à sa valeur par défaut qui est 1g.

Je l'ai résolu en définissant spark.driver.memory sur un nombre qui convient à la mémoire de mon pilote (pour 32 Go de RAM, je l'ai défini sur 18 Go)

vous pouvez le définir à l'aide de la commande spark submit comme suit:

spark-submit --conf spark.driver.memory=18gb ....cont

Remarque très importante, cette propriété ne sera pas prise en considération si vous la définissez à partir du code, selon la documentation de spark:

Les propriétés Spark peuvent principalement être divisées en deux types: l'une est liée au déploiement, comme «spark.driver.memory», «spark.executor.instances», ce type de propriétés peut ne pas être affecté lors de la configuration par programme via SparkConf lors de l'exécution, ou le comportement dépend du gestionnaire de cluster et du mode de déploiement que vous choisissez, il est donc suggéré de définir via le fichier de configuration ou les options de ligne de commande spark-submit; un autre est principalement lié au contrôle de l'exécution de Spark, comme «spark.task.maxFailures», ce type de propriétés peut être défini dans les deux sens.

Abdulhafeth Sartawi
la source
2
Vous devez utiliser --conf spark.driver.memory = 18g
merenptah
5

De manière générale, la mémoire JVM spark Executor peut être divisée en deux parties. Mémoire Spark et mémoire utilisateur. Ceci est contrôlé par la propriété spark.memory.fraction- la valeur est comprise entre 0 et 1. Lorsque vous travaillez avec des images ou effectuez un traitement gourmand en mémoire dans des applications spark, envisagez de diminuer laspark.memory.fraction . Cela rendra plus de mémoire disponible pour votre travail d'application. Spark peut déborder, il fonctionnera donc avec moins de partage de mémoire.

La deuxième partie du problème est la division du travail. Si possible, partitionnez vos données en petits morceaux. Les données plus petites nécessitent probablement moins de mémoire. Mais si cela n'est pas possible, vous sacrifiez le calcul pour la mémoire. En règle générale, un seul exécuteur exécutera plusieurs cœurs. La mémoire totale des exécuteurs doit être suffisante pour gérer les besoins en mémoire de toutes les tâches simultanées. Si l'augmentation de la mémoire de l'exécuteur n'est pas une option, vous pouvez diminuer les cœurs par exécuteur afin que chaque tâche dispose de plus de mémoire pour fonctionner. Testez avec 1 exécuteurs principaux qui ont la plus grande mémoire possible, puis continuez à augmenter les cœurs jusqu'à ce que vous trouviez le meilleur nombre de cœurs.

Rohit Karlupia
la source
5

Avez-vous vidé votre journal maître gc? J'ai donc rencontré un problème similaire et j'ai trouvé que SPARK_DRIVER_MEMORY ne définissait que le tas Xmx. La taille de segment de mémoire initiale reste 1G et la taille de segment de mémoire ne monte jamais jusqu'au segment Xmx.

Passer "--conf" spark.driver.extraJavaOptions = -Xms20g "résout mon problème.

ps aux | grep java et vous verrez le journal de suivi: =

24501 30,7 1,7 41782944 2318184 pts / 0 Sl + 18:49 0:33 / usr / java / latest / bin / java -cp / opt / spark / conf /: / opt / spark / jars / * -Xmx30g -Xms20g

Yunzhao Yang
la source
3

L'emplacement pour définir la taille du segment de mémoire (au moins dans spark-1.0.0) est dans conf / spark-env. Les variables pertinentes sont SPARK_EXECUTOR_MEMORY& SPARK_DRIVER_MEMORY. Plus de documents sont dans le guide de déploiement

N'oubliez pas non plus de copier le fichier de configuration sur tous les nœuds esclaves.

Amnon
la source
4
Comment savez-vous lequel régler entre SPARK_EXECUTOR_MEMORY& SPARK_DRIVER_MEMORY?
Hunle
13
c'est-à-dire quelle erreur vous dirait d'augmenter le SPARK_EXECUTOR_MEMORY, et quelle erreur vous dirait d'augmenter SPARK_DRIVER_MEMORY?
Hunle
2

J'ai quelques suggestions pour l'erreur mentionnée ci-dessus.

● Vérifiez la mémoire de l'exécuteur attribuée car un exécuteur pourrait avoir à gérer des partitions nécessitant plus de mémoire que ce qui est attribué.

● Essayez de voir si plus de shuffles sont actifs car les shuffles sont des opérations coûteuses car elles impliquent des E / S disque, la sérialisation des données et des E / S réseau

● Utiliser les jointures de diffusion

● Évitez d'utiliser groupByKeys et essayez de le remplacer par ReduceByKey

● Évitez d'utiliser d'énormes objets Java partout où le brassage se produit

Unmesha SreeVeni
la source
Désolé de détourner la requête de quelqu'un d'autre, mais comment utiliser ReduceByKey sur groupBy?
Somil Aseeja
1

D'après ma compréhension du code fourni ci-dessus, il charge le fichier et mappe les opérations et les enregistre. Aucune opération ne nécessite une lecture aléatoire. De plus, aucune opération ne nécessite que des données soient apportées au pilote. Par conséquent, tout réglage lié à la lecture aléatoire ou au pilote peut n'avoir aucun impact. Le pilote a des problèmes quand il y a trop de tâches mais ce n'est que jusqu'à la version spark 2.0.2. Il peut y avoir deux choses qui tournent mal.

  • Il n'y a qu'un ou quelques exécuteurs. Augmentez le nombre d'exécuteurs afin qu'ils puissent être affectés à différents esclaves. Si vous utilisez le fil, vous devez changer la configuration de num-executors ou si vous utilisez spark autonome, vous devez régler num cores par executor et spark max cores conf. En nombre d'exécuteurs autonomes = nombre maximal de cœurs / cœurs par exécuteur.
  • Le nombre de partitions est très faible ou peut-être un seul. Donc, si cela est faible même si nous avons plusieurs cœurs, plusieurs exécuteurs, cela ne sera pas d'une grande utilité car la parallélisation dépend du nombre de partitions. Augmentez donc les partitions en faisant imageBundleRDD.repartition (11)
Shridhar
la source
0

La définition de ces configurations exactes a aidé à résoudre le problème.

spark-submit --conf spark.yarn.maxAppAttempts=2 --executor-memory 10g --num-executors 50 --driver-memory 12g
swapnil shashank
la source