Je voudrais lire un CSV dans Spark et le convertir en DataFrame et le stocker en HDFS avec df.registerTempTable("table_name")
J'ai essayé:
scala> val df = sqlContext.load("hdfs:///csv/file/dir/file.csv")
Erreur que j'ai obtenue:
java.lang.RuntimeException: hdfs:///csv/file/dir/file.csv is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [49, 59, 54, 10]
at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:418)
at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:277)
at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:276)
at scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)
at scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650)
at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:165)
at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Quelle est la bonne commande pour charger un fichier CSV en tant que DataFrame dans Apache Spark?
Réponses:
spark-csv fait partie des fonctionnalités principales de Spark et ne nécessite pas de bibliothèque distincte. Donc tu pourrais juste faire par exemple
Dans scala, (cela fonctionne pour toute mention de délimiteur de formatage "," pour csv, "\ t" pour tsv, etc.)
val df = sqlContext.read.format("com.databricks.spark.csv") .option("delimiter", ",") .load("csvfile.csv")
la source
Analyser CSV et charger en tant que DataFrame / DataSet avec Spark 2.x
Tout d'abord, initialisez l'
SparkSession
objet par défaut, il sera disponible dans les shells commespark
1. Faites-le de manière programmatique
Mise à jour: Ajout de toutes les options d'ici au cas où le lien serait rompu à l'avenir
2. Vous pouvez également utiliser cette méthode SQL
Dépendances :
Version Spark <2.0
Dépendances:
la source
spark-core_2.11
etspark-sql_2.11
de la2.0.1
version est très bien. Si possible, ajoutez le message d'erreur.spark.read.format("csv").option("delimiter ", "|") ...
programmatic way
est de laisser le.format("csv")
et de le remplacer.load(...
par.csv(...
. Laoption
méthode appartient à la classe DataFrameReader telle qu'elle est renvoyée par laread
méthode, où les méthodesload
etcsv
renvoient un dataframe et ne peuvent donc pas avoir d'options marquées après leur appel. Cette réponse est assez complète mais vous devriez créer un lien vers la documentation afin que les gens puissent voir toutes les autres options CSV disponibles spark.apache.org/docs/latest/api/scala/… *): org.apache.spark.sql.DataFrameC'est pour qui Hadoop est 2.6 et Spark est 1.6 et sans package "databricks".
la source
Avec Spark 2.0, voici comment vous pouvez lire CSV
la source
spark.read.csv(path)
etspark.read.format("csv").load(path)
?En Java 1.8, cet extrait de code fonctionne parfaitement pour lire les fichiers CSV
POM.xml
Java
la source
L'analyse d'un fichier CSV pose de nombreux défis, elle continue de s'additionner si la taille du fichier est plus grande, s'il y a des caractères non anglais / d'échappement / séparateur / autres dans les valeurs de colonne, cela pourrait provoquer des erreurs d'analyse.
La magie réside alors dans les options utilisées. Ceux qui ont fonctionné pour moi et espèrent couvrir la plupart des cas extrêmes sont dans le code ci-dessous:
J'espère que cela pourra aider. Pour plus d'informations, reportez-vous à: Utilisation de PySpark 2 pour lire des fichiers CSV contenant du code source HTML
Remarque: Le code ci-dessus provient de l'API Spark 2, où l'API de lecture de fichier CSV est fournie avec des packages intégrés de Spark installable.
Remarque: PySpark est un wrapper Python pour Spark et partage la même API que Scala / Java.
la source
L'exemple de Penny's Spark 2 est la façon de le faire dans spark2. Il y a une autre astuce: faites générer cet en-tête pour vous en effectuant une analyse initiale des données, en définissant l'option
inferSchema
surtrue
Voici donc, en supposant qu'il
spark
s'agit d'une session Spark que vous avez configurée, l'opération à charger dans le fichier d'index CSV de toutes les images Landsat hébergées par Amazon sur S3.La mauvaise nouvelle est la suivante: cela déclenche une analyse du fichier; pour quelque chose de gros comme ce fichier CSV zippé de plus de 20 Mo, cela peut prendre 30 secondes sur une connexion longue distance. Gardez cela à l'esprit: il vaut mieux coder manuellement le schéma une fois que vous l'avez reçu.
(extrait de code Apache Software License 2.0 sous licence pour éviter toute ambiguïté; ce que j'ai fait comme test de démonstration / d'intégration de l'intégration S3)
la source
Dans le cas où vous construisez un jar avec scala 2.11 et Apache 2.0 ou supérieur.
Il n'est pas nécessaire de créer un objet
sqlContext
ousparkContext
. UnSparkSession
seul objet suffit à tous les besoins.Voici mycode qui fonctionne bien:
Si vous utilisez un cluster, passez simplement
.master("local")
à.master("yarn")
lors de la définition de l'sparkBuilder
objetLe Spark Doc couvre ceci: https://spark.apache.org/docs/2.2.0/sql-programming-guide.html
la source
Ajoutez les dépendances Spark suivantes au fichier POM:
// Configuration de Spark:
val spark = SparkSession.builder (). master ("local"). appName ("Exemple d'application"). getOrCreate ()
// Lire le fichier csv:
val df = spark.read.option ("header", "true"). csv ("FILE_PATH")
// Afficher la sortie
df.show ()
la source
Pour lire à partir du chemin relatif sur le système, utilisez la méthode System.getProperty pour obtenir le répertoire actuel et utilise en outre pour charger le fichier à l'aide du chemin relatif.
étincelle: 2.4.4 échelle: 2.11.12
la source
Avec Spark 2.4+, si vous souhaitez charger un csv à partir d'un répertoire local, vous pouvez utiliser 2 sessions et le charger dans la ruche. La première session doit être créée avec la configuration master () comme "local [*]" et la deuxième session avec "yarn" et Hive activés.
Celui ci-dessous a fonctionné pour moi.
Lorsqu'il a été exécuté avec,
spark2-submit --master "yarn" --conf spark.ui.enabled=false testCSV.jar
il s'est bien passé et a créé la table dans la ruche.la source
Le format de fichier par défaut est Parquet avec spark.read .. et le fichier de lecture csv c'est pourquoi vous obtenez l'exception. Spécifiez le format csv avec l'API que vous essayez d'utiliser
la source
Essayez ceci si vous utilisez Spark 2.0+
Remarque: - ce travail pour tout fichier délimité. Utilisez simplement l'option ("delimiter",) pour changer la valeur.
J'espère que cela est utile.
la source
Avec Spark csv intégré, vous pouvez le faire facilement avec le nouvel objet SparkSession pour Spark> 2.0.
Vous pouvez définir différentes options.
header
: si votre fichier inclut une ligne d'en-tête en hautinferSchema
: que vous souhaitiez déduire le schéma automatiquement ou non. La valeur par défaut esttrue
. Je préfère toujours fournir un schéma pour garantir des types de données appropriés.mode
: mode d'analyse, PERMISSIVE, DROPMALFORMED ou FAILFASTdelimiter
: pour spécifier le délimiteur, la valeur par défaut est la virgule (',')la source