Spark - charger le fichier CSV en tant que DataFrame?

142

Je voudrais lire un CSV dans Spark et le convertir en DataFrame et le stocker en HDFS avec df.registerTempTable("table_name")

J'ai essayé:

scala> val df = sqlContext.load("hdfs:///csv/file/dir/file.csv")

Erreur que j'ai obtenue:

java.lang.RuntimeException: hdfs:///csv/file/dir/file.csv is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [49, 59, 54, 10]
    at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:418)
    at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:277)
    at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:276)
    at scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
    at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)
    at scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650)
    at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:165)
    at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
    at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Quelle est la bonne commande pour charger un fichier CSV en tant que DataFrame dans Apache Spark?

Donbeo
la source
vérifiez ce lien pour le faire dans Spark 2.0
mrsrinivas

Réponses:

181

spark-csv fait partie des fonctionnalités principales de Spark et ne nécessite pas de bibliothèque distincte. Donc tu pourrais juste faire par exemple

df = spark.read.format("csv").option("header", "true").load("csvfile.csv")

Dans scala, (cela fonctionne pour toute mention de délimiteur de formatage "," pour csv, "\ t" pour tsv, etc.)

val df = sqlContext.read.format("com.databricks.spark.csv") .option("delimiter", ",") .load("csvfile.csv")

Shyamendra Solanki
la source
164

Analyser CSV et charger en tant que DataFrame / DataSet avec Spark 2.x

Tout d'abord, initialisez l' SparkSessionobjet par défaut, il sera disponible dans les shells commespark

val spark = org.apache.spark.sql.SparkSession.builder
        .master("local") # Change it as per your cluster
        .appName("Spark CSV Reader")
        .getOrCreate;

Utilisez l'une des méthodes suivantes pour charger CSV en tant que DataFrame/DataSet

1. Faites-le de manière programmatique

 val df = spark.read
         .format("csv")
         .option("header", "true") //first line in file has headers
         .option("mode", "DROPMALFORMED")
         .load("hdfs:///csv/file/dir/file.csv")

Mise à jour: Ajout de toutes les options d'ici au cas où le lien serait rompu à l'avenir

  • chemin : emplacement des fichiers. Similaire à Spark, il peut accepter les expressions globales Hadoop standard.
  • header : lorsqu'il est défini sur true, la première ligne de fichiers sera utilisée pour nommer les colonnes et ne sera pas incluse dans les données. Tous les types seront supposés string. La valeur par défaut est false.
  • délimiteur : par défaut, les colonnes sont délimitées à l'aide de, mais le délimiteur peut être défini sur n'importe quel caractère
  • quote : par défaut, le caractère guillemet est ", mais peut être défini sur n'importe quel caractère. Les délimiteurs entre guillemets sont ignorés
  • escape : par défaut, le caractère d'échappement est, mais peut être défini sur n'importe quel caractère. Les caractères de guillemet échappés sont ignorés
  • parserLib : par défaut, c'est " commons " qui peut être réglé sur " univocity " pour utiliser cette bibliothèque pour l'analyse CSV.
  • mode : détermine le mode d'analyse. Par défaut, il est PERMISSIF. Les valeurs possibles sont:
    • PERMISSIF : essaie d'analyser toutes les lignes: des valeurs nulles sont insérées pour les jetons manquants et les jetons supplémentaires sont ignorés.
    • DROPMALFORMED : supprime les lignes qui ont moins ou plus de jetons que prévu ou les jetons qui ne correspondent pas au schéma
    • FAILFAST : abandonne avec une RuntimeException si rencontre un jeu de caractères de ligne malformé: par défaut à 'UTF-8' mais peut être défini sur d'autres noms de jeu de caractères valides
  • inferSchema : déduit automatiquement les types de colonnes. Il nécessite un passage supplémentaire sur les données et est faux par défaut commentaire: sauter les lignes commençant par ce caractère. La valeur par défaut est "#". Désactivez les commentaires en définissant ce paramètre sur null.
  • nullValue : spécifie une chaîne qui indique une valeur nulle, tous les champs correspondant à cette chaîne seront définis comme des valeurs nulles dans le DataFrame
  • dateFormat : spécifie une chaîne qui indique le format de date à utiliser lors de la lecture des dates ou des horodatages. Les formats de date personnalisés suivent les formats de java.text.SimpleDateFormat. Cela s'applique à la fois à DateType et TimestampType. Par défaut, il est nul, ce qui signifie que vous essayez d'analyser les heures et la date par java.sql.Timestamp.valueOf () et java.sql.Date.valueOf ().

2. Vous pouvez également utiliser cette méthode SQL

 val df = spark.sql("SELECT * FROM csv.`hdfs:///csv/file/dir/file.csv`")

Dépendances :

 "org.apache.spark" % "spark-core_2.11" % 2.0.0,
 "org.apache.spark" % "spark-sql_2.11" % 2.0.0,

Version Spark <2.0

val df = sqlContext.read
    .format("com.databricks.spark.csv")
    .option("header", "true") 
    .option("mode", "DROPMALFORMED")
    .load("csv/file/path"); 

Dépendances:

"org.apache.spark" % "spark-sql_2.10" % 1.6.0,
"com.databricks" % "spark-csv_2.10" % 1.6.0,
"com.univocity" % "univocity-parsers" % LATEST,
mrsrinivas
la source
cette session nécessite-t-elle une ruche? Je reçois des erreurs de ruche.
Puneet
2
Ce n'est pas nécessaire. Seul spark-core_2.11et spark-sql_2.11de la 2.0.1version est très bien. Si possible, ajoutez le message d'erreur.
mrsrinivas
1
pouvons-nous convertir un fichier délimité par des tubes en dataframe?
Omkar
3
@OmkarPuttagunta: Oui, bien sûr! essayez quelque chose comme ça spark.read.format("csv").option("delimiter ", "|") ...
mrsrinivas
1
L'autre option pour programmatic wayest de laisser le .format("csv")et de le remplacer .load(...par .csv(.... La optionméthode appartient à la classe DataFrameReader telle qu'elle est renvoyée par la readméthode, où les méthodes loadet csvrenvoient un dataframe et ne peuvent donc pas avoir d'options marquées après leur appel. Cette réponse est assez complète mais vous devriez créer un lien vers la documentation afin que les gens puissent voir toutes les autres options CSV disponibles spark.apache.org/docs/latest/api/scala/… *): org.apache.spark.sql.DataFrame
Davos
17

C'est pour qui Hadoop est 2.6 et Spark est 1.6 et sans package "databricks".

import org.apache.spark.sql.types.{StructType,StructField,StringType,IntegerType};
import org.apache.spark.sql.Row;

val csv = sc.textFile("/path/to/file.csv")
val rows = csv.map(line => line.split(",").map(_.trim))
val header = rows.first
val data = rows.filter(_(0) != header(0))
val rdd = data.map(row => Row(row(0),row(1).toInt))

val schema = new StructType()
    .add(StructField("id", StringType, true))
    .add(StructField("val", IntegerType, true))

val df = sqlContext.createDataFrame(rdd, schema)
Eric Yiwei Liu
la source
12

Avec Spark 2.0, voici comment vous pouvez lire CSV

val conf = new SparkConf().setMaster("local[2]").setAppName("my app")
val sc = new SparkContext(conf)
val sparkSession = SparkSession.builder
  .config(conf = conf)
  .appName("spark session example")
  .getOrCreate()

val path = "/Users/xxx/Downloads/usermsg.csv"
val base_df = sparkSession.read.option("header","true").
  csv(path)
penny chan
la source
5
Y a-t-il une différence entre spark.read.csv(path)et spark.read.format("csv").load(path)?
Eric
8

En Java 1.8, cet extrait de code fonctionne parfaitement pour lire les fichiers CSV

POM.xml

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.10 -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>2.0.0</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.scala-lang/scala-library -->
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.11.8</version>
</dependency>
<dependency>
    <groupId>com.databricks</groupId>
    <artifactId>spark-csv_2.10</artifactId>
    <version>1.4.0</version>
</dependency>

Java

SparkConf conf = new SparkConf().setAppName("JavaWordCount").setMaster("local");
// create Spark Context
SparkContext context = new SparkContext(conf);
// create spark Session
SparkSession sparkSession = new SparkSession(context);

Dataset<Row> df = sparkSession.read().format("com.databricks.spark.csv").option("header", true).option("inferSchema", true).load("hdfs://localhost:9000/usr/local/hadoop_data/loan_100.csv");

        //("hdfs://localhost:9000/usr/local/hadoop_data/loan_100.csv");
System.out.println("========== Print Schema ============");
df.printSchema();
System.out.println("========== Print Data ==============");
df.show();
System.out.println("========== Print title ==============");
df.select("title").show();
Rajeev Rathor
la source
Bien que cela puisse être utile à quelqu'un. La question a une balise Scala.
OneCricketeer
5

L'analyse d'un fichier CSV pose de nombreux défis, elle continue de s'additionner si la taille du fichier est plus grande, s'il y a des caractères non anglais / d'échappement / séparateur / autres dans les valeurs de colonne, cela pourrait provoquer des erreurs d'analyse.

La magie réside alors dans les options utilisées. Ceux qui ont fonctionné pour moi et espèrent couvrir la plupart des cas extrêmes sont dans le code ci-dessous:

### Create a Spark Session
spark = SparkSession.builder.master("local").appName("Classify Urls").getOrCreate()

### Note the options that are used. You may have to tweak these in case of error
html_df = spark.read.csv(html_csv_file_path, 
                         header=True, 
                         multiLine=True, 
                         ignoreLeadingWhiteSpace=True, 
                         ignoreTrailingWhiteSpace=True, 
                         encoding="UTF-8",
                         sep=',',
                         quote='"', 
                         escape='"',
                         maxColumns=2,
                         inferSchema=True)

J'espère que cela pourra aider. Pour plus d'informations, reportez-vous à: Utilisation de PySpark 2 pour lire des fichiers CSV contenant du code source HTML

Remarque: Le code ci-dessus provient de l'API Spark 2, où l'API de lecture de fichier CSV est fournie avec des packages intégrés de Spark installable.

Remarque: PySpark est un wrapper Python pour Spark et partage la même API que Scala / Java.

karthiks
la source
Merci beaucoup, vous m'avez sauvé la vie: D
Khubaib Raza
4

L'exemple de Penny's Spark 2 est la façon de le faire dans spark2. Il y a une autre astuce: faites générer cet en-tête pour vous en effectuant une analyse initiale des données, en définissant l'option inferSchemasurtrue

Voici donc, en supposant qu'il sparks'agit d'une session Spark que vous avez configurée, l'opération à charger dans le fichier d'index CSV de toutes les images Landsat hébergées par Amazon sur S3.

  /*
   * Licensed to the Apache Software Foundation (ASF) under one or more
   * contributor license agreements.  See the NOTICE file distributed with
   * this work for additional information regarding copyright ownership.
   * The ASF licenses this file to You under the Apache License, Version 2.0
   * (the "License"); you may not use this file except in compliance with
   * the License.  You may obtain a copy of the License at
   *
   *    http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   * See the License for the specific language governing permissions and
   * limitations under the License.
   */

val csvdata = spark.read.options(Map(
    "header" -> "true",
    "ignoreLeadingWhiteSpace" -> "true",
    "ignoreTrailingWhiteSpace" -> "true",
    "timestampFormat" -> "yyyy-MM-dd HH:mm:ss.SSSZZZ",
    "inferSchema" -> "true",
    "mode" -> "FAILFAST"))
  .csv("s3a://landsat-pds/scene_list.gz")

La mauvaise nouvelle est la suivante: cela déclenche une analyse du fichier; pour quelque chose de gros comme ce fichier CSV zippé de plus de 20 Mo, cela peut prendre 30 secondes sur une connexion longue distance. Gardez cela à l'esprit: il vaut mieux coder manuellement le schéma une fois que vous l'avez reçu.

(extrait de code Apache Software License 2.0 sous licence pour éviter toute ambiguïté; ce que j'ai fait comme test de démonstration / d'intégration de l'intégration S3)

stevel
la source
Je n'avais pas vu cette méthode csv ou passer une carte aux options. Convenu toujours mieux de fournir un schéma explicite, inferSchema est bien pour quick n dirty (aka data science) mais terrible pour ETL.
Davos
2

Dans le cas où vous construisez un jar avec scala 2.11 et Apache 2.0 ou supérieur.

Il n'est pas nécessaire de créer un objet sqlContextou sparkContext. Un SparkSessionseul objet suffit à tous les besoins.

Voici mycode qui fonctionne bien:

import org.apache.spark.sql.{DataFrame, Row, SQLContext, SparkSession}
import org.apache.log4j.{Level, LogManager, Logger}

object driver {

  def main(args: Array[String]) {

    val log = LogManager.getRootLogger

    log.info("**********JAR EXECUTION STARTED**********")

    val spark = SparkSession.builder().master("local").appName("ValidationFrameWork").getOrCreate()
    val df = spark.read.format("csv")
      .option("header", "true")
      .option("delimiter","|")
      .option("inferSchema","true")
      .load("d:/small_projects/spark/test.pos")
    df.show()
  }
}

Si vous utilisez un cluster, passez simplement .master("local")à .master("yarn")lors de la définition de l' sparkBuilderobjet

Le Spark Doc couvre ceci: https://spark.apache.org/docs/2.2.0/sql-programming-guide.html

swapnil shashank
la source
C'est la même chose que les réponses existantes
mrsrinivas
0

Ajoutez les dépendances Spark suivantes au fichier POM:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.2.0</version>
</dependency>

// Configuration de Spark:

val spark = SparkSession.builder (). master ("local"). appName ("Exemple d'application"). getOrCreate ()

// Lire le fichier csv:

val df = spark.read.option ("header", "true"). csv ("FILE_PATH")

// Afficher la sortie

df.show ()

S_K
la source
0

Pour lire à partir du chemin relatif sur le système, utilisez la méthode System.getProperty pour obtenir le répertoire actuel et utilise en outre pour charger le fichier à l'aide du chemin relatif.

scala> val path = System.getProperty("user.dir").concat("/../2015-summary.csv")
scala> val csvDf = spark.read.option("inferSchema","true").option("header", "true").csv(path)
scala> csvDf.take(3)

étincelle: 2.4.4 échelle: 2.11.12

Venkat Kotra
la source
0

Avec Spark 2.4+, si vous souhaitez charger un csv à partir d'un répertoire local, vous pouvez utiliser 2 sessions et le charger dans la ruche. La première session doit être créée avec la configuration master () comme "local [*]" et la deuxième session avec "yarn" et Hive activés.

Celui ci-dessous a fonctionné pour moi.

import org.apache.log4j.{Level, Logger}
import org.apache.spark._
import org.apache.spark.rdd._
import org.apache.spark.sql._

object testCSV { 

  def main(args: Array[String]) {
    Logger.getLogger("org").setLevel(Level.ERROR)
    val spark_local = SparkSession.builder().appName("CSV local files reader").master("local[*]").getOrCreate()

    import spark_local.implicits._
    spark_local.sql("SET").show(100,false)
    val local_path="/tmp/data/spend_diversity.csv"  // Local file
    val df_local = spark_local.read.format("csv").option("inferSchema","true").load("file://"+local_path) // "file://" is mandatory
    df_local.show(false)

    val spark = SparkSession.builder().appName("CSV HDFS").config("spark.sql.warehouse.dir", "/apps/hive/warehouse").enableHiveSupport().getOrCreate()

    import spark.implicits._
    spark.sql("SET").show(100,false)
    val df = df_local
    df.createOrReplaceTempView("lcsv")
    spark.sql(" drop table if exists work.local_csv ")
    spark.sql(" create table work.local_csv as select * from lcsv ")

   }

Lorsqu'il a été exécuté avec, spark2-submit --master "yarn" --conf spark.ui.enabled=false testCSV.jaril s'est bien passé et a créé la table dans la ruche.

pile0114106
la source
-1

Le format de fichier par défaut est Parquet avec spark.read .. et le fichier de lecture csv c'est pourquoi vous obtenez l'exception. Spécifiez le format csv avec l'API que vous essayez d'utiliser

tazak
la source
-1

Essayez ceci si vous utilisez Spark 2.0+

For non-hdfs file:
df = spark.read.csv("file:///csvfile.csv")


For hdfs file:
df = spark.read.csv("hdfs:///csvfile.csv")

For hdfs file (with different delimiter than comma:
df = spark.read.option("delimiter","|")csv("hdfs:///csvfile.csv")

Remarque: - ce travail pour tout fichier délimité. Utilisez simplement l'option ("delimiter",) pour changer la valeur.

J'espère que cela est utile.

Ajay Ahuja
la source
C'est la même chose que les réponses existantes
mrsrinivas
-1

Avec Spark csv intégré, vous pouvez le faire facilement avec le nouvel objet SparkSession pour Spark> 2.0.

val df = spark.
        read.
        option("inferSchema", "false").
        option("header","true").
        option("mode","DROPMALFORMED").
        option("delimiter", ";").
        schema(dataSchema).
        csv("/csv/file/dir/file.csv")
df.show()
df.printSchema()

Vous pouvez définir différentes options.

  • header: si votre fichier inclut une ligne d'en-tête en haut
  • inferSchema: que vous souhaitiez déduire le schéma automatiquement ou non. La valeur par défaut est true. Je préfère toujours fournir un schéma pour garantir des types de données appropriés.
  • mode: mode d'analyse, PERMISSIVE, DROPMALFORMED ou FAILFAST
  • delimiter: pour spécifier le délimiteur, la valeur par défaut est la virgule (',')
Piyush Patel
la source