Je veux lire un tas de fichiers texte à partir d'un emplacement hdfs et effectuer un mappage dessus dans une itération à l'aide de spark.
JavaRDD<String> records = ctx.textFile(args[1], 1);
est capable de lire un seul fichier à la fois.
Je veux lire plus d'un fichier et les traiter comme un seul RDD. Comment?
apache-spark
user3705662
la source
la source
Path
options s'appliquent.sc.wholeTextFiles
est pratique pour les données qui ne sont pas délimitées par des lignessc.textFile(multipleCommaSeparatedDirs,320)
cela conduit à des19430
tâches totales au lieu de320
... il se comporte commeunion
ce qui conduit également à un nombre insensé de tâches à partir d'un très faible parallélismewholeTextFiles
. Quel est votre cas d'utilisation? Je peux penser à une solution de contournement à condition d'utiliser le même nombre de partitions que des fichiers ...Utilisez
union
comme suit:Ensuite,
bigRdd
c'est le RDD avec tous les fichiers.la source
Vous pouvez utiliser un seul appel textFile pour lire plusieurs fichiers. Scala:
la source
sc.textFile(files.mkString(","))
Vous pouvez utiliser ceci
Vous pouvez d'abord obtenir un tampon / une liste de chemins S3:
Passez maintenant cet objet List au morceau de code suivant, notez: sc est un objet de SQLContext
Maintenant vous avez un final unifié RDD ie df
Facultatif, et vous pouvez également le repartitionner dans un seul BigRDD
Le repartitionnement fonctionne toujours: D
la source
Dans PySpark, j'ai trouvé un moyen utile supplémentaire pour analyser les fichiers. Il existe peut-être un équivalent dans Scala, mais je ne suis pas assez à l'aise pour proposer une traduction fonctionnelle. Il s'agit en fait d'un appel textFile avec l'ajout d'étiquettes (dans l'exemple ci-dessous, la clé = nom de fichier, valeur = 1 ligne du fichier).
TextFile "étiqueté"
contribution:
sortie: tableau avec chaque entrée contenant un tuple utilisant filename-as-key et avec valeur = chaque ligne de fichier. (Techniquement, en utilisant cette méthode, vous pouvez également utiliser une clé différente en plus du nom de chemin de fichier réel - peut-être une représentation de hachage à enregistrer sur la mémoire). c'est à dire.
Vous pouvez également recombiner soit sous forme de liste de lignes:
Spark_Full.groupByKey().map(lambda x: (x[0], list(x[1]))).collect()
Ou recombinez des fichiers entiers en chaînes uniques (dans cet exemple, le résultat est le même que celui que vous obtenez de wholeTextFiles, mais avec la chaîne "file:" supprimée du chemin de fichier.):
Spark_Full.groupByKey().map(lambda x: (x[0], ' '.join(list(x[1])))).collect()
la source
Spark_Full += sc.textFile(filename).keyBy(lambda x: filename)
j'ai eu l'erreur ieTypeError: 'PipelinedRDD' object is not iterable
. Je crois comprendre que cette ligne crée un RDD qui est immuable, alors je me demandais comment vous avez pu l'ajouter à une autre variable?vous pouvez utiliser
ici, vous obtiendrez le chemin de votre fichier et le contenu de ce fichier. afin que vous puissiez effectuer n'importe quelle action d'un fichier entier à un moment qui économise la surcharge
la source
Toutes les réponses sont correctes avec
sc.textFile
Je me demandais juste pourquoi pas
wholeTextFiles
Par exemple, dans ce cas ...une limitation est que nous devons charger de petits fichiers sinon les performances seront mauvaises et peuvent conduire à un MOO.
Remarque :
Autre référence à visiter
la source
sc.wholeTextFiles(folder).flatMap...
Il existe une solution simple et propre. Utilisez la méthode wholeTextFiles (). Cela prendra un répertoire et forme une paire clé / valeur. Le RDD retourné sera une paire RDD. Retrouvez ci-dessous la description tirée de la documentation Spark :
la source
ESSAYEZ CETTE Interface utilisée pour écrire un DataFrame sur des systèmes de stockage externes (par exemple, des systèmes de fichiers, des magasins clé-valeur, etc.). Utilisez DataFrame.write () pour y accéder.
Nouveau dans la version 1.4.
csv (chemin, mode = None, compression = None, sep = None, quote = None, escape = None, header = None, nullValue = None, escapeQuotes = None, quoteAll = None, dateFormat = None, timestampFormat = None) Enregistre le contenu du DataFrame au format CSV au chemin spécifié.
Paramètres: chemin - le chemin dans n'importe quel mode de système de fichiers pris en charge par Hadoop - spécifie le comportement de l'opération de sauvegarde lorsque des données existent déjà.
append: ajoute le contenu de ce DataFrame aux données existantes. écraser: écraser les données existantes. ignore: ignorez en silence cette opération si des données existent déjà. erreur (cas par défaut): lance une exception si des données existent déjà. compression - codec de compression à utiliser lors de l'enregistrement dans un fichier. Il peut s'agir de l'un des noms abrégés connus insensibles à la casse (none, bzip2, gzip, lz4, snappy et deflate). sep - définit le caractère unique comme séparateur pour chaque champ et valeur. Si Aucun est défini, il utilise la valeur par défaut,,. quote - définit le caractère unique utilisé pour échapper les valeurs entre guillemets où le séparateur peut faire partie de la valeur. Si Aucun est défini, il utilise la valeur par défaut, ". Si vous souhaitez désactiver les guillemets, vous devez définir une chaîne vide. Escape - définit le caractère unique utilisé pour échapper les guillemets dans une valeur déjà citée. Si Aucun est défini , il utilise la valeur par défaut, \ escapeQuotes - Un indicateur indiquant si les valeurs contenant des guillemets doivent toujours être placées entre guillemets. Si None est défini, il utilise la valeur par défaut true, en échappant à toutes les valeurs contenant un caractère guillemet. quoteAll - Un indicateur indiquant si toutes les valeurs doivent toujours être placées entre guillemets. Si None est défini, il utilise la valeur par défaut false, en échappant uniquement les valeurs contenant un caractère guillemet. header - écrit les noms des colonnes comme première ligne. Si None est défini, il utilise la valeur par défaut, false. nullValue - définit la représentation sous forme de chaîne d'une valeur nulle. Si None est défini, il utilise la valeur par défaut, une chaîne vide. dateFormat - définit la chaîne qui indique un format de date. Les formats de date personnalisés suivent les formats de java.text.SimpleDateFormat. Cela s'applique au type de date. Si Aucun est défini, il utilise la valeur par défaut, aaaa-MM-jj. timestampFormat - définit la chaîne qui indique un format d'horodatage. Les formats de date personnalisés suivent les formats de java.text.SimpleDateFormat. Cela s'applique au type d'horodatage. Si Aucun est défini, il utilise la valeur par défaut, aaaa-MM-jj'T'HH: mm: ss.SSSZZ.
la source
la source