(Pourquoi) devons-nous appeler le cache ou persister sur un RDD

171

Lorsqu'un ensemble de données distribuées résilient (RDD) est créé à partir d'un fichier texte ou d'une collection (ou d'un autre RDD), devons-nous appeler explicitement «cache» ou «persist» pour stocker les données RDD en mémoire? Ou les données RDD sont-elles stockées de manière distribuée dans la mémoire par défaut?

val textFile = sc.textFile("/user/emp.txt")

Selon ce que je comprends, après l'étape ci-dessus, textFile est un RDD et est disponible dans tout / partie de la mémoire du nœud.

Si tel est le cas, pourquoi devons-nous appeler "cache" ou "persist" sur textFile RDD alors?

Ramana
la source

Réponses:

300

La plupart des opérations RDD sont paresseuses. Considérez un RDD comme une description d'une série d'opérations. Un RDD n'est pas une donnée. Donc cette ligne:

val textFile = sc.textFile("/user/emp.txt")

Cela ne fait rien. Il crée un RDD qui dit "nous devrons charger ce fichier". Le fichier n'est pas chargé à ce stade.

Les opérations RDD qui nécessitent l'observation du contenu des données ne peuvent pas être paresseuses. (Celles-ci sont appelées actions .) Un exemple est RDD.count- pour vous indiquer le nombre de lignes dans le fichier, le fichier doit être lu. Donc, si vous écrivez textFile.count, à ce stade, le fichier sera lu, les lignes seront comptées et le nombre sera retourné.

Et si vous appelez à textFile.countnouveau? La même chose: le fichier sera lu et compté à nouveau. Rien n'est stocké. Un RDD n'est pas une donnée.

Alors qu'est-ce que ça RDD.cachefait? Si vous ajoutez textFile.cacheau code ci-dessus:

val textFile = sc.textFile("/user/emp.txt")
textFile.cache

Cela ne fait rien. RDD.cacheest également une opération paresseuse. Le fichier n'est toujours pas lu. Mais maintenant, le RDD dit "lire ce fichier et ensuite mettre en cache le contenu". Si vous exécutez ensuite textFile.countla première fois, le fichier sera chargé, mis en cache et compté. Si vous appelez textFile.countune deuxième fois, l'opération utilisera le cache. Il prendra juste les données du cache et comptera les lignes.

Le comportement du cache dépend de la mémoire disponible. Si le fichier ne tient pas dans la mémoire, par exemple, il textFile.countreviendra au comportement habituel et relira le fichier.

Daniel Darabos
la source
4
Salut Daniel, - lorsque vous appelez le cache, cela signifie-t-il que le RDD n'est pas rechargé à partir de la source (par exemple, un fichier texte) - comment pouvez-vous être sûr que les données du fichier texte sont les plus récentes lorsqu'elles sont mises en cache? (Spark comprend-il cela ou s'agit-il d'une opération manuelle pour dépersister () périodiquement pour s'assurer que les données sources sont recalculées plus tard dans la lignée?)
andrew.butkus
aussi - si vous devez annuler la persévérance périodiquement, - si vous avez un rdd qui est mis en cache, dépendant d'un autre RDD qui est mis en cache, devez-vous annuler la persistance des deux RDD pour voir les résultats recalculés?
andrew.butkus
21
Spark suppose simplement que le fichier ne changera jamais. Il lit le fichier à un moment arbitraire et peut en relire certaines parties si nécessaire ultérieurement. (Par exemple, si une partie des données a été expulsée du cache.) Il vaut donc mieux garder vos fichiers inchangés! Créez simplement un nouveau fichier avec un nouveau nom lorsque vous avez de nouvelles données, puis chargez-le en tant que nouveau RDD. Si vous obtenez continuellement de nouvelles données, consultez Spark Streaming.
Daniel Darabos
10
Oui. Les RDD sont immuables, donc chaque RDD suppose que ses dépendances sont également immuables. Spark Streaming vous permet de configurer de telles arborescences qui fonctionnent sur un flux de changements. Mais une solution encore plus simple consiste à construire l'arborescence dans une fonction qui prend un nom de fichier comme paramètre. Ensuite, appelez simplement la fonction pour le nouveau fichier et pouf, vous avez le nouvel arbre de calcul.
Daniel Darabos
1
@Humoyun: sur l'onglet Stockage de Spark UI, vous pouvez voir combien de chaque RDD est mis en cache. Les données peuvent être si volumineuses que seulement 40% d'entre elles tiennent dans la mémoire totale dont vous disposez pour la mise en cache. Une option dans ce cas consiste à utiliser perisistet à choisir une option de stockage qui permet de répandre les données du cache sur le disque.
Daniel Darabos
197

Je pense que la question serait mieux formulée comme suit:

Quand devons-nous appeler le cache ou persister sur un RDD?

Les processus Spark sont paresseux, c'est-à-dire que rien ne se passera tant que ce n'est pas nécessaire. Pour répondre rapidement à la question, après l' val textFile = sc.textFile("/user/emp.txt")émission, rien n'arrive aux données, seul un HadoopRDDest construit, en utilisant le fichier comme source.

Disons que nous transformons un peu ces données:

val wordsRDD = textFile.flatMap(line => line.split("\\W"))

Encore une fois, rien n'arrive aux données. Il existe maintenant un nouveau RDD wordsRDDqui contient une référence testFileet une fonction à appliquer en cas de besoin.

Ce n'est que lorsqu'une action est appelée sur un RDD, comme wordsRDD.countla chaîne RDD, appelée lignage, sera exécutée. Autrement dit, les données, réparties en partitions, seront chargées par les exécuteurs du cluster Spark, la flatMapfonction sera appliquée et le résultat sera calculé.

Sur une lignée linéaire, comme celle de cet exemple, cache()n'est pas nécessaire. Les données seront chargées dans les exécuteurs, toutes les transformations seront appliquées et finalement countelles seront calculées, le tout en mémoire - si les données tiennent en mémoire.

cacheest utile lorsque la lignée du RDD se ramifie. Supposons que vous souhaitiez filtrer les mots de l'exemple précédent en un décompte de mots positifs et négatifs. Vous pouvez faire ceci comme ça:

val positiveWordsCount = wordsRDD.filter(word => isPositive(word)).count()
val negativeWordsCount = wordsRDD.filter(word => isNegative(word)).count()

Ici, chaque branche émet un rechargement des données. L'ajout d'une cachedéclaration explicite garantira que le traitement effectué précédemment est conservé et réutilisé. Le travail ressemblera à ceci:

val textFile = sc.textFile("/user/emp.txt")
val wordsRDD = textFile.flatMap(line => line.split("\\W"))
wordsRDD.cache()
val positiveWordsCount = wordsRDD.filter(word => isPositive(word)).count()
val negativeWordsCount = wordsRDD.filter(word => isNegative(word)).count()

Pour cette raison, cacheon dit de «briser la lignée» car il crée un point de contrôle qui peut être réutilisé pour un traitement ultérieur.

Règle de base: à utiliser cachelorsque la lignée de votre RDD se ramifie ou lorsqu'un RDD est utilisé plusieurs fois comme dans une boucle.

maasg
la source
1
Impressionnant. Merci. Encore une question connexe. Lorsque nous mettons en cache ou persistons, les données seront stockées dans la mémoire de l'exécuteur ou dans la mémoire du nœud de travail. S'il s'agit de la mémoire de l'exécuteur, How Spark identifie quel exécuteur possède les données.
Ramana
1
@RamanaUppala la mémoire de l'exécuteur est utilisée. La fraction de mémoire de l'exécuteur utilisée pour la mise en cache est contrôlée par le fichier config spark.storage.memoryFraction. Concernant quel exécuteur a quelles données, un RDD gardera une trace de ses partitions qui sont distribuées sur les exécuteurs.
maasg
5
@maasg Corrigez-moi si je me trompe mais que ni cacheni persist ne peut briser la lignée .
zero323
Où serait stocké le wordsRDD si nous n'avons pas eu l'instruction .cache () dans l'exemple ci-dessus?
sun_dare
Et si avant les deux comptes, nous réunissons les deux branches en un rdd et comptons? dans ce cas, le cache est-il utile?
Xiawei Zhang
30

Avons-nous besoin d'appeler "cache" ou "persist" explicitement pour stocker les données RDD en mémoire?

Oui, seulement si nécessaire.

Les données RDD stockées de manière distribuée dans la mémoire par défaut?

Non!

Et voici les raisons pour lesquelles:

  • Spark prend en charge deux types de variables partagées: les variables de diffusion, qui peuvent être utilisées pour mettre en cache une valeur en mémoire sur tous les nœuds, et les accumulateurs, qui sont des variables uniquement «ajoutées», telles que les compteurs et les sommes.

  • Les RDD prennent en charge deux types d'opérations: les transformations, qui créent un nouvel ensemble de données à partir d'un ensemble existant, et les actions, qui renvoient une valeur au programme pilote après avoir exécuté un calcul sur l'ensemble de données. Par exemple, map est une transformation qui passe chaque élément de l'ensemble de données via une fonction et renvoie un nouveau RDD représentant les résultats. D'autre part, réduire est une action qui agrège tous les éléments du RDD en utilisant une fonction et retourne le résultat final au programme pilote (bien qu'il existe également une réduction parallèle qui renvoie un ensemble de données distribué).

  • Toutes les transformations dans Spark sont paresseuses, en ce sens qu'elles ne calculent pas leurs résultats tout de suite. Au lieu de cela, ils se souviennent simplement des transformations appliquées à un ensemble de données de base (par exemple un fichier). Les transformations ne sont calculées que lorsqu'une action nécessite qu'un résultat soit renvoyé au programme pilote. Cette conception permet à Spark de fonctionner plus efficacement - par exemple, nous pouvons réaliser qu'un ensemble de données créé via map sera utilisé dans une réduction et ne retournera que le résultat de la réduction au pilote, plutôt que le plus grand ensemble de données mappé.

  • Par défaut, chaque RDD transformé peut être recalculé chaque fois que vous exécutez une action dessus. Cependant, vous pouvez également conserver un RDD en mémoire à l'aide de la méthode persist (ou cache), auquel cas Spark conservera les éléments sur le cluster pour un accès beaucoup plus rapide la prochaine fois que vous l'interrogerez. Il existe également une prise en charge des RDD persistants sur le disque ou répliqués sur plusieurs nœuds.

Pour plus de détails, veuillez consulter le guide de programmation Spark .

Eliasah
la source
1
Cela n'a pas répondu à ma question.
Ramana
Qu'est-ce qui n'y répond pas?
eliasah
1
lorsque les données de RDD sont stockées dans la mémoire par défaut, pourquoi devons-nous appeler Cache ou Persist?
Ramana
Les RDD ne sont pas stockés en mémoire par défaut, donc la persistance du RDD permet à Spark d'effectuer la transformation plus rapidement sur le cluster
eliasah
2
C'est une bonne réponse, je ne sais pas pourquoi elle a été rejetée. C'est une réponse descendante, expliquant comment les RDD fonctionnent à partir des concepts de haut niveau. J'ai ajouté une autre réponse qui va de bas en haut: à partir de "que fait cette ligne". C'est peut-être plus facile à suivre pour quelqu'un qui commence tout juste avec Spark.
Daniel Darabos du
11

Voici les trois situations dans lesquelles vous devez mettre en cache vos RDD:

en utilisant un RDD plusieurs fois

effectuer plusieurs actions sur le même RDD

pour de longues chaînes de transformations (ou très coûteuses)

Rileyss
la source
7

Ajout d'une autre raison pour ajouter (ou ajouter temporairement) un cacheappel de méthode.

pour les problèmes de mémoire de débogage

avec la cacheméthode, spark donnera des informations de débogage concernant la taille du RDD. Ainsi, dans l'interface utilisateur intégrée Spark, vous obtiendrez des informations sur la consommation de mémoire RDD. et cela s'est avéré très utile pour diagnostiquer les problèmes de mémoire.

zinking
la source