La plupart des opérations RDD sont paresseuses. Considérez un RDD comme une description d'une série d'opérations. Un RDD n'est pas une donnée. Donc cette ligne:
val textFile = sc.textFile("/user/emp.txt")
Cela ne fait rien. Il crée un RDD qui dit "nous devrons charger ce fichier". Le fichier n'est pas chargé à ce stade.
Les opérations RDD qui nécessitent l'observation du contenu des données ne peuvent pas être paresseuses. (Celles-ci sont appelées actions .) Un exemple est RDD.count
- pour vous indiquer le nombre de lignes dans le fichier, le fichier doit être lu. Donc, si vous écrivez textFile.count
, à ce stade, le fichier sera lu, les lignes seront comptées et le nombre sera retourné.
Et si vous appelez à textFile.count
nouveau? La même chose: le fichier sera lu et compté à nouveau. Rien n'est stocké. Un RDD n'est pas une donnée.
Alors qu'est-ce que ça RDD.cache
fait? Si vous ajoutez textFile.cache
au code ci-dessus:
val textFile = sc.textFile("/user/emp.txt")
textFile.cache
Cela ne fait rien. RDD.cache
est également une opération paresseuse. Le fichier n'est toujours pas lu. Mais maintenant, le RDD dit "lire ce fichier et ensuite mettre en cache le contenu". Si vous exécutez ensuite textFile.count
la première fois, le fichier sera chargé, mis en cache et compté. Si vous appelez textFile.count
une deuxième fois, l'opération utilisera le cache. Il prendra juste les données du cache et comptera les lignes.
Le comportement du cache dépend de la mémoire disponible. Si le fichier ne tient pas dans la mémoire, par exemple, il textFile.count
reviendra au comportement habituel et relira le fichier.
perisist
et à choisir une option de stockage qui permet de répandre les données du cache sur le disque.Je pense que la question serait mieux formulée comme suit:
Quand devons-nous appeler le cache ou persister sur un RDD?
Les processus Spark sont paresseux, c'est-à-dire que rien ne se passera tant que ce n'est pas nécessaire. Pour répondre rapidement à la question, après l'
val textFile = sc.textFile("/user/emp.txt")
émission, rien n'arrive aux données, seul unHadoopRDD
est construit, en utilisant le fichier comme source.Disons que nous transformons un peu ces données:
Encore une fois, rien n'arrive aux données. Il existe maintenant un nouveau RDD
wordsRDD
qui contient une référencetestFile
et une fonction à appliquer en cas de besoin.Ce n'est que lorsqu'une action est appelée sur un RDD, comme
wordsRDD.count
la chaîne RDD, appelée lignage, sera exécutée. Autrement dit, les données, réparties en partitions, seront chargées par les exécuteurs du cluster Spark, laflatMap
fonction sera appliquée et le résultat sera calculé.Sur une lignée linéaire, comme celle de cet exemple,
cache()
n'est pas nécessaire. Les données seront chargées dans les exécuteurs, toutes les transformations seront appliquées et finalementcount
elles seront calculées, le tout en mémoire - si les données tiennent en mémoire.cache
est utile lorsque la lignée du RDD se ramifie. Supposons que vous souhaitiez filtrer les mots de l'exemple précédent en un décompte de mots positifs et négatifs. Vous pouvez faire ceci comme ça:Ici, chaque branche émet un rechargement des données. L'ajout d'une
cache
déclaration explicite garantira que le traitement effectué précédemment est conservé et réutilisé. Le travail ressemblera à ceci:Pour cette raison,
cache
on dit de «briser la lignée» car il crée un point de contrôle qui peut être réutilisé pour un traitement ultérieur.Règle de base: à utiliser
cache
lorsque la lignée de votre RDD se ramifie ou lorsqu'un RDD est utilisé plusieurs fois comme dans une boucle.la source
spark.storage.memoryFraction
. Concernant quel exécuteur a quelles données, un RDD gardera une trace de ses partitions qui sont distribuées sur les exécuteurs.cache
nipersist
ne peut briser la lignée .Avons-nous besoin d'appeler "cache" ou "persist" explicitement pour stocker les données RDD en mémoire?
Oui, seulement si nécessaire.
Les données RDD stockées de manière distribuée dans la mémoire par défaut?
Non!
Et voici les raisons pour lesquelles:
Spark prend en charge deux types de variables partagées: les variables de diffusion, qui peuvent être utilisées pour mettre en cache une valeur en mémoire sur tous les nœuds, et les accumulateurs, qui sont des variables uniquement «ajoutées», telles que les compteurs et les sommes.
Les RDD prennent en charge deux types d'opérations: les transformations, qui créent un nouvel ensemble de données à partir d'un ensemble existant, et les actions, qui renvoient une valeur au programme pilote après avoir exécuté un calcul sur l'ensemble de données. Par exemple, map est une transformation qui passe chaque élément de l'ensemble de données via une fonction et renvoie un nouveau RDD représentant les résultats. D'autre part, réduire est une action qui agrège tous les éléments du RDD en utilisant une fonction et retourne le résultat final au programme pilote (bien qu'il existe également une réduction parallèle qui renvoie un ensemble de données distribué).
Toutes les transformations dans Spark sont paresseuses, en ce sens qu'elles ne calculent pas leurs résultats tout de suite. Au lieu de cela, ils se souviennent simplement des transformations appliquées à un ensemble de données de base (par exemple un fichier). Les transformations ne sont calculées que lorsqu'une action nécessite qu'un résultat soit renvoyé au programme pilote. Cette conception permet à Spark de fonctionner plus efficacement - par exemple, nous pouvons réaliser qu'un ensemble de données créé via map sera utilisé dans une réduction et ne retournera que le résultat de la réduction au pilote, plutôt que le plus grand ensemble de données mappé.
Par défaut, chaque RDD transformé peut être recalculé chaque fois que vous exécutez une action dessus. Cependant, vous pouvez également conserver un RDD en mémoire à l'aide de la méthode persist (ou cache), auquel cas Spark conservera les éléments sur le cluster pour un accès beaucoup plus rapide la prochaine fois que vous l'interrogerez. Il existe également une prise en charge des RDD persistants sur le disque ou répliqués sur plusieurs nœuds.
Pour plus de détails, veuillez consulter le guide de programmation Spark .
la source
Voici les trois situations dans lesquelles vous devez mettre en cache vos RDD:
la source
Ajout d'une autre raison pour ajouter (ou ajouter temporairement) un
cache
appel de méthode.pour les problèmes de mémoire de débogage
avec la
cache
méthode, spark donnera des informations de débogage concernant la taille du RDD. Ainsi, dans l'interface utilisateur intégrée Spark, vous obtiendrez des informations sur la consommation de mémoire RDD. et cela s'est avéré très utile pour diagnostiquer les problèmes de mémoire.la source