Comment Hadoop traite-t-il les enregistrements répartis entre les limites des blocs?

119

Selon le Hadoop - The Definitive Guide

Les enregistrements logiques définis par FileInputFormats ne s'intègrent généralement pas parfaitement dans les blocs HDFS. Par exemple, les enregistrements logiques d'un TextInputFormat sont des lignes qui traverseront le plus souvent les limites HDFS. Cela n'a aucune incidence sur le fonctionnement de votre programme - les lignes ne sont ni manquées ni interrompues, par exemple - mais cela vaut la peine de le savoir, car cela signifie que les mappages locaux de données (c'est-à-dire les mappes qui s'exécutent sur le même hôte que leur données d'entrée) effectuera des lectures à distance. La légère surcharge que cela entraîne n'est normalement pas significative.

Supposons qu'une ligne d'enregistrement soit divisée en deux blocs (b1 et b2). Le mappeur traitant le premier bloc (b1) remarquera que la dernière ligne n'a pas de séparateur EOL et récupère le reste de la ligne du bloc de données suivant (b2).

Comment le mappeur traitant le deuxième bloc (b2) détermine-t-il que le premier enregistrement est incomplet et doit traiter à partir du deuxième enregistrement dans le bloc (b2)?

Praveen Sripati
la source

Réponses:

160

Question intéressante, j'ai passé du temps à regarder le code pour les détails et voici mes pensées. Les fractionnements sont gérés par le client par InputFormat.getSplits, donc un coup d'oeil à FileInputFormat donne les informations suivantes:

  • Pour chaque fichier d'entrée, obtenez la longueur du fichier, la taille du bloc et calculez la taille de la division comme max(minSize, min(maxSize, blockSize))maxSizecorrespond à mapred.max.split.sizeet minSizeest mapred.min.split.size.
  • Divisez le fichier en différents FileSplits en fonction de la taille de division calculée ci-dessus. L'important ici est que chacun FileSplitsoit initialisé avec un startparamètre correspondant au décalage dans le fichier d'entrée . Il n'y a toujours pas de traitement des lignes à ce stade. La partie pertinente du code ressemble à ceci:

    while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
      int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
      splits.add(new FileSplit(path, length-bytesRemaining, splitSize, 
                               blkLocations[blkIndex].getHosts()));
      bytesRemaining -= splitSize;
    }
    

Après cela, si vous regardez le LineRecordReaderqui est défini par le TextInputFormat, c'est là que les lignes sont gérées:

  • Lorsque vous initialisez votre, LineRecordReaderil essaie d'instancier un LineReaderqui est une abstraction pour pouvoir lire les lignes FSDataInputStream. Il y a 2 cas:
  • S'il y a un CompressionCodecdéfini, alors ce codec est responsable de la gestion des limites. Probablement pas pertinent pour votre question.
  • S'il n'y a pas de codec cependant, c'est là que les choses sont intéressantes: si le startde votre InputSplitest différent de 0, alors vous reculez d'un caractère puis sautez la première ligne que vous rencontrez identifiée par \ n ou \ r \ n (Windows) ! Le retour en arrière est important car dans le cas où vos limites de ligne sont les mêmes que les limites de division, cela garantit que vous ne sautez pas la ligne valide. Voici le code pertinent:

    if (codec != null) {
       in = new LineReader(codec.createInputStream(fileIn), job);
       end = Long.MAX_VALUE;
    } else {
       if (start != 0) {
         skipFirstLine = true;
         --start;
         fileIn.seek(start);
       }
       in = new LineReader(fileIn, job);
    }
    if (skipFirstLine) {  // skip first line and re-establish "start".
      start += in.readLine(new Text(), 0,
                        (int)Math.min((long)Integer.MAX_VALUE, end - start));
    }
    this.pos = start;
    

Ainsi, puisque les fractionnements sont calculés dans le client, les mappeurs n'ont pas besoin de s'exécuter en séquence, chaque mappeur sait déjà s'il doit ignorer la première ligne ou non.

Donc, fondamentalement, si vous avez 2 lignes de chaque 100 Mo dans le même fichier, et pour simplifier, disons que la taille de division est de 64 Mo. Ensuite, lorsque les fractionnements d'entrée sont calculés, nous aurons le scénario suivant:

  • Split 1 contenant le chemin et les hôtes de ce bloc. Initialisé au début 200-200 = 0 Mo, longueur 64 Mo.
  • Split 2 initialisé au début 200-200 + 64 = 64 Mo, longueur 64 Mo.
  • Split 3 initialisé au début 200-200 + 128 = 128 Mo, longueur 64 Mo.
  • Split 4 initialisé au début 200-200 + 192 = 192 Mo, longueur 8 Mo.
  • Le mappeur A traitera la division 1, le début est 0, donc ne sautez pas la première ligne et lisez une ligne complète qui dépasse la limite de 64 Mo et nécessite donc une lecture à distance.
  • Le mappeur B traitera la division 2, le début est! = 0 donc sautez la première ligne après 64 Mo-1 octet, ce qui correspond à la fin de la ligne 1 à 100 Mo qui est toujours dans la division 2, nous avons 28 Mo de la ligne dans la division 2, donc à distance lire les 72 Mo restants.
  • Le mappeur C traitera la division 3, le début est! = 0 donc sautez la première ligne après 128 Mo-1 octet, ce qui correspond à la fin de la ligne 2 à 200 Mo, qui est la fin du fichier, alors ne faites rien.
  • Le mappeur D est le même que le mappeur C sauf qu'il recherche une nouvelle ligne après 192 Mo-1 octet.
Charles Menguy
la source
De plus, @PraveenSripati, il convient de mentionner que les cas de bord où une limite serait à \ r dans un \ r \ n retour sont gérés dans la LineReader.readLinefonction, je ne pense pas que cela soit pertinent pour votre question mais peut ajouter plus de détails si nécessaire.
Charles Menguy
Supposons qu'il y ait deux lignes avec exactement 64 Mo dans l'entrée et que les InputSplits se produisent exactement aux limites de la ligne. Donc, le mappeur ignorera-t-il toujours la ligne du deuxième bloc parce que start! = 0.
Praveen Sripati
6
@PraveenSripati Dans ce cas, le deuxième mappeur verra start! = 0, donc revenir en arrière 1 caractère, ce qui vous ramène juste avant le \ n de la première ligne, puis passez au suivant \ n. Il sautera donc la première ligne mais traitera la deuxième ligne comme prévu.
Charles Menguy
@CharlesMenguy est-il possible que la première ligne du fichier soit ignorée d'une manière ou d'une autre? Concrètement, j'ai la première ligne avec clé = 1, et la valeur a, puis il y a deux autres lignes avec la même clé quelque part dans le fichier, clé = 1, val = b et clé = 1, val = c. Le truc, c'est que mon réducteur obtient {1, [b, c]} et {1, [a]}, au lieu de {1, [a, b, c]}. Cela ne se produit pas si j'ajoute une nouvelle ligne au début de mon fichier. Quelle pourrait en être la raison, Monsieur?
Kobe-Wan Kenobi
@CharlesMenguy Que faire si le fichier sur HDFS est un fichier binaire (par opposition à un fichier texte, dans lequel \r\n, \nreprésente la troncature d'enregistrement)?
CᴴᴀZ
17

L' algorithme Map Reduce ne fonctionne pas sur les blocs physiques du fichier. Cela fonctionne sur les fractionnements d'entrée logiques. La répartition des entrées dépend de l'endroit où l'enregistrement a été écrit. Un enregistrement peut s'étendre sur deux mappeurs.

La façon dont HDFS a été configuré, il décompose les fichiers très volumineux en gros blocs (par exemple, mesurant 128 Mo) et stocke trois copies de ces blocs sur différents nœuds du cluster.

HDFS n'a aucune connaissance du contenu de ces fichiers. Un enregistrement peut avoir été commencé dans le bloc-a mais la fin de cet enregistrement peut être présente dans le bloc-b .

Pour résoudre ce problème, Hadoop utilise une représentation logique des données stockées dans des blocs de fichiers, appelées fractionnements d'entrée. Lorsqu'un client de travail MapReduce calcule les fractionnements d'entrée , il détermine où commence le premier enregistrement entier d'un bloc et où se termine le dernier enregistrement du bloc .

Le point clé:

Dans les cas où le dernier enregistrement d'un bloc est incomplet, le fractionnement d'entrée comprend des informations d'emplacement pour le bloc suivant et le décalage d'octet des données nécessaires pour terminer l'enregistrement.

Regardez le diagramme ci-dessous.

entrez la description de l'image ici

Consultez cet article et la question SE associée: À propos du fractionnement de fichiers Hadoop / HDFS

Plus de détails peuvent être lus dans la documentation

Le cadre Map-Reduce repose sur le InputFormat du travail pour:

  1. Validez la spécification d'entrée du travail.
  2. Divisez le (s) fichier (s) d'entrée en InputSplits logiques, chacun d'eux étant ensuite affecté à un mappeur individuel.
  3. Chaque InputSplit est ensuite attribué à un mappeur individuel pour traitement. Split peut être un tuple . InputSplit[] getSplits(JobConf job,int numSplits) est l'API pour s'occuper de ces choses.

FileInputFormat , qui étend la méthode InputFormatimplémentée getSplits(). Jetez un œil aux éléments internes de cette méthode sur grepcode

Ravindra babu
la source
7

Je le vois comme suit: InputFormat est responsable de diviser les données en divisions logiques en tenant compte de la nature des données.
Rien ne l'empêche de le faire, bien que cela puisse ajouter une latence significative au travail - toute la logique et la lecture autour des limites de taille de division souhaitées se produiront dans le traqueur de travaux.
Le format d'entrée le plus simple prenant en charge les enregistrements est TextInputFormat. Cela fonctionne comme suit (pour autant que je sache d'après le code) - le format d'entrée crée des divisions par taille, quelles que soient les lignes, mais LineRecordReader toujours:
a) Sauter la première ligne du fractionnement (ou une partie de celle-ci), si ce n'est pas le cas la première division
b) Lire une ligne après la limite de la division à la fin (si des données sont disponibles, ce n'est donc pas la dernière division).

David Gruzman
la source
Skip first line in the split (or part of it), if it is not the first split- si le premier enregistrement du non-premier bloc est complet, vous ne savez pas comment cette logique fonctionnera.
Praveen Sripati
Pour autant que je vois le code - chaque division lit ce qu'elle a + ligne suivante. Donc, si le saut de ligne n'est pas sur la limite du bloc - c'est ok. Comment exactement géré le cas lorsque le saut de ligne est exactement sur la limite du bloc - doit être compris - je vais lire le code un peu plus
David Gruzman
3

D'après ce que j'ai compris, lorsque le FileSplitest initialisé pour le premier bloc, le constructeur par défaut est appelé. Par conséquent, les valeurs de début et de longueur sont initialement nulles. À la fin du traitement du premier bloc, si la dernière ligne est incomplète, la valeur de la longueur sera supérieure à la longueur de la séparation et il lira également la première ligne du bloc suivant. Pour cette raison, la valeur de début pour le premier bloc sera supérieure à zéro et dans cette condition, le LineRecordReadersautera la première ligne du deuxième bloc. (Voir source )

Dans le cas où la dernière ligne du premier bloc est complète, alors la valeur de longueur sera égale à la longueur du premier bloc et la valeur du début pour le deuxième bloc sera zéro. Dans ce cas, le LineRecordReaderne sautera pas la première ligne et lira le deuxième bloc depuis le début.

Logique?

aa8y
la source
2
Dans ce scénario, les mappeurs doivent communiquer entre eux et traiter les blocs en séquence lorsque la dernière ligne d'un bloc particulier n'est pas complète. Je ne sais pas si c'est ainsi que cela fonctionne.
Praveen Sripati
1

Du code source hadoop de LineRecordReader.java le constructeur: je trouve quelques commentaires:

// If this is not the first split, we always throw away first record
// because we always (except the last split) read one extra line in
// next() method.
if (start != 0) {
  start += in.readLine(new Text(), 0, maxBytesToConsume(start));
}
this.pos = start;

à partir de là, je crois que hadoop lira une ligne supplémentaire pour chaque division (à la fin de la division actuelle, lira la ligne suivante dans la prochaine division), et si ce n'est pas la première division, la première ligne sera jetée. afin qu'aucun enregistrement de ligne ne soit perdu et incomplet

Shenghai.Geng
la source
0

Les mappeurs n'ont pas à communiquer. Les blocs de fichiers sont en HDFS et le mappeur actuel (RecordReader) peut-il lire le bloc qui contient la partie restante de la ligne. Cela se produit dans les coulisses.

user3507308
la source