Je suis nouveau sur Spark et j'essaie de lire les données CSV à partir d'un fichier avec Spark. Voici ce que je fais:
sc.textFile('file.csv')
.map(lambda line: (line.split(',')[0], line.split(',')[1]))
.collect()
Je m'attendrais à ce que cet appel me donne une liste des deux premières colonnes de mon fichier mais j'obtiens cette erreur:
File "<ipython-input-60-73ea98550983>", line 1, in <lambda>
IndexError: list index out of range
bien que mon fichier CSV comporte plus d'une colonne.
python
csv
apache-spark
pyspark
Kernael
la source
la source
csv
bibliothèque intégrée pour gérer tous les échappements car le simple fractionnement par virgule ne fonctionnera pas si, par exemple, il y a des virgules dans les valeurs.","
.Spark 2.0.0+
Vous pouvez utiliser directement la source de données csv intégrée:
ou
sans inclure de dépendances externes.
Spark <2.0.0 :
Au lieu de l'analyse manuelle, ce qui est loin d'être trivial dans un cas général, je recommanderais
spark-csv
:Assurez - vous que CSV Spark est inclus dans le chemin (
--packages
,--jars
,--driver-class-path
)Et chargez vos données comme suit:
Il peut gérer le chargement, l'inférence de schéma, la suppression de lignes mal formées et ne nécessite pas de transfert de données de Python à la JVM.
Remarque :
Si vous connaissez le schéma, il est préférable d'éviter l'inférence de schéma et de le transmettre à
DataFrameReader
. En supposant que vous ayez trois colonnes - entier, double et chaîne:la source
pyspark --packages com.databricks:spark-csv_2.11:1.4.0
(assurez-vous de remplacer les versions de databricks / spark par celles que vous avez installées).la source
Et encore une autre option qui consiste à lire le fichier CSV à l'aide de Pandas puis à importer le Pandas DataFrame dans Spark.
Par exemple:
la source
Le simple fractionnement par virgule divisera également les virgules qui se trouvent dans les champs (par exemple
a,b,"1,2,3",c
), ce n'est donc pas recommandé. La réponse de zero323 est bonne si vous souhaitez utiliser l'API DataFrames, mais si vous souhaitez vous en tenir à la base Spark, vous pouvez analyser les csvs en Python de base avec le module csv :EDIT: Comme @muon l'a mentionné dans les commentaires, cela traitera l'en-tête comme n'importe quelle autre ligne, vous devrez donc l'extraire manuellement. Par exemple,
header = rdd.first(); rdd = rdd.filter(lambda x: x != header)
(assurez-vous de ne pas modifierheader
avant l'évaluation du filtre). Mais à ce stade, il vaut probablement mieux utiliser un analyseur csv intégré.la source
StringIO
.csv
peut utiliser n'importe quel itérable b)__next__
ne doit pas être utilisé directement et échouera sur une ligne vide. Jetez un oeil à flatMap c) Il serait beaucoup plus efficace d'utilisermapPartitions
au lieu d'initialiser le lecteur sur chaque ligne :)rdd.mapPartitions(lambda x: csv.reader(x))
tout enrdd.map(lambda x: csv.reader(x))
lançant une erreur? Je m'attendais à ce que les deux lancent la même choseTypeError: can't pickle _csv.reader objects
. Il semble aussi que j'appellemapPartitions
automatiquement un équivalent de "readlines" sur l'csv.reader
objet, où avecmap
, j'avais besoin d'appeler__next__
explicitement pour sortir les listes ducsv.reader
. 2) D'oùflatMap
vient-il? L'appelmapPartitions
seul a fonctionné pour moi.rdd.mapPartitions(lambda x: csv.reader(x))
fonctionne carmapPartitions
attend unIterable
objet. Si vous voulez être explicite, vous pouvez comprendre ou générer une expression.map
seul ne fonctionne pas car il n'itère pas sur l'objet. D'où ma suggestion d'utiliserflatMap(lambda x: csv.reader([x]))
qui itérera sur le lecteur. MaismapPartitions
c'est beaucoup mieux ici.C'est dans PYSPARK
Ensuite, vous pouvez vérifier
la source
Si vous souhaitez charger csv en tant que dataframe, vous pouvez effectuer les opérations suivantes:
Cela a bien fonctionné pour moi.
la source
Ceci est conforme à ce que JP Mercier suggérait initialement à propos de l'utilisation de Pandas, mais avec une modification majeure: si vous lisez des données dans Pandas par morceaux, elles devraient être plus malléables. Cela signifie que vous pouvez analyser un fichier beaucoup plus volumineux que ce que Pandas peut réellement gérer en une seule pièce et le transmettre à Spark dans des tailles plus petites. (Cela répond également au commentaire sur les raisons pour lesquelles on voudrait utiliser Spark s'il peut tout charger dans Pandas de toute façon.)
la source
Maintenant, il existe également une autre option pour tout fichier csv général: https://github.com/seahboonsiew/pyspark-csv comme suit:
Supposons que nous ayons le contexte suivant
Tout d'abord, distribuez pyspark-csv.py aux exécuteurs à l'aide de SparkContext
Lisez les données csv via SparkContext et convertissez-les en DataFrame
la source
Si vos données csv ne contiennent pas de nouvelles lignes dans aucun des champs, vous pouvez charger vos données avec
textFile()
et les analyserla source
Si vous avez une ou plusieurs lignes avec moins ou plus de colonnes que 2 dans l'ensemble de données, cette erreur peut survenir.
Je suis également nouveau sur Pyspark et j'essaye de lire le fichier CSV. Le code suivant a fonctionné pour moi:
Dans ce code, j'utilise l'ensemble de données de kaggle, le lien est: https://www.kaggle.com/carrie1/ecommerce-data
1. Sans mentionner le schéma:
Vérifiez maintenant les colonnes: sdfData.columns
La sortie sera:
Vérifiez le type de données pour chaque colonne:
Cela donnera le cadre de données avec toutes les colonnes avec le type de données StringType
2. Avec schéma: si vous connaissez le schéma ou souhaitez modifier le type de données de n'importe quelle colonne du tableau ci-dessus, utilisez ceci (disons que j'ai les colonnes suivantes et que je les souhaite dans un type de données particulier pour chacune d'elles)
Vérifiez maintenant le schéma pour le type de données de chaque colonne:
Modifié: Nous pouvons également utiliser la ligne de code suivante sans mentionner explicitement le schéma:
La sortie est:
La sortie ressemblera à ceci:
la source
Lors de l'utilisation
spark.read.csv
, je trouve que l'utilisation des optionsescape='"'
etmultiLine=True
fournit la solution la plus cohérente à la norme CSV , et selon mon expérience, fonctionne le mieux avec les fichiers CSV exportés à partir de Google Sheets.C'est,
la source
import pyspark as spark
?spark
est déjà initialisé. Dans un script soumis parspark-submit
, vous pouvez l'instancier en tant quefrom pyspark.sql import SparkSession; spark = SparkSession.builder.getOrCreate()
.