Écrire un seul fichier CSV à l'aide de spark-csv

Réponses:

168

Il crée un dossier avec plusieurs fichiers, car chaque partition est enregistrée individuellement. Si vous avez besoin d'un seul fichier de sortie (toujours dans un dossier), vous pouvez repartition(de préférence si les données en amont sont volumineuses, mais nécessitent une lecture aléatoire):

df
   .repartition(1)
   .write.format("com.databricks.spark.csv")
   .option("header", "true")
   .save("mydata.csv")

ou coalesce:

df
   .coalesce(1)
   .write.format("com.databricks.spark.csv")
   .option("header", "true")
   .save("mydata.csv")

trame de données avant l'enregistrement:

Toutes les données seront écrites mydata.csv/part-00000. Avant d'utiliser cette option, assurez-vous de bien comprendre ce qui se passe et quel est le coût du transfert de toutes les données vers un seul collaborateur . Si vous utilisez un système de fichiers distribué avec réplication, les données seront transférées plusieurs fois - d'abord récupérées vers un seul nœud de calcul, puis distribuées sur les nœuds de stockage.

Sinon, vous pouvez laisser votre code tel quel et utiliser des outils à usage général comme catou HDFSgetmerge pour simplement fusionner toutes les parties par la suite.

zéro323
la source
6
vous pouvez également utiliser coalesce: df.coalesce (1) .write.format ("com.databricks.spark.csv") .option ("header", "true") .save ("mydata.csv")
ravi
spark 1.6 lève une erreur lorsque nous le définissons, .coalesce(1)il indique une FileNotFoundException sur le répertoire _temporary. C'est toujours un bogue dans Spark: issues.apache.org/jira/browse/SPARK-2984
Harsha
@Harsha Improbable. Plutôt un simple résultat d' coalesce(1)être très coûteux et généralement pas pratique.
zero323 du
D'accord @ zero323, mais si vous avez une exigence particulière de consolidation en un seul fichier, cela devrait toujours être possible étant donné que vous disposez de suffisamment de ressources et de temps.
Harsha
2
@Harsha Je ne dis pas qu'il n'y en a pas. Si vous réglez correctement GC, cela devrait fonctionner correctement, mais c'est simplement une perte de temps et cela nuira probablement aux performances globales. Donc personnellement, je ne vois aucune raison de déranger, d'autant plus qu'il est trivialement simple de fusionner des fichiers en dehors de Spark sans se soucier du tout de l'utilisation de la mémoire.
zero323 le
36

Si vous exécutez Spark avec HDFS, j'ai résolu le problème en écrivant normalement des fichiers csv et en exploitant HDFS pour effectuer la fusion. Je fais cela directement dans Spark (1.6):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._

def merge(srcPath: String, dstPath: String): Unit =  {
   val hadoopConfig = new Configuration()
   val hdfs = FileSystem.get(hadoopConfig)
   FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), true, hadoopConfig, null) 
   // the "true" setting deletes the source files once they are merged into the new output
}


val newData = << create your dataframe >>


val outputfile = "/user/feeds/project/outputs/subject"  
var filename = "myinsights"
var outputFileName = outputfile + "/temp_" + filename 
var mergedFileName = outputfile + "/merged_" + filename
var mergeFindGlob  = outputFileName

    newData.write
        .format("com.databricks.spark.csv")
        .option("header", "false")
        .mode("overwrite")
        .save(outputFileName)
    merge(mergeFindGlob, mergedFileName )
    newData.unpersist()

Je ne me souviens pas où j'ai appris cette astuce, mais cela pourrait fonctionner pour vous.

Minkymorgan
la source
Je ne l'ai pas essayé - et je pense que ce n'est peut-être pas simple.
Minkymorgan
1
Merci. J'ai ajouté une réponse qui fonctionne sur Databricks
Josiah Yoder
@Minkymorgan j'ai un problème similaire mais je ne suis pas en mesure de le faire correctement ..Pouvez-vous s'il vous plaît regarder cette question stackoverflow.com/questions/46812388
...
4
@SUDARSHAN Ma fonction ci-dessus fonctionne avec des données non compressées. Dans votre exemple, je pense que vous utilisez la compression gzip lorsque vous écrivez des fichiers - puis après - essayez de les fusionner, ce qui échoue. Cela ne fonctionnera pas, car vous ne pouvez pas fusionner des fichiers gzip ensemble. Gzip n'est pas un algorithme de compression fractionnable, donc certainement pas "fusionnable". Vous pouvez tester la compression «snappy» ou «bz2» - mais vous pensez que cela échouera également lors de la fusion. Le mieux est probablement de supprimer la compression, de fusionner les fichiers bruts, puis de les compresser à l'aide d'un codec divisible.
Minkymorgan
et que faire si je veux conserver l'en-tête? il duplique pour chaque partie de fichier
Normal
32

Je suis peut-être un peu en retard dans le jeu ici, mais utiliser coalesce(1)ou repartition(1)peut fonctionner pour de petits ensembles de données, mais les grands ensembles de données seraient tous jetés dans une partition sur un nœud. Ceci est susceptible de générer des erreurs MOO ou, au mieux, de se traiter lentement.

Je vous suggère fortement d'utiliser la FileUtil.copyMerge()fonction de l'API Hadoop. Cela fusionnera les sorties en un seul fichier.

EDIT - Cela amène efficacement les données au pilote plutôt qu'à un nœud exécuteur. Coalesce()Ce serait bien si un seul exécuteur a plus de RAM à utiliser que le pilote.

EDIT 2 : copyMerge()est en cours de suppression dans Hadoop 3.0. Consultez l'article suivant sur le débordement de pile pour plus d'informations sur la façon de travailler avec la dernière version: Comment faire CopyMerge dans Hadoop 3.0?

etspaceman
la source
Des réflexions sur la façon d'obtenir un csv avec une ligne d'en-tête de cette façon? Je ne voudrais pas que le fichier produise un en-tête, car cela intercalerait les en-têtes dans tout le fichier, un pour chaque partition.
nojo
Il y a une option que j'ai utilisée dans le passé documentée ici: markhneedham.com/blog/2014/11/30/…
etspaceman
@etspaceman Cool. Je n'ai toujours pas vraiment de bon moyen de le faire, malheureusement, car je dois pouvoir le faire en Java (ou Spark, mais d'une manière qui ne consomme pas beaucoup de mémoire et peut travailler avec de gros fichiers) . Je n'arrive toujours pas à croire qu'ils aient supprimé cet appel d'API ... c'est une utilisation très courante même si elle n'est pas exactement utilisée par d'autres applications de l'écosystème Hadoop.
woot
20

Si vous utilisez Databricks et que vous pouvez insérer toutes les données dans la RAM sur un seul worker (et donc utiliser .coalesce(1)), vous pouvez utiliser dbfs pour rechercher et déplacer le fichier CSV résultant:

val fileprefix= "/mnt/aws/path/file-prefix"

dataset
  .coalesce(1)       
  .write             
//.mode("overwrite") // I usually don't use this, but you may want to.
  .option("header", "true")
  .option("delimiter","\t")
  .csv(fileprefix+".tmp")

val partition_path = dbutils.fs.ls(fileprefix+".tmp/")
     .filter(file=>file.name.endsWith(".csv"))(0).path

dbutils.fs.cp(partition_path,fileprefix+".tab")

dbutils.fs.rm(fileprefix+".tmp",recurse=true)

Si votre fichier ne rentre pas dans la RAM du worker, vous pouvez envisager la suggestion de chaotic3quilibrium d'utiliser FileUtils.copyMerge () . Je n'ai pas fait cela et je ne sais pas encore si c'est possible ou non, par exemple sur S3.

Cette réponse est basée sur les réponses précédentes à cette question ainsi que sur mes propres tests de l'extrait de code fourni. Je l'ai initialement posté sur Databricks et je le republie ici.

La meilleure documentation sur l'option rm de dbfs rm que j'ai trouvée est sur un forum Databricks .

Josiah Yoder
la source
3

Une solution qui fonctionne pour S3 modifié depuis Minkymorgan.

Passez simplement le chemin du répertoire partitionné temporaire (avec un nom différent du chemin final) en tant que srcPathcsv / txt final unique comme destPath Spécifiez également deleteSourcesi vous souhaitez supprimer le répertoire d'origine.

/**
* Merges multiple partitions of spark text file output into single file. 
* @param srcPath source directory of partitioned files
* @param dstPath output path of individual path
* @param deleteSource whether or not to delete source directory after merging
* @param spark sparkSession
*/
def mergeTextFiles(srcPath: String, dstPath: String, deleteSource: Boolean): Unit =  {
  import org.apache.hadoop.fs.FileUtil
  import java.net.URI
  val config = spark.sparkContext.hadoopConfiguration
  val fs: FileSystem = FileSystem.get(new URI(srcPath), config)
  FileUtil.copyMerge(
    fs, new Path(srcPath), fs, new Path(dstPath), deleteSource, config, null
  )
}
John Zhu
la source
La mise en œuvre de copyMerge répertorie tous les fichiers et les itère, ce n'est pas sûr dans s3. si vous écrivez vos fichiers puis les listez - cela ne garantit pas qu'ils seront tous listés. voir [this | docs.aws.amazon.com/AmazonS3/latest/dev/…
LiranBo
3

L' df.write()API de spark créera plusieurs fichiers pièce à l'intérieur d'un chemin donné ... pour forcer l'écriture de Spark à un seul fichier pièce utiliser df.coalesce(1).write.csv(...)au lieu de df.repartition(1).write.csv(...)comme coalesce est une transformation étroite alors que la répartition est une transformation large voir Spark - repartition () vs coalesce ()

df.coalesce(1).write.csv(filepath,header=True) 

créera un dossier dans un chemin de part-0001-...-c000.csvfichier donné avec une seule utilisation de fichier

cat filepath/part-0001-...-c000.csv > filename_you_want.csv 

pour avoir un nom de fichier convivial

pprasad009
la source
sinon, si la trame de données n'est pas trop grande (~ Go ou peut tenir dans la mémoire du pilote), vous pouvez également l'utiliser df.toPandas().to_csv(path)pour écrire un csv unique avec votre nom de fichier préféré
pprasad009
2
Ugh, tellement frustrant de voir comment cela ne peut être fait qu'en se convertissant en pandas. Est-il difficile d'écrire simplement un fichier sans UUID?
ijoseph le
2

repartitionner / fusionner sur 1 partition avant de sauvegarder (vous obtiendrez toujours un dossier mais il contiendrait un fichier partiel)

Arnon Rotem-Gal-Oz
la source
2

vous pouvez utiliser rdd.coalesce(1, true).saveAsTextFile(path)

il stockera les données sous forme de fichier unique dans path / part-00000

Gourav
la source
1
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.spark.sql.{DataFrame,SaveMode,SparkSession}
import org.apache.spark.sql.functions._

J'ai résolu en utilisant l'approche ci-dessous (hdfs renommer le nom du fichier): -

Étape 1: - (Crate Data Frame et écrire sur HDFS)

df.coalesce(1).write.format("csv").option("header", "false").mode(SaveMode.Overwrite).save("/hdfsfolder/blah/")

Étape 2: - (Créer la configuration Hadoop)

val hadoopConfig = new Configuration()
val hdfs = FileSystem.get(hadoopConfig)

Étape 3: - (Obtenez le chemin dans le chemin du dossier hdfs)

val pathFiles = new Path("/hdfsfolder/blah/")

Étape 4: - (Obtenez les noms de fichiers Spark à partir du dossier hdfs)

val fileNames = hdfs.listFiles(pathFiles, false)
println(fileNames)

setp5: - (créer une liste scala mutable pour enregistrer tous les noms de fichiers et l'ajouter à la liste)

    var fileNamesList = scala.collection.mutable.MutableList[String]()
    while (fileNames.hasNext) {
      fileNamesList += fileNames.next().getPath.getName
    }
    println(fileNamesList)

Étape 6: - (filtre l'ordre des fichiers _SUCESS à partir de la liste des noms de fichiers scala)

    // get files name which are not _SUCCESS
    val partFileName = fileNamesList.filterNot(filenames => filenames == "_SUCCESS")

étape 7: - (convertir la liste scala en chaîne et ajouter le nom de fichier souhaité à la chaîne de dossier hdfs, puis appliquer le changement de nom)

val partFileSourcePath = new Path("/yourhdfsfolder/"+ partFileName.mkString(""))
    val desiredCsvTargetPath = new Path(/yourhdfsfolder/+ "op_"+ ".csv")
    hdfs.rename(partFileSourcePath , desiredCsvTargetPath)
sri hari kali charan Tummala
la source
1

J'utilise ceci en Python pour obtenir un seul fichier:

df.toPandas().to_csv("/tmp/my.csv", sep=',', header=True, index=False)
Kees C. Bakker
la source
1

Cette réponse développe la réponse acceptée, donne plus de contexte et fournit des extraits de code que vous pouvez exécuter dans Spark Shell sur votre ordinateur.

Plus de contexte sur la réponse acceptée

La réponse acceptée peut vous donner l'impression que l'exemple de code génère un seul mydata.csvfichier et ce n'est pas le cas. Démontrons:

val df = Seq("one", "two", "three").toDF("num")
df
  .repartition(1)
  .write.csv(sys.env("HOME")+ "/Documents/tmp/mydata.csv")

Voici ce qui est sorti:

Documents/
  tmp/
    mydata.csv/
      _SUCCESS
      part-00000-b3700504-e58b-4552-880b-e7b52c60157e-c000.csv

NB mydata.csvest un dossier dans la réponse acceptée - ce n'est pas un fichier!

Comment sortir un seul fichier avec un nom spécifique

Nous pouvons utiliser spark-daria pour écrire un seul mydata.csvfichier.

import com.github.mrpowers.spark.daria.sql.DariaWriters
DariaWriters.writeSingleFile(
    df = df,
    format = "csv",
    sc = spark.sparkContext,
    tmpFolder = sys.env("HOME") + "/Documents/better/staging",
    filename = sys.env("HOME") + "/Documents/better/mydata.csv"
)

Cela produira le fichier comme suit:

Documents/
  better/
    mydata.csv

Chemins S3

Vous devrez passer les chemins s3a à DariaWriters.writeSingleFilepour utiliser cette méthode dans S3:

DariaWriters.writeSingleFile(
    df = df,
    format = "csv",
    sc = spark.sparkContext,
    tmpFolder = "s3a://bucket/data/src",
    filename = "s3a://bucket/data/dest/my_cool_file.csv"
)

Voir ici pour plus d'informations.

Éviter la copie

copyMerge a été supprimé de Hadoop 3. L' DariaWriters.writeSingleFileimplémentation utilise fs.rename, comme décrit ici . Spark 3 utilisait toujours Hadoop 2 , donc les implémentations de copyMerge fonctionneront en 2020. Je ne sais pas quand Spark passera à Hadoop 3, mais mieux vaut éviter toute approche copyMerge qui entraînerait la rupture de votre code lorsque Spark mettra à niveau Hadoop.

Code source

Recherchez l' DariaWritersobjet dans le code source de spark-daria si vous souhaitez inspecter l'implémentation.

Implémentation PySpark

Il est plus facile d'écrire un seul fichier avec PySpark car vous pouvez convertir le DataFrame en un Pandas DataFrame qui est écrit en tant que fichier unique par défaut.

from pathlib import Path
home = str(Path.home())
data = [
    ("jellyfish", "JALYF"),
    ("li", "L"),
    ("luisa", "LAS"),
    (None, None)
]
df = spark.createDataFrame(data, ["word", "expected"])
df.toPandas().to_csv(home + "/Documents/tmp/mydata-from-pyspark.csv", sep=',', header=True, index=False)

Limites

L' DariaWriters.writeSingleFileapproche Scala et l' df.toPandas()approche Python ne fonctionnent que pour les petits ensembles de données. Les énormes ensembles de données ne peuvent pas être écrits en tant que fichiers uniques. L'écriture de données sous forme de fichier unique n'est pas optimale du point de vue des performances, car les données ne peuvent pas être écrites en parallèle.

Pouvoirs
la source
0

en utilisant Listbuffer, nous pouvons enregistrer les données dans un seul fichier:

import java.io.FileWriter
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.ListBuffer
    val text = spark.read.textFile("filepath")
    var data = ListBuffer[String]()
    for(line:String <- text.collect()){
      data += line
    }
    val writer = new FileWriter("filepath")
    data.foreach(line => writer.write(line.toString+"\n"))
    writer.close()
siddhu salvi
la source
-2

Il existe une autre façon d'utiliser Java

import java.io._

def printToFile(f: java.io.File)(op: java.io.PrintWriter => Unit) 
  {
     val p = new java.io.PrintWriter(f);  
     try { op(p) } 
     finally { p.close() }
  } 

printToFile(new File("C:/TEMP/df.csv")) { p => df.collect().foreach(p.println)}
Sergio Alyoshkin
la source
Le nom 'vrai' n'est pas défini
Arron