Charger le fichier CSV avec Spark

110

Je suis nouveau sur Spark et j'essaie de lire les données CSV à partir d'un fichier avec Spark. Voici ce que je fais:

sc.textFile('file.csv')
    .map(lambda line: (line.split(',')[0], line.split(',')[1]))
    .collect()

Je m'attendrais à ce que cet appel me donne une liste des deux premières colonnes de mon fichier mais j'obtiens cette erreur:

File "<ipython-input-60-73ea98550983>", line 1, in <lambda>
IndexError: list index out of range

bien que mon fichier CSV comporte plus d'une colonne.

Kernael
la source

Réponses:

63

Êtes-vous sûr que toutes les lignes ont au moins 2 colonnes? Pouvez-vous essayer quelque chose comme, juste pour vérifier?:

sc.textFile("file.csv") \
    .map(lambda line: line.split(",")) \
    .filter(lambda line: len(line)>1) \
    .map(lambda line: (line[0],line[1])) \
    .collect()

Alternativement, vous pouvez imprimer le coupable (le cas échéant):

sc.textFile("file.csv") \
    .map(lambda line: line.split(",")) \
    .filter(lambda line: len(line)<=1) \
    .collect()
G Quintana
la source
C'était tout, une ligne avec une seule colonne, merci.
Kernael
2
Il est préférable d'analyser en utilisant la csvbibliothèque intégrée pour gérer tous les échappements car le simple fractionnement par virgule ne fonctionnera pas si, par exemple, il y a des virgules dans les valeurs.
sudo
4
Il existe de nombreux outils pour analyser le csv, ne réinventez pas la roue
Stephen
2
Ce code se cassera s'il y a une virgule entre les guillemets. L'analyse de csv est plus compliquée que le fractionnement ",".
Alceu Costa
Cela remplace les virgules. C'est très mauvais.
rjurney le
184

Spark 2.0.0+

Vous pouvez utiliser directement la source de données csv intégrée:

spark.read.csv(
    "some_input_file.csv", header=True, mode="DROPMALFORMED", schema=schema
)

ou

(spark.read
    .schema(schema)
    .option("header", "true")
    .option("mode", "DROPMALFORMED")
    .csv("some_input_file.csv"))

sans inclure de dépendances externes.

Spark <2.0.0 :

Au lieu de l'analyse manuelle, ce qui est loin d'être trivial dans un cas général, je recommanderais spark-csv:

Assurez - vous que CSV Spark est inclus dans le chemin ( --packages, --jars, --driver-class-path)

Et chargez vos données comme suit:

(df = sqlContext
    .read.format("com.databricks.spark.csv")
    .option("header", "true")
    .option("inferschema", "true")
    .option("mode", "DROPMALFORMED")
    .load("some_input_file.csv"))

Il peut gérer le chargement, l'inférence de schéma, la suppression de lignes mal formées et ne nécessite pas de transfert de données de Python à la JVM.

Remarque :

Si vous connaissez le schéma, il est préférable d'éviter l'inférence de schéma et de le transmettre à DataFrameReader. En supposant que vous ayez trois colonnes - entier, double et chaîne:

from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType

schema = StructType([
    StructField("A", IntegerType()),
    StructField("B", DoubleType()),
    StructField("C", StringType())
])

(sqlContext
    .read
    .format("com.databricks.spark.csv")
    .schema(schema)
    .option("header", "true")
    .option("mode", "DROPMALFORMED")
    .load("some_input_file.csv"))
zéro323
la source
6
Si vous faites cela, n'oubliez pas d'inclure le package csv databricks lorsque vous ouvrez le shell pyspark ou utilisez spark-submit. Par exemple, pyspark --packages com.databricks:spark-csv_2.11:1.4.0(assurez-vous de remplacer les versions de databricks / spark par celles que vous avez installées).
Galen Long
Est-ce csvContext ou sqlContext dans pyspark? Parce que dans scala, vous avez besoin de csvContext
Geoffrey Anderson
28
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

df = spark.read.csv("/home/stp/test1.csv",header=True,sep="|");

print(df.collect())
y durga prasad
la source
utilisez 'sep pas' separator 'comme suit: df = spark.read.csv ("/ home / stp / test1.csv", header = True, sep = "|")
Grant Shannon
18

Et encore une autre option qui consiste à lire le fichier CSV à l'aide de Pandas puis à importer le Pandas DataFrame dans Spark.

Par exemple:

from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd

sc = SparkContext('local','example')  # if using locally
sql_sc = SQLContext(sc)

pandas_df = pd.read_csv('file.csv')  # assuming the file contains a header
# pandas_df = pd.read_csv('file.csv', names = ['column 1','column 2']) # if no header
s_df = sql_sc.createDataFrame(pandas_df)
JP Mercier
la source
7
Pourquoi OP voudrait-il faire sur Spark s'il est capable de charger des données dans des pandas
WoodChopper
Je ne veux
Panda autorise la segmentation des fichiers lors de la lecture, il existe donc encore un cas d'utilisation pour que Pandas gère l'analyse initiale des fichiers. Voir ma réponse ci-dessous pour le code.
abby sobh
Attention: Pandas gère également le schéma de colonne différemment de Spark, en particulier lorsque des espaces sont impliqués. Il est plus sûr de simplement charger csv sous forme de chaînes pour chaque colonne.
AntiPawn79
@WoodChopper Vous pouvez utiliser Pandas comme UDF dans Spark, non?
flow2k
16

Le simple fractionnement par virgule divisera également les virgules qui se trouvent dans les champs (par exemple a,b,"1,2,3",c), ce n'est donc pas recommandé. La réponse de zero323 est bonne si vous souhaitez utiliser l'API DataFrames, mais si vous souhaitez vous en tenir à la base Spark, vous pouvez analyser les csvs en Python de base avec le module csv :

# works for both python 2 and 3
import csv
rdd = sc.textFile("file.csv")
rdd = rdd.mapPartitions(lambda x: csv.reader(x))

EDIT: Comme @muon l'a mentionné dans les commentaires, cela traitera l'en-tête comme n'importe quelle autre ligne, vous devrez donc l'extraire manuellement. Par exemple, header = rdd.first(); rdd = rdd.filter(lambda x: x != header)(assurez-vous de ne pas modifier headeravant l'évaluation du filtre). Mais à ce stade, il vaut probablement mieux utiliser un analyseur csv intégré.

Galen Long
la source
1
Vous n'avez pas besoin de Hive pour utiliser DataFrames. Concernant votre solution: a) Il n'y a pas besoin de StringIO. csvpeut utiliser n'importe quel itérable b) __next__ne doit pas être utilisé directement et échouera sur une ligne vide. Jetez un oeil à flatMap c) Il serait beaucoup plus efficace d'utiliser mapPartitionsau lieu d'initialiser le lecteur sur chaque ligne :)
zero323
Merci beaucoup pour les corrections! Avant de modifier ma réponse, je veux m'assurer de bien comprendre. 1) Pourquoi fonctionne-t-il rdd.mapPartitions(lambda x: csv.reader(x))tout en rdd.map(lambda x: csv.reader(x))lançant une erreur? Je m'attendais à ce que les deux lancent la même chose TypeError: can't pickle _csv.reader objects. Il semble aussi que j'appelle mapPartitionsautomatiquement un équivalent de "readlines" sur l' csv.readerobjet, où avec map, j'avais besoin d'appeler __next__explicitement pour sortir les listes du csv.reader. 2) D'où flatMapvient-il? L'appel mapPartitionsseul a fonctionné pour moi.
Galen Long
1
rdd.mapPartitions(lambda x: csv.reader(x))fonctionne car mapPartitionsattend un Iterableobjet. Si vous voulez être explicite, vous pouvez comprendre ou générer une expression. mapseul ne fonctionne pas car il n'itère pas sur l'objet. D'où ma suggestion d'utiliser flatMap(lambda x: csv.reader([x]))qui itérera sur le lecteur. Mais mapPartitionsc'est beaucoup mieux ici.
zero323
1
notez que cela lira l'en-tête comme une ligne de données, pas comme un en-tête
muon
7

C'est dans PYSPARK

path="Your file path with file name"

df=spark.read.format("csv").option("header","true").option("inferSchema","true").load(path)

Ensuite, vous pouvez vérifier

df.show(5)
df.count()
bouton amarnath
la source
6

Si vous souhaitez charger csv en tant que dataframe, vous pouvez effectuer les opérations suivantes:

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

df = sqlContext.read.format('com.databricks.spark.csv') \
    .options(header='true', inferschema='true') \
    .load('sampleFile.csv') # this is your csv file

Cela a bien fonctionné pour moi.

Jeril
la source
@GalenLong si cela ne vous dérange pas, pouvez-vous partager la réponse déjà existante
Jeril
Bizarre, je jure qu'il y avait une autre réponse avec cette solution. Peut-être ai-je confondu cela avec une autre question. Ma faute.
Galen Long
5

Ceci est conforme à ce que JP Mercier suggérait initialement à propos de l'utilisation de Pandas, mais avec une modification majeure: si vous lisez des données dans Pandas par morceaux, elles devraient être plus malléables. Cela signifie que vous pouvez analyser un fichier beaucoup plus volumineux que ce que Pandas peut réellement gérer en une seule pièce et le transmettre à Spark dans des tailles plus petites. (Cela répond également au commentaire sur les raisons pour lesquelles on voudrait utiliser Spark s'il peut tout charger dans Pandas de toute façon.)

from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd

sc = SparkContext('local','example')  # if using locally
sql_sc = SQLContext(sc)

Spark_Full = sc.emptyRDD()
chunk_100k = pd.read_csv("Your_Data_File.csv", chunksize=100000)
# if you have headers in your csv file:
headers = list(pd.read_csv("Your_Data_File.csv", nrows=0).columns)

for chunky in chunk_100k:
    Spark_Full +=  sc.parallelize(chunky.values.tolist())

YourSparkDataFrame = Spark_Full.toDF(headers)
# if you do not have headers, leave empty instead:
# YourSparkDataFrame = Spark_Full.toDF()
YourSparkDataFrame.show()
abby sobh
la source
5

Maintenant, il existe également une autre option pour tout fichier csv général: https://github.com/seahboonsiew/pyspark-csv comme suit:

Supposons que nous ayons le contexte suivant

sc = SparkContext
sqlCtx = SQLContext or HiveContext

Tout d'abord, distribuez pyspark-csv.py aux exécuteurs à l'aide de SparkContext

import pyspark_csv as pycsv
sc.addPyFile('pyspark_csv.py')

Lisez les données csv via SparkContext et convertissez-les en DataFrame

plaintext_rdd = sc.textFile('hdfs://x.x.x.x/blah.csv')
dataframe = pycsv.csvToDataFrame(sqlCtx, plaintext_rdd)
optimiste
la source
3

Si vos données csv ne contiennent pas de nouvelles lignes dans aucun des champs, vous pouvez charger vos données avec textFile()et les analyser

import csv
import StringIO

def loadRecord(line):
    input = StringIO.StringIO(line)
    reader = csv.DictReader(input, fieldnames=["name1", "name2"])
    return reader.next()

input = sc.textFile(inputFile).map(loadRecord)
iec2011007
la source
2

Si vous avez une ou plusieurs lignes avec moins ou plus de colonnes que 2 dans l'ensemble de données, cette erreur peut survenir.

Je suis également nouveau sur Pyspark et j'essaye de lire le fichier CSV. Le code suivant a fonctionné pour moi:

Dans ce code, j'utilise l'ensemble de données de kaggle, le lien est: https://www.kaggle.com/carrie1/ecommerce-data

1. Sans mentionner le schéma:

from pyspark.sql import SparkSession  
scSpark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example: Reading CSV file without mentioning schema") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

sdfData = scSpark.read.csv("data.csv", header=True, sep=",")
sdfData.show()

Vérifiez maintenant les colonnes: sdfData.columns

La sortie sera:

['InvoiceNo', 'StockCode','Description','Quantity', 'InvoiceDate', 'CustomerID', 'Country']

Vérifiez le type de données pour chaque colonne:

sdfData.schema
StructType(List(StructField(InvoiceNo,StringType,true),StructField(StockCode,StringType,true),StructField(Description,StringType,true),StructField(Quantity,StringType,true),StructField(InvoiceDate,StringType,true),StructField(UnitPrice,StringType,true),StructField(CustomerID,StringType,true),StructField(Country,StringType,true)))

Cela donnera le cadre de données avec toutes les colonnes avec le type de données StringType

2. Avec schéma: si vous connaissez le schéma ou souhaitez modifier le type de données de n'importe quelle colonne du tableau ci-dessus, utilisez ceci (disons que j'ai les colonnes suivantes et que je les souhaite dans un type de données particulier pour chacune d'elles)

from pyspark.sql import SparkSession  
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType
    schema = StructType([\
        StructField("InvoiceNo", IntegerType()),\
        StructField("StockCode", StringType()), \
        StructField("Description", StringType()),\
        StructField("Quantity", IntegerType()),\
        StructField("InvoiceDate", StringType()),\
        StructField("CustomerID", DoubleType()),\
        StructField("Country", StringType())\
    ])

scSpark = SparkSession \
    .builder \
    .appName("Python Spark SQL example: Reading CSV file with schema") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

sdfData = scSpark.read.csv("data.csv", header=True, sep=",", schema=schema)

Vérifiez maintenant le schéma pour le type de données de chaque colonne:

sdfData.schema

StructType(List(StructField(InvoiceNo,IntegerType,true),StructField(StockCode,StringType,true),StructField(Description,StringType,true),StructField(Quantity,IntegerType,true),StructField(InvoiceDate,StringType,true),StructField(CustomerID,DoubleType,true),StructField(Country,StringType,true)))

Modifié: Nous pouvons également utiliser la ligne de code suivante sans mentionner explicitement le schéma:

sdfData = scSpark.read.csv("data.csv", header=True, inferSchema = True)
sdfData.schema

La sortie est:

StructType(List(StructField(InvoiceNo,StringType,true),StructField(StockCode,StringType,true),StructField(Description,StringType,true),StructField(Quantity,IntegerType,true),StructField(InvoiceDate,StringType,true),StructField(UnitPrice,DoubleType,true),StructField(CustomerID,IntegerType,true),StructField(Country,StringType,true)))

La sortie ressemblera à ceci:

sdfData.show()

+---------+---------+--------------------+--------+--------------+----------+-------+
|InvoiceNo|StockCode|         Description|Quantity|   InvoiceDate|CustomerID|Country|
+---------+---------+--------------------+--------+--------------+----------+-------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/2010 8:26|      2.55|  17850|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/2010 8:26|      3.39|  17850|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/2010 8:26|      2.75|  17850|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/2010 8:26|      3.39|  17850|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/2010 8:26|      3.39|  17850|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|12/1/2010 8:26|      7.65|  17850|
|   536365|    21730|GLASS STAR FROSTE...|       6|12/1/2010 8:26|      4.25|  17850|
|   536366|    22633|HAND WARMER UNION...|       6|12/1/2010 8:28|      1.85|  17850|
|   536366|    22632|HAND WARMER RED P...|       6|12/1/2010 8:28|      1.85|  17850|
|   536367|    84879|ASSORTED COLOUR B...|      32|12/1/2010 8:34|      1.69|  13047|
|   536367|    22745|POPPY'S PLAYHOUSE...|       6|12/1/2010 8:34|       2.1|  13047|
|   536367|    22748|POPPY'S PLAYHOUSE...|       6|12/1/2010 8:34|       2.1|  13047|
|   536367|    22749|FELTCRAFT PRINCES...|       8|12/1/2010 8:34|      3.75|  13047|
|   536367|    22310|IVORY KNITTED MUG...|       6|12/1/2010 8:34|      1.65|  13047|
|   536367|    84969|BOX OF 6 ASSORTED...|       6|12/1/2010 8:34|      4.25|  13047|
|   536367|    22623|BOX OF VINTAGE JI...|       3|12/1/2010 8:34|      4.95|  13047|
|   536367|    22622|BOX OF VINTAGE AL...|       2|12/1/2010 8:34|      9.95|  13047|
|   536367|    21754|HOME BUILDING BLO...|       3|12/1/2010 8:34|      5.95|  13047|
|   536367|    21755|LOVE BUILDING BLO...|       3|12/1/2010 8:34|      5.95|  13047|
|   536367|    21777|RECIPE BOX WITH M...|       4|12/1/2010 8:34|      7.95|  13047|
+---------+---------+--------------------+--------+--------------+----------+-------+
only showing top 20 rows
Yogesh
la source
1

Lors de l'utilisation spark.read.csv, je trouve que l'utilisation des options escape='"'et multiLine=Truefournit la solution la plus cohérente à la norme CSV , et selon mon expérience, fonctionne le mieux avec les fichiers CSV exportés à partir de Google Sheets.

C'est,

#set inferSchema=False to read everything as string
df = spark.read.csv("myData.csv", escape='"', multiLine=True,
     inferSchema=False, header=True)
flow2k
la source
d'où vient l'étincelle? c'est ça import pyspark as spark?
Luk Aron
@LukAron Dans un shell pyspark, sparkest déjà initialisé. Dans un script soumis par spark-submit, vous pouvez l'instancier en tant que from pyspark.sql import SparkSession; spark = SparkSession.builder.getOrCreate().
flow2k