Comment écraser le répertoire de sortie dans Spark

108

J'ai une application Spark Streaming qui produit un ensemble de données pour chaque minute. J'ai besoin de sauvegarder / écraser les résultats des données traitées.

Lorsque j'ai essayé d'écraser l'ensemble de données org.apache.hadoop.mapred.FileAlreadyExistsException arrête l'exécution.

J'ai défini la propriété Spark set("spark.files.overwrite","true"), mais il n'y a pas de chance.

Comment écraser ou prédéfinir les fichiers de Spark?

Vijay Innamuri
la source
1
Ouais ça craint n'est-ce pas, je considère que c'est une régression vers 0.9.0. Veuillez accepter ma réponse :)
samthebest
set("spark.files.overwrite","true")fonctionne uniquement pour les fichiers ajoutés spark.addFile()
viat

Réponses:

107

MISE À JOUR: Suggérez d'utiliser Dataframes, plus quelque chose comme ... .write.mode(SaveMode.Overwrite) ....

Proximité pratique:

implicit class PimpedStringRDD(rdd: RDD[String]) {
    def write(p: String)(implicit ss: SparkSession): Unit = {
      import ss.implicits._
      rdd.toDF().as[String].write.mode(SaveMode.Overwrite).text(p)
    }
  }

Pour les anciennes versions, essayez

yourSparkConf.set("spark.hadoop.validateOutputSpecs", "false")
val sc = SparkContext(yourSparkConf)

Dans la version 1.1.0, vous pouvez définir les paramètres de configuration à l'aide du script spark-submit avec l'indicateur --conf.

AVERTISSEMENT (anciennes versions): Selon @piggybox, il y a un bogue dans Spark où il n'écrasera que les fichiers dont il a besoin pour écrire ses part-fichiers, tous les autres fichiers ne seront pas supprimés.

samthebest
la source
30
Pour Spark 1.4:df.write.mode(SaveMode.Overwrite).parquet(path)
Ha Pham
Pour Spark SQL, vous avez des options pour définir le SaveMode pour Core Spark, vous n'avez rien de tel. J'aimerais vraiment utiliser ce type de fonctionnalité pour saveAsTextFile et d'autres transformations
Murtaza Kanchwala
3
Un problème caché: comparé à la solution de @ pzecevic pour effacer tout le dossier via HDFS, dans cette approche, Spark écrasera uniquement les fichiers pièce avec le même nom de fichier dans le dossier de sortie. Cela fonctionne la plupart du temps, mais s'il y a autre chose, comme des fichiers de pièce supplémentaires d'un autre travail Spark / Hadoop dans le dossier, cela n'écrasera pas ces fichiers.
tirelire
6
Vous pouvez également utiliser le df.write.mode(mode: String).parquet(path)mode Où: La chaîne peut être: "écraser", "ajouter", "ignorer", "erreur".
seigle
1
@avocado Yup pense que oui, les API Spark ne font que s'aggraver à chaque version: P
samthebest
27

La documentation du paramètre spark.files.overwritedit ceci: "S'il faut écraser les fichiers ajoutés SparkContext.addFile()lorsque le fichier cible existe et que son contenu ne correspond pas à celui de la source." Cela n'a donc aucun effet sur la méthode saveAsTextFiles.

Vous pouvez le faire avant d'enregistrer le fichier:

val hadoopConf = new org.apache.hadoop.conf.Configuration()
val hdfs = org.apache.hadoop.fs.FileSystem.get(new java.net.URI("hdfs://localhost:9000"), hadoopConf)
try { hdfs.delete(new org.apache.hadoop.fs.Path(filepath), true) } catch { case _ : Throwable => { } }

Aas expliqué ici: http://apache-spark-user-list.1001560.n3.nabble.com/How-can-I-make-Spark-1-0-saveAsTextFile-to-overwrite-existing-file-td6696. html

pzecevic
la source
29
qu'en est-il pour pyspark?
javadba
La prochaine réponse à utiliser 'write.mode (SaveMode.Overwrite)' est la voie à suivre
YaOg
hdfs peut supprimer les nouveaux fichiers au fur et à mesure qu'ils arrivent car il supprime toujours les anciens.
Jake
25

À partir de la documentation pyspark.sql.DataFrame.save (actuellement à 1.3.1), vous pouvez spécifier mode='overwrite'lors de l'enregistrement d'un DataFrame:

myDataFrame.save(path='myPath', source='parquet', mode='overwrite')

J'ai vérifié que cela supprimera même les fichiers de partition restants. Donc, si vous aviez dit 10 partitions / fichiers à l'origine, mais que vous écrasiez ensuite le dossier avec un DataFrame qui n'avait que 6 partitions, le dossier résultant aura les 6 partitions / fichiers.

Consultez la documentation Spark SQL pour plus d'informations sur les options de mode.

dnlbrky
la source
2
Vrai et utile, merci, mais une solution spécifique à DataFrame spark.hadoop.validateOutputSpecsfonctionnera sur toutes les API Spark.
samthebest
Pour une raison quelconque, cela spark.hadoop.validateOutputSpecsn'a pas fonctionné pour moi sur 1.3, mais cela fonctionne.
Eric Walker
1
@samthebest Avec la save(... , mode=route, vous pouvez écraser un ensemble de fichiers, en ajouter un autre, etc. dans le même contexte Spark. Ne spark.hadoop.validateOutputSpecsvous limiteriez-vous pas à un seul mode par contexte?
dnlbrky
1
@dnlbrky Le PO n'a pas demandé l'ajout. Comme je l'ai dit, vrai, utile, mais inutile. Si le PO demandait «comment ajouter», alors toute une gamme de réponses pourrait être donnée. Mais n'entrons pas là-dedans. Je vous conseille également d'envisager d'utiliser la version Scala de DataFrames car elle offre une sécurité de type et plus de vérification - par exemple, si vous aviez une faute de frappe dans "écraser", vous ne le sauriez pas tant que ce DAG n'a pas été évalué - ce qui dans un travail Big Data pourrait être 2 heures plus tard !! Si vous utilisez la version Scala, le compilateur vérifiera tout à l'avance! Assez cool et très important pour le Big Data.
samthebest
15

df.write.mode('overwrite').parquet("/output/folder/path")fonctionne si vous souhaitez écraser un fichier parquet en utilisant python. C'est dans l'étincelle 1.6.2. L'API peut être différente dans les versions ultérieures

akn
la source
Oui, cela fonctionne très bien pour mes besoins (Databricks)
Nick.McDermaid
4
  val jobName = "WordCount";
  //overwrite the output directory in spark  set("spark.hadoop.validateOutputSpecs", "false")
  val conf = new 
  SparkConf().setAppName(jobName).set("spark.hadoop.validateOutputSpecs", "false");
  val sc = new SparkContext(conf)
vaquar khan
la source
Uniquement pour Spark 1, dans la dernière version, utilisezdf.write.mode(SaveMode.Overwrite)
ChikuMiku
3

Cette version surchargée de la fonction de sauvegarde fonctionne pour moi:

yourDF.save (chemin de sortie, org.apache.spark.sql.SaveMode.valueOf ("Écraser"))

L'exemple ci-dessus écraserait un dossier existant. Le savemode peut également prendre ces paramètres ( https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/sql/SaveMode.html ):

Ajouter : le mode Ajouter signifie que lors de l'enregistrement d'un DataFrame dans une source de données, si des données / une table existent déjà, le contenu du DataFrame est censé être ajouté aux données existantes.

ErrorIfExists : le mode ErrorIfExists signifie que lors de l'enregistrement d'un DataFrame dans une source de données, si des données existent déjà, une exception devrait être levée.

Ignorer : le mode Ignorer signifie que lors de l'enregistrement d'un DataFrame dans une source de données, si des données existent déjà, l'opération d'enregistrement est censée ne pas enregistrer le contenu du DataFrame et ne pas modifier les données existantes.

Shay
la source
1

Si vous souhaitez utiliser votre propre format de sortie personnalisé, vous pourrez également obtenir le comportement souhaité avec RDD.

Jetez un œil aux classes suivantes: FileOutputFormat , FileOutputCommitter

Dans le format de sortie de fichier, vous avez une méthode nommée checkOutputSpecs, qui vérifie si le répertoire de sortie existe. Dans FileOutputCommitter, vous avez le commitJob qui transfère généralement les données du répertoire temporaire à son emplacement final.

Je n'ai pas encore pu le vérifier (je le ferais, dès que j'ai quelques minutes libres) mais théoriquement: si j'étends FileOutputFormat et remplace checkOutputSpecs par une méthode qui ne lance pas d'exception sur le répertoire existe déjà, et ajuste le commitJob de mon committer de sortie personnalisé pour exécuter la logique que je souhaite (par exemple, remplacer certains fichiers, en ajouter d'autres) afin que je puisse également obtenir le comportement souhaité avec les RDD.

Le format de sortie est passé à: saveAsNewAPIHadoopFile (qui est la méthode saveAsTextFile également appelée pour enregistrer les fichiers). Et le committer de sortie est configuré au niveau de l'application.

Michael Kopaniov
la source
J'éviterais d'aller près de sous-classer FileOutputCommitter si vous pouvez l'aider: c'est un peu de code effrayant. Hadoop 3.0 ajoute un point de plugin où FileOutputFormat peut prendre différentes implémentations d'une superclasse refactorisée (PathOutputCommitter). Le S3 de Netflix écrira sur place dans un arbre partitionné, ne faisant que la résolution des conflits (échec, suppression, ajout) lors de la validation du travail, et uniquement dans les partitions mises à jour
niveau