Il crée un dossier avec plusieurs fichiers, car chaque partition est enregistrée individuellement. Si vous avez besoin d'un seul fichier de sortie (toujours dans un dossier), vous pouvez repartition
(de préférence si les données en amont sont volumineuses, mais nécessitent une lecture aléatoire):
df
.repartition(1)
.write.format("com.databricks.spark.csv")
.option("header", "true")
.save("mydata.csv")
ou coalesce
:
df
.coalesce(1)
.write.format("com.databricks.spark.csv")
.option("header", "true")
.save("mydata.csv")
trame de données avant l'enregistrement:
Toutes les données seront écrites mydata.csv/part-00000
. Avant d'utiliser cette option, assurez-vous de bien comprendre ce qui se passe et quel est le coût du transfert de toutes les données vers un seul collaborateur . Si vous utilisez un système de fichiers distribué avec réplication, les données seront transférées plusieurs fois - d'abord récupérées vers un seul nœud de calcul, puis distribuées sur les nœuds de stockage.
Sinon, vous pouvez laisser votre code tel quel et utiliser des outils à usage général comme cat
ou HDFSgetmerge
pour simplement fusionner toutes les parties par la suite.
.coalesce(1)
il indique une FileNotFoundException sur le répertoire _temporary. C'est toujours un bogue dans Spark: issues.apache.org/jira/browse/SPARK-2984coalesce(1)
être très coûteux et généralement pas pratique.Si vous exécutez Spark avec HDFS, j'ai résolu le problème en écrivant normalement des fichiers csv et en exploitant HDFS pour effectuer la fusion. Je fais cela directement dans Spark (1.6):
Je ne me souviens pas où j'ai appris cette astuce, mais cela pourrait fonctionner pour vous.
la source
Je suis peut-être un peu en retard dans le jeu ici, mais utiliser
coalesce(1)
ourepartition(1)
peut fonctionner pour de petits ensembles de données, mais les grands ensembles de données seraient tous jetés dans une partition sur un nœud. Ceci est susceptible de générer des erreurs MOO ou, au mieux, de se traiter lentement.Je vous suggère fortement d'utiliser la
FileUtil.copyMerge()
fonction de l'API Hadoop. Cela fusionnera les sorties en un seul fichier.EDIT - Cela amène efficacement les données au pilote plutôt qu'à un nœud exécuteur.
Coalesce()
Ce serait bien si un seul exécuteur a plus de RAM à utiliser que le pilote.EDIT 2 :
copyMerge()
est en cours de suppression dans Hadoop 3.0. Consultez l'article suivant sur le débordement de pile pour plus d'informations sur la façon de travailler avec la dernière version: Comment faire CopyMerge dans Hadoop 3.0?la source
Si vous utilisez Databricks et que vous pouvez insérer toutes les données dans la RAM sur un seul worker (et donc utiliser
.coalesce(1)
), vous pouvez utiliser dbfs pour rechercher et déplacer le fichier CSV résultant:Si votre fichier ne rentre pas dans la RAM du worker, vous pouvez envisager la suggestion de chaotic3quilibrium d'utiliser FileUtils.copyMerge () . Je n'ai pas fait cela et je ne sais pas encore si c'est possible ou non, par exemple sur S3.
Cette réponse est basée sur les réponses précédentes à cette question ainsi que sur mes propres tests de l'extrait de code fourni. Je l'ai initialement posté sur Databricks et je le republie ici.
La meilleure documentation sur l'option rm de dbfs rm que j'ai trouvée est sur un forum Databricks .
la source
Une solution qui fonctionne pour S3 modifié depuis Minkymorgan.
Passez simplement le chemin du répertoire partitionné temporaire (avec un nom différent du chemin final) en tant que
srcPath
csv / txt final unique commedestPath
Spécifiez égalementdeleteSource
si vous souhaitez supprimer le répertoire d'origine.la source
L'
df.write()
API de spark créera plusieurs fichiers pièce à l'intérieur d'un chemin donné ... pour forcer l'écriture de Spark à un seul fichier pièce utiliserdf.coalesce(1).write.csv(...)
au lieu dedf.repartition(1).write.csv(...)
comme coalesce est une transformation étroite alors que la répartition est une transformation large voir Spark - repartition () vs coalesce ()créera un dossier dans un chemin de
part-0001-...-c000.csv
fichier donné avec une seule utilisation de fichierpour avoir un nom de fichier convivial
la source
df.toPandas().to_csv(path)
pour écrire un csv unique avec votre nom de fichier préférérepartitionner / fusionner sur 1 partition avant de sauvegarder (vous obtiendrez toujours un dossier mais il contiendrait un fichier partiel)
la source
vous pouvez utiliser
rdd.coalesce(1, true).saveAsTextFile(path)
il stockera les données sous forme de fichier unique dans path / part-00000
la source
J'ai résolu en utilisant l'approche ci-dessous (hdfs renommer le nom du fichier): -
Étape 1: - (Crate Data Frame et écrire sur HDFS)
Étape 2: - (Créer la configuration Hadoop)
Étape 3: - (Obtenez le chemin dans le chemin du dossier hdfs)
Étape 4: - (Obtenez les noms de fichiers Spark à partir du dossier hdfs)
setp5: - (créer une liste scala mutable pour enregistrer tous les noms de fichiers et l'ajouter à la liste)
Étape 6: - (filtre l'ordre des fichiers _SUCESS à partir de la liste des noms de fichiers scala)
étape 7: - (convertir la liste scala en chaîne et ajouter le nom de fichier souhaité à la chaîne de dossier hdfs, puis appliquer le changement de nom)
la source
J'utilise ceci en Python pour obtenir un seul fichier:
la source
Cette réponse développe la réponse acceptée, donne plus de contexte et fournit des extraits de code que vous pouvez exécuter dans Spark Shell sur votre ordinateur.
Plus de contexte sur la réponse acceptée
La réponse acceptée peut vous donner l'impression que l'exemple de code génère un seul
mydata.csv
fichier et ce n'est pas le cas. Démontrons:Voici ce qui est sorti:
NB
mydata.csv
est un dossier dans la réponse acceptée - ce n'est pas un fichier!Comment sortir un seul fichier avec un nom spécifique
Nous pouvons utiliser spark-daria pour écrire un seul
mydata.csv
fichier.Cela produira le fichier comme suit:
Chemins S3
Vous devrez passer les chemins s3a à
DariaWriters.writeSingleFile
pour utiliser cette méthode dans S3:Voir ici pour plus d'informations.
Éviter la copie
copyMerge a été supprimé de Hadoop 3. L'
DariaWriters.writeSingleFile
implémentation utilisefs.rename
, comme décrit ici . Spark 3 utilisait toujours Hadoop 2 , donc les implémentations de copyMerge fonctionneront en 2020. Je ne sais pas quand Spark passera à Hadoop 3, mais mieux vaut éviter toute approche copyMerge qui entraînerait la rupture de votre code lorsque Spark mettra à niveau Hadoop.Code source
Recherchez l'
DariaWriters
objet dans le code source de spark-daria si vous souhaitez inspecter l'implémentation.Implémentation PySpark
Il est plus facile d'écrire un seul fichier avec PySpark car vous pouvez convertir le DataFrame en un Pandas DataFrame qui est écrit en tant que fichier unique par défaut.
Limites
L'
DariaWriters.writeSingleFile
approche Scala et l'df.toPandas()
approche Python ne fonctionnent que pour les petits ensembles de données. Les énormes ensembles de données ne peuvent pas être écrits en tant que fichiers uniques. L'écriture de données sous forme de fichier unique n'est pas optimale du point de vue des performances, car les données ne peuvent pas être écrites en parallèle.la source
en utilisant Listbuffer, nous pouvons enregistrer les données dans un seul fichier:
la source
Il existe une autre façon d'utiliser Java
la source