Question intéressante, j'ai passé du temps à regarder le code pour les détails et voici mes pensées. Les fractionnements sont gérés par le client par InputFormat.getSplits
, donc un coup d'oeil à FileInputFormat donne les informations suivantes:
- Pour chaque fichier d'entrée, obtenez la longueur du fichier, la taille du bloc et calculez la taille de la division comme
max(minSize, min(maxSize, blockSize))
où maxSize
correspond à mapred.max.split.size
et minSize
est mapred.min.split.size
.
Divisez le fichier en différents FileSplit
s en fonction de la taille de division calculée ci-dessus. L'important ici est que chacun FileSplit
soit initialisé avec un start
paramètre correspondant au décalage dans le fichier d'entrée . Il n'y a toujours pas de traitement des lignes à ce stade. La partie pertinente du code ressemble à ceci:
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(new FileSplit(path, length-bytesRemaining, splitSize,
blkLocations[blkIndex].getHosts()));
bytesRemaining -= splitSize;
}
Après cela, si vous regardez le LineRecordReader
qui est défini par le TextInputFormat
, c'est là que les lignes sont gérées:
- Lorsque vous initialisez votre,
LineRecordReader
il essaie d'instancier un LineReader
qui est une abstraction pour pouvoir lire les lignes FSDataInputStream
. Il y a 2 cas:
- S'il y a un
CompressionCodec
défini, alors ce codec est responsable de la gestion des limites. Probablement pas pertinent pour votre question.
S'il n'y a pas de codec cependant, c'est là que les choses sont intéressantes: si le start
de votre InputSplit
est différent de 0, alors vous reculez d'un caractère puis sautez la première ligne que vous rencontrez identifiée par \ n ou \ r \ n (Windows) ! Le retour en arrière est important car dans le cas où vos limites de ligne sont les mêmes que les limites de division, cela garantit que vous ne sautez pas la ligne valide. Voici le code pertinent:
if (codec != null) {
in = new LineReader(codec.createInputStream(fileIn), job);
end = Long.MAX_VALUE;
} else {
if (start != 0) {
skipFirstLine = true;
--start;
fileIn.seek(start);
}
in = new LineReader(fileIn, job);
}
if (skipFirstLine) { // skip first line and re-establish "start".
start += in.readLine(new Text(), 0,
(int)Math.min((long)Integer.MAX_VALUE, end - start));
}
this.pos = start;
Ainsi, puisque les fractionnements sont calculés dans le client, les mappeurs n'ont pas besoin de s'exécuter en séquence, chaque mappeur sait déjà s'il doit ignorer la première ligne ou non.
Donc, fondamentalement, si vous avez 2 lignes de chaque 100 Mo dans le même fichier, et pour simplifier, disons que la taille de division est de 64 Mo. Ensuite, lorsque les fractionnements d'entrée sont calculés, nous aurons le scénario suivant:
- Split 1 contenant le chemin et les hôtes de ce bloc. Initialisé au début 200-200 = 0 Mo, longueur 64 Mo.
- Split 2 initialisé au début 200-200 + 64 = 64 Mo, longueur 64 Mo.
- Split 3 initialisé au début 200-200 + 128 = 128 Mo, longueur 64 Mo.
- Split 4 initialisé au début 200-200 + 192 = 192 Mo, longueur 8 Mo.
- Le mappeur A traitera la division 1, le début est 0, donc ne sautez pas la première ligne et lisez une ligne complète qui dépasse la limite de 64 Mo et nécessite donc une lecture à distance.
- Le mappeur B traitera la division 2, le début est! = 0 donc sautez la première ligne après 64 Mo-1 octet, ce qui correspond à la fin de la ligne 1 à 100 Mo qui est toujours dans la division 2, nous avons 28 Mo de la ligne dans la division 2, donc à distance lire les 72 Mo restants.
- Le mappeur C traitera la division 3, le début est! = 0 donc sautez la première ligne après 128 Mo-1 octet, ce qui correspond à la fin de la ligne 2 à 200 Mo, qui est la fin du fichier, alors ne faites rien.
- Le mappeur D est le même que le mappeur C sauf qu'il recherche une nouvelle ligne après 192 Mo-1 octet.
LineReader.readLine
fonction, je ne pense pas que cela soit pertinent pour votre question mais peut ajouter plus de détails si nécessaire.\r\n, \n
représente la troncature d'enregistrement)?L' algorithme Map Reduce ne fonctionne pas sur les blocs physiques du fichier. Cela fonctionne sur les fractionnements d'entrée logiques. La répartition des entrées dépend de l'endroit où l'enregistrement a été écrit. Un enregistrement peut s'étendre sur deux mappeurs.
La façon dont HDFS a été configuré, il décompose les fichiers très volumineux en gros blocs (par exemple, mesurant 128 Mo) et stocke trois copies de ces blocs sur différents nœuds du cluster.
HDFS n'a aucune connaissance du contenu de ces fichiers. Un enregistrement peut avoir été commencé dans le bloc-a mais la fin de cet enregistrement peut être présente dans le bloc-b .
Pour résoudre ce problème, Hadoop utilise une représentation logique des données stockées dans des blocs de fichiers, appelées fractionnements d'entrée. Lorsqu'un client de travail MapReduce calcule les fractionnements d'entrée , il détermine où commence le premier enregistrement entier d'un bloc et où se termine le dernier enregistrement du bloc .
Le point clé:
Dans les cas où le dernier enregistrement d'un bloc est incomplet, le fractionnement d'entrée comprend des informations d'emplacement pour le bloc suivant et le décalage d'octet des données nécessaires pour terminer l'enregistrement.
Regardez le diagramme ci-dessous.
Consultez cet article et la question SE associée: À propos du fractionnement de fichiers Hadoop / HDFS
Plus de détails peuvent être lus dans la documentation
Le cadre Map-Reduce repose sur le InputFormat du travail pour:
InputSplit[] getSplits(JobConf job,int numSplits
) est l'API pour s'occuper de ces choses.FileInputFormat , qui étend la méthode
InputFormat
implémentéegetSplits
(). Jetez un œil aux éléments internes de cette méthode sur grepcodela source
Je le vois comme suit: InputFormat est responsable de diviser les données en divisions logiques en tenant compte de la nature des données.
Rien ne l'empêche de le faire, bien que cela puisse ajouter une latence significative au travail - toute la logique et la lecture autour des limites de taille de division souhaitées se produiront dans le traqueur de travaux.
Le format d'entrée le plus simple prenant en charge les enregistrements est TextInputFormat. Cela fonctionne comme suit (pour autant que je sache d'après le code) - le format d'entrée crée des divisions par taille, quelles que soient les lignes, mais LineRecordReader toujours:
a) Sauter la première ligne du fractionnement (ou une partie de celle-ci), si ce n'est pas le cas la première division
b) Lire une ligne après la limite de la division à la fin (si des données sont disponibles, ce n'est donc pas la dernière division).
la source
Skip first line in the split (or part of it), if it is not the first split
- si le premier enregistrement du non-premier bloc est complet, vous ne savez pas comment cette logique fonctionnera.D'après ce que j'ai compris, lorsque le
FileSplit
est initialisé pour le premier bloc, le constructeur par défaut est appelé. Par conséquent, les valeurs de début et de longueur sont initialement nulles. À la fin du traitement du premier bloc, si la dernière ligne est incomplète, la valeur de la longueur sera supérieure à la longueur de la séparation et il lira également la première ligne du bloc suivant. Pour cette raison, la valeur de début pour le premier bloc sera supérieure à zéro et dans cette condition, leLineRecordReader
sautera la première ligne du deuxième bloc. (Voir source )Dans le cas où la dernière ligne du premier bloc est complète, alors la valeur de longueur sera égale à la longueur du premier bloc et la valeur du début pour le deuxième bloc sera zéro. Dans ce cas, le
LineRecordReader
ne sautera pas la première ligne et lira le deuxième bloc depuis le début.Logique?
la source
Du code source hadoop de LineRecordReader.java le constructeur: je trouve quelques commentaires:
à partir de là, je crois que hadoop lira une ligne supplémentaire pour chaque division (à la fin de la division actuelle, lira la ligne suivante dans la prochaine division), et si ce n'est pas la première division, la première ligne sera jetée. afin qu'aucun enregistrement de ligne ne soit perdu et incomplet
la source
Les mappeurs n'ont pas à communiquer. Les blocs de fichiers sont en HDFS et le mappeur actuel (RecordReader) peut-il lire le bloc qui contient la partie restante de la ligne. Cela se produit dans les coulisses.
la source