Application de la fonction Python à DataFrame groupé Pandas - quelle est l'approche la plus efficace pour accélérer les calculs?

9

J'ai affaire à un Pandas DataFrame assez volumineux - mon ensemble de données ressemble à une dfconfiguration suivante :

import pandas as pd
import numpy  as np

#--------------------------------------------- SIZING PARAMETERS :
R1 =                    20        # .repeat( repeats = R1 )
R2 =                    10        # .repeat( repeats = R2 )
R3 =                541680        # .repeat( repeats = [ R3, R4 ] )
R4 =                576720        # .repeat( repeats = [ R3, R4 ] )
T  =                 55920        # .tile( , T)
A1 = np.arange( 0, 2708400, 100 ) # ~ 20x re-used
A2 = np.arange( 0, 2883600, 100 ) # ~ 20x re-used

#--------------------------------------------- DataFrame GENERATION :
df = pd.DataFrame.from_dict(
         { 'measurement_id':        np.repeat( [0, 1], repeats = [ R3, R4 ] ), 
           'time':np.concatenate( [ np.repeat( A1,     repeats = R1 ),
                                    np.repeat( A2,     repeats = R1 ) ] ), 
           'group':        np.tile( np.repeat( [0, 1], repeats = R2 ), T ),
           'object':       np.tile( np.arange( 0, R1 ),                T )
           }
        )

#--------------------------------------------- DataFrame RE-PROCESSING :
df = pd.concat( [ df,
                  df                                                  \
                    .groupby( ['measurement_id', 'time', 'group'] )    \
                    .apply( lambda x: np.random.uniform( 0, 100, 10 ) ) \
                    .explode()                                           \
                    .astype( 'float' )                                    \
                    .to_frame( 'var' )                                     \
                    .reset_index( drop = True )
                  ], axis = 1
                )

Remarque: Dans le but d'avoir un exemple minimal, il peut être facilement sous-réglé (par exemple avec df.loc[df['time'] <= 400, :]), mais comme je simule les données de toute façon, je pensais que la taille d'origine donnerait une meilleure vue d'ensemble.

Pour chaque groupe défini par ['measurement_id', 'time', 'group']je dois appeler la fonction suivante:

from sklearn.cluster import SpectralClustering
from pandarallel     import pandarallel

def cluster( x, index ):
    if len( x ) >= 2:
        data = np.asarray( x )[:, np.newaxis]
        clustering = SpectralClustering( n_clusters   =  5,
                                         random_state = 42
                                         ).fit( data )
        return pd.Series( clustering.labels_ + 1, index = index )
    else:
        return pd.Series( np.nan, index = index )

Pour améliorer les performances, j'ai essayé deux approches:

Forfait Pandarallel

La première approche a été de paralléliser les calculs à l'aide de pandarallelpackage:

pandarallel.initialize( progress_bar = True )
df \
  .groupby( ['measurement_id', 'time', 'group'] ) \
  .parallel_apply( lambda x: cluster( x['var'], x['object'] ) )

Cependant, cela semble sous-optimal car il consomme beaucoup de RAM et tous les cœurs ne sont pas utilisés dans les calculs (même en spécifiant explicitement le nombre de cœurs dans la pandarallel.initialize()méthode). De plus, parfois les calculs se terminent par diverses erreurs, même si je n'ai pas eu l'occasion de trouver une raison à cela (peut-être un manque de RAM?).

PySpark Pandas UDF

J'ai également essayé un UDF Spark Pandas, bien que je sois totalement nouveau pour Spark. Voici ma tentative:

import findspark;  findspark.init()

from pyspark.sql           import SparkSession
from pyspark.conf          import SparkConf
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types     import *

spark = SparkSession.builder.master( "local" ).appName( "test" ).config( conf = SparkConf() ).getOrCreate()
df = spark.createDataFrame( df )

@pandas_udf( StructType( [StructField( 'id', IntegerType(), True )] ), functionType = PandasUDFType.GROUPED_MAP )
def cluster( df ):
    if len( df['var'] ) >= 2:
        data = np.asarray( df['var'] )[:, np.newaxis]
        clustering = SpectralClustering( n_clusters   =  5,
                                         random_state = 42
                                         ).fit( data )
        return pd.DataFrame( clustering.labels_ + 1,
                             index = df['object']
                             )
    else:
        return pd.DataFrame( np.nan,
                             index = df['object']
                             )

res = df                                           \
        .groupBy( ['id_half', 'frame', 'team_id'] ) \
        .apply( cluster )                            \
        .toPandas()

Malheureusement, les performances n'étaient pas satisfaisantes également, et d'après ce que j'ai lu sur le sujet, cela peut être juste le fardeau d'utiliser la fonction UDF, écrite en Python et le besoin associé de convertir tous les objets Python en objets Spark et inversement.

Donc, voici mes questions:

  1. L'une ou l'autre de mes approches pourrait-elle être ajustée pour éliminer les éventuels goulots d'étranglement et améliorer les performances? (par exemple, configuration de PySpark, ajustement des opérations sous-optimales, etc.)
  2. Existe-t-il de meilleures alternatives? Comment se comparent-ils aux solutions proposées en termes de performances?
Kuba_
la source
2
avez-vous fait des recherches dask ?
Danila Ganchar
1
Pas encore, mais merci pour votre suggestion - je vais
essayer
malheureusement je n'ai pas travaillé avec dask(((donc mon commentaire c'est juste un conseil pour la recherche.
Danila Ganchar
Par performance, je voulais dire le temps pendant lequel les calculs peuvent être terminés.
Kuba_

Réponses:

1

Q : " Est-ce que l' une de mes approches pourrait être ajustée pour éliminer les goulots d'étranglement possibles et améliorer les performances? (Par exemple, configuration PySpark, ajustement des opérations sous-optimales, etc.) "

+1pour avoir mentionné les frais généraux de configuration supplémentaires pour l'une ou l'autre stratégie informatique. Cela fait toujours un seuil de rentabilité, seulement après quoi une non- [SERIAL]stratégie peut obtenir une joie bénéfique de l' [TIME]accélération souhaitée du domaine (pourtant, si autre, généralement [SPACE]- les coûts du domaine le permettent ou restent réalisables - oui, la RAM. .. existence et accès à un appareil de cette taille, budget et autres contraintes similaires dans le monde réel)

Tout d'abord,
la vérification avant le vol, avant le décollage

La nouvelle formulation stricte des frais généraux de la loi d'Amdahl est actuellement en mesure d'incorporer ces deux pSO + pTOfrais généraux supplémentaires et les reflète dans la prévision des niveaux d'accélération réalisables, y compris le seuil de rentabilité point, depuis lequel il peut devenir significatif (dans un sens coûts / effets, efficacité) d’aller parallèlement.

entrez la description de l'image ici

Pourtant,
ce n'est pas notre problème principal ici .
Cela vient ensuite:

Ensuite,
étant donné les coûts de calcul de SpectralClustering(), qui va ici utiliser le noyau de la fonction de Boltzmann radial, ~ exp( -gamma * distance( data, data )**2 )il ne semble pas y avoir d'avancée de la division de data-object sur un nombre quelconque d'unités de travail disjointes, car le distance( data, data )composant, par définition, n'a que pour visitez tous les dataéléments (réf. les coûts de communication des { process | node }topologies distribuées de valeur à toute valeur sont, pour des raisons évidentes, terriblement mauvais, sinon les pires cas d' { process | node }utilisation pour le traitement distribué, sinon les anti-modèles simples (à l'exception de certains tissus en effet obscurs, sans mémoire / sans état, mais néanmoins informatiques).

Pour les analystes pédants, oui - ajoutez à cela (et nous pouvons déjà dire un mauvais état) les coûts - encore une fois - de tout-à-tout -k-moyens- traitement, ici à propos de O( N^( 1 + 5 * 5 ) )cela va, car N ~ len( data ) ~ 1.12E6+, terriblement contre notre souhait d'en avoir traitement intelligent et rapide.

Et alors?

Bien que les coûts de configuration ne soient pas négligés, les coûts de communication accrus désactiveront presque certainement toute amélioration de l'utilisation des tentatives esquissées ci-dessus pour passer d'un flux de [SERIAL]processus pur à une forme d' orchestration juste - [CONCURRENT]ou vraie - [PARALLEL]de certaines sous-unités de travail , en raison de l'augmentation des frais généraux liés à la nécessité d'implémenter (une paire en tandem) des topologies de passage de valeur à n'importe quelle valeur.

Si ce n'était pas pour eux?

Eh bien, cela ressemble à un oxymore de science informatique - même si cela était possible, les coûts des distances précalculées de n'importe quel à n'importe quel (qui prendraient ces immenses [TIME]coûts de complexité du domaine "à l'avance" (Où? Comment? Y a-t-il des une autre latence inévitable, permettant un masquage de latence possible par une accumulation incrémentielle (inconnue jusqu'à présent) d'une matrice de distance complète à l'avenir?)) ne ferait que repositionner ces coûts principalement présents à un autre emplacement dans [TIME]- et [SPACE]-Domains, pas les réduire.

Q : "Sont-ils de meilleures alternatives? "

La seule, à ce que je sache jusqu'à présent, est d'essayer, si le problème est possible de se reformuler dans un autre, une mode problème formulée par QUBO (réf .: Q uantum- U nconstrained- B inary- O ptimisation , la bonne nouvelle est que des outils pour le faire, une base de connaissances de première main et une expérience pratique de résolution de problèmes existent et se développent)

Q : Comment se comparent-ils aux solutions fournies en termes de performances?

Les performances sont à couper le souffle - le problème formulé par QUBO a un O(1)solveur prometteur (!) En temps constant (dans [TIME]-Domain) et quelque peu restreint dans [SPACE]-Domain (où les astuces LLNL récemment annoncées peuvent aider à éviter ce monde physique, la mise en œuvre actuelle de QPU, la contrainte du problème tailles).

user3666197
la source
Ceci est une réponse intéressante, mais semble manquer le point - OP forme plusieurs petits modèles, pas un seul. Votre observation principale n'est donc pas pertinente.
user10938362
@ user10938362 Comment votre propriété revendiquée (formation de petits modèles) se traduit-elle par une mesure des coûts de traitement big-O autre que celle indiquée ci-dessus? Bien sûr, de nombreux modèles plus petits promettent une somme théoriquement juste en croissance linéaire des coûts (encore) de Big O du traitement individuel (désormais plus petit en N, mais pas dans d'autres facteurs) , mais vous devez ajouter à cela une somme beaucoup plus chère de tous coûts supplémentaires des frais généraux de configuration et de terminaison plus tous les frais généraux de communication supplémentaires (paramètres / données / résultats + généralement également des paires de coûts de traitement SER / DES à chaque étape)
user3666197
0

Ce n'est pas une réponse, mais ...

Si vous courez

df.groupby(['measurement_id', 'time', 'group']).apply(
    lambda x: cluster(x['var'], x['object']))

(c'est-à-dire avec les Pandas seuls), vous remarquerez que vous utilisez déjà plusieurs cœurs. En effet, sklearnutilise joblibpar défaut pour paralléliser le travail. Vous pouvez échanger le planificateur en faveur de Dask et peut-être obtenir plus d'efficacité sur le partage des données entre les threads, mais tant que le travail que vous faites est lié au processeur comme celui-ci, vous ne pourrez rien faire pour l'accélérer.

En bref, c'est un problème d'algorithme: déterminez ce dont vous avez vraiment besoin pour calculer, avant d'essayer de considérer différents cadres pour le calculer.

mdurant
la source
Pourriez-vous expliquer pourquoi mentionnez-vous «… partage des données entre les threads …» une fois que la répartition du travail a été organisée par des processusjoblib générés , qui n'ont rien à voir avec les threads, moins avec le partage? Merci pour votre aimable clarification des arguments.
user3666197
Exactement, jboblib utilise normalement des processus, mais il peut également utiliser dask comme backend, où vous pouvez choisir votre mélange de threads et de processus.
mdurant
Je suis un peu novice en informatique parallèle, mais même si sklearn utilise la parallélisation, n'est-ce pas inutile dans ces paramètres? Je veux dire, les opérations effectuées par sklearn sont extrêmement simples car chaque opération de clustering est appliquée à seulement 10 points. Encore une fois, je peux me tromper ici, mais je pense que la façon dont nous parallélisons le traitement de morceaux de données originales est le vrai problème.
Kuba_
"n'est-ce pas inutile dans ces paramètres" - eh bien, vous utilisez 8 cœurs de processeur au lieu de 1.
mdurant
0

Je ne suis pas un expert Dask, mais je fournis le code suivant comme référence:

import dask.dataframe as ddf

df = ddf.from_pandas(df, npartitions=4) # My PC has 4 cores

task = df.groupby(["measurement_id", "time", "group"]).apply(
    lambda x: cluster(x["var"], x["object"]),
    meta=pd.Series(np.nan, index=pd.Series([0, 1, 1, 1])),
)

res = task.compute()
bouclé
la source