J'ai une application Spark Streaming qui produit un ensemble de données pour chaque minute. J'ai besoin de sauvegarder / écraser les résultats des données traitées.
Lorsque j'ai essayé d'écraser l'ensemble de données org.apache.hadoop.mapred.FileAlreadyExistsException arrête l'exécution.
J'ai défini la propriété Spark set("spark.files.overwrite","true")
, mais il n'y a pas de chance.
Comment écraser ou prédéfinir les fichiers de Spark?
apache-spark
Vijay Innamuri
la source
la source
set("spark.files.overwrite","true")
fonctionne uniquement pour les fichiers ajoutésspark.addFile()
Réponses:
MISE À JOUR: Suggérez d'utiliser
Dataframes
, plus quelque chose comme... .write.mode(SaveMode.Overwrite) ...
.Proximité pratique:
Pour les anciennes versions, essayez
Dans la version 1.1.0, vous pouvez définir les paramètres de configuration à l'aide du script spark-submit avec l'indicateur --conf.
AVERTISSEMENT (anciennes versions): Selon @piggybox, il y a un bogue dans Spark où il n'écrasera que les fichiers dont il a besoin pour écrire ses
part-
fichiers, tous les autres fichiers ne seront pas supprimés.la source
Spark 1.4
:df.write.mode(SaveMode.Overwrite).parquet(path)
df.write.mode(mode: String).parquet(path)
mode Où: La chaîne peut être: "écraser", "ajouter", "ignorer", "erreur".depuis
df.save(path, source, mode)
est obsolète, ( http://spark.apache.org/docs/1.5.0/api/scala/index.html#org.apache.spark.sql.DataFrame )utiliser
df.write.format(source).mode("overwrite").save(path)
où df.write est DataFrameWriter
'source' peut être ("com.databricks.spark.avro" | "parquet" | "json")
la source
source
peut aussi êtrecsv
La documentation du paramètre
spark.files.overwrite
dit ceci: "S'il faut écraser les fichiers ajoutésSparkContext.addFile()
lorsque le fichier cible existe et que son contenu ne correspond pas à celui de la source." Cela n'a donc aucun effet sur la méthode saveAsTextFiles.Vous pouvez le faire avant d'enregistrer le fichier:
Aas expliqué ici: http://apache-spark-user-list.1001560.n3.nabble.com/How-can-I-make-Spark-1-0-saveAsTextFile-to-overwrite-existing-file-td6696. html
la source
À partir de la documentation pyspark.sql.DataFrame.save (actuellement à 1.3.1), vous pouvez spécifier
mode='overwrite'
lors de l'enregistrement d'un DataFrame:J'ai vérifié que cela supprimera même les fichiers de partition restants. Donc, si vous aviez dit 10 partitions / fichiers à l'origine, mais que vous écrasiez ensuite le dossier avec un DataFrame qui n'avait que 6 partitions, le dossier résultant aura les 6 partitions / fichiers.
Consultez la documentation Spark SQL pour plus d'informations sur les options de mode.
la source
spark.hadoop.validateOutputSpecs
fonctionnera sur toutes les API Spark.spark.hadoop.validateOutputSpecs
n'a pas fonctionné pour moi sur 1.3, mais cela fonctionne.save(... , mode=
route, vous pouvez écraser un ensemble de fichiers, en ajouter un autre, etc. dans le même contexte Spark. Nespark.hadoop.validateOutputSpecs
vous limiteriez-vous pas à un seul mode par contexte?df.write.mode('overwrite').parquet("/output/folder/path")
fonctionne si vous souhaitez écraser un fichier parquet en utilisant python. C'est dans l'étincelle 1.6.2. L'API peut être différente dans les versions ultérieuresla source
la source
df.write.mode(SaveMode.Overwrite)
Cette version surchargée de la fonction de sauvegarde fonctionne pour moi:
yourDF.save (chemin de sortie, org.apache.spark.sql.SaveMode.valueOf ("Écraser"))
L'exemple ci-dessus écraserait un dossier existant. Le savemode peut également prendre ces paramètres ( https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/sql/SaveMode.html ):
Ajouter : le mode Ajouter signifie que lors de l'enregistrement d'un DataFrame dans une source de données, si des données / une table existent déjà, le contenu du DataFrame est censé être ajouté aux données existantes.
ErrorIfExists : le mode ErrorIfExists signifie que lors de l'enregistrement d'un DataFrame dans une source de données, si des données existent déjà, une exception devrait être levée.
Ignorer : le mode Ignorer signifie que lors de l'enregistrement d'un DataFrame dans une source de données, si des données existent déjà, l'opération d'enregistrement est censée ne pas enregistrer le contenu du DataFrame et ne pas modifier les données existantes.
la source
Si vous souhaitez utiliser votre propre format de sortie personnalisé, vous pourrez également obtenir le comportement souhaité avec RDD.
Jetez un œil aux classes suivantes: FileOutputFormat , FileOutputCommitter
Dans le format de sortie de fichier, vous avez une méthode nommée checkOutputSpecs, qui vérifie si le répertoire de sortie existe. Dans FileOutputCommitter, vous avez le commitJob qui transfère généralement les données du répertoire temporaire à son emplacement final.
Je n'ai pas encore pu le vérifier (je le ferais, dès que j'ai quelques minutes libres) mais théoriquement: si j'étends FileOutputFormat et remplace checkOutputSpecs par une méthode qui ne lance pas d'exception sur le répertoire existe déjà, et ajuste le commitJob de mon committer de sortie personnalisé pour exécuter la logique que je souhaite (par exemple, remplacer certains fichiers, en ajouter d'autres) afin que je puisse également obtenir le comportement souhaité avec les RDD.
Le format de sortie est passé à: saveAsNewAPIHadoopFile (qui est la méthode saveAsTextFile également appelée pour enregistrer les fichiers). Et le committer de sortie est configuré au niveau de l'application.
la source