J'ai affaire à un Pandas DataFrame assez volumineux - mon ensemble de données ressemble à une df
configuration suivante :
import pandas as pd
import numpy as np
#--------------------------------------------- SIZING PARAMETERS :
R1 = 20 # .repeat( repeats = R1 )
R2 = 10 # .repeat( repeats = R2 )
R3 = 541680 # .repeat( repeats = [ R3, R4 ] )
R4 = 576720 # .repeat( repeats = [ R3, R4 ] )
T = 55920 # .tile( , T)
A1 = np.arange( 0, 2708400, 100 ) # ~ 20x re-used
A2 = np.arange( 0, 2883600, 100 ) # ~ 20x re-used
#--------------------------------------------- DataFrame GENERATION :
df = pd.DataFrame.from_dict(
{ 'measurement_id': np.repeat( [0, 1], repeats = [ R3, R4 ] ),
'time':np.concatenate( [ np.repeat( A1, repeats = R1 ),
np.repeat( A2, repeats = R1 ) ] ),
'group': np.tile( np.repeat( [0, 1], repeats = R2 ), T ),
'object': np.tile( np.arange( 0, R1 ), T )
}
)
#--------------------------------------------- DataFrame RE-PROCESSING :
df = pd.concat( [ df,
df \
.groupby( ['measurement_id', 'time', 'group'] ) \
.apply( lambda x: np.random.uniform( 0, 100, 10 ) ) \
.explode() \
.astype( 'float' ) \
.to_frame( 'var' ) \
.reset_index( drop = True )
], axis = 1
)
Remarque: Dans le but d'avoir un exemple minimal, il peut être facilement sous-réglé (par exemple avec df.loc[df['time'] <= 400, :]
), mais comme je simule les données de toute façon, je pensais que la taille d'origine donnerait une meilleure vue d'ensemble.
Pour chaque groupe défini par ['measurement_id', 'time', 'group']
je dois appeler la fonction suivante:
from sklearn.cluster import SpectralClustering
from pandarallel import pandarallel
def cluster( x, index ):
if len( x ) >= 2:
data = np.asarray( x )[:, np.newaxis]
clustering = SpectralClustering( n_clusters = 5,
random_state = 42
).fit( data )
return pd.Series( clustering.labels_ + 1, index = index )
else:
return pd.Series( np.nan, index = index )
Pour améliorer les performances, j'ai essayé deux approches:
Forfait Pandarallel
La première approche a été de paralléliser les calculs à l'aide de pandarallel
package:
pandarallel.initialize( progress_bar = True )
df \
.groupby( ['measurement_id', 'time', 'group'] ) \
.parallel_apply( lambda x: cluster( x['var'], x['object'] ) )
Cependant, cela semble sous-optimal car il consomme beaucoup de RAM et tous les cœurs ne sont pas utilisés dans les calculs (même en spécifiant explicitement le nombre de cœurs dans la pandarallel.initialize()
méthode). De plus, parfois les calculs se terminent par diverses erreurs, même si je n'ai pas eu l'occasion de trouver une raison à cela (peut-être un manque de RAM?).
PySpark Pandas UDF
J'ai également essayé un UDF Spark Pandas, bien que je sois totalement nouveau pour Spark. Voici ma tentative:
import findspark; findspark.init()
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import *
spark = SparkSession.builder.master( "local" ).appName( "test" ).config( conf = SparkConf() ).getOrCreate()
df = spark.createDataFrame( df )
@pandas_udf( StructType( [StructField( 'id', IntegerType(), True )] ), functionType = PandasUDFType.GROUPED_MAP )
def cluster( df ):
if len( df['var'] ) >= 2:
data = np.asarray( df['var'] )[:, np.newaxis]
clustering = SpectralClustering( n_clusters = 5,
random_state = 42
).fit( data )
return pd.DataFrame( clustering.labels_ + 1,
index = df['object']
)
else:
return pd.DataFrame( np.nan,
index = df['object']
)
res = df \
.groupBy( ['id_half', 'frame', 'team_id'] ) \
.apply( cluster ) \
.toPandas()
Malheureusement, les performances n'étaient pas satisfaisantes également, et d'après ce que j'ai lu sur le sujet, cela peut être juste le fardeau d'utiliser la fonction UDF, écrite en Python et le besoin associé de convertir tous les objets Python en objets Spark et inversement.
Donc, voici mes questions:
- L'une ou l'autre de mes approches pourrait-elle être ajustée pour éliminer les éventuels goulots d'étranglement et améliorer les performances? (par exemple, configuration de PySpark, ajustement des opérations sous-optimales, etc.)
- Existe-t-il de meilleures alternatives? Comment se comparent-ils aux solutions proposées en termes de performances?
dask
(((donc mon commentaire c'est juste un conseil pour la recherche.Réponses:
+1
pour avoir mentionné les frais généraux de configuration supplémentaires pour l'une ou l'autre stratégie informatique. Cela fait toujours un seuil de rentabilité, seulement après quoi une non-[SERIAL]
stratégie peut obtenir une joie bénéfique de l'[TIME]
accélération souhaitée du domaine (pourtant, si autre, généralement[SPACE]
- les coûts du domaine le permettent ou restent réalisables - oui, la RAM. .. existence et accès à un appareil de cette taille, budget et autres contraintes similaires dans le monde réel)Tout d'abord,
la vérification avant le vol, avant le décollage
La nouvelle formulation stricte des frais généraux de la loi d'Amdahl est actuellement en mesure d'incorporer ces deux
pSO + pTO
frais généraux supplémentaires et les reflète dans la prévision des niveaux d'accélération réalisables, y compris le seuil de rentabilité point, depuis lequel il peut devenir significatif (dans un sens coûts / effets, efficacité) d’aller parallèlement.Pourtant,
ce n'est pas notre problème principal ici .
Cela vient ensuite:
Ensuite,
étant donné les coûts de calcul de
SpectralClustering()
, qui va ici utiliser le noyau de la fonction de Boltzmann radial,~ exp( -gamma * distance( data, data )**2 )
il ne semble pas y avoir d'avancée de la division dedata
-object sur un nombre quelconque d'unités de travail disjointes, car ledistance( data, data )
composant, par définition, n'a que pour visitez tous lesdata
éléments (réf. les coûts de communication des{ process | node }
topologies distribuées de valeur à toute valeur sont, pour des raisons évidentes, terriblement mauvais, sinon les pires cas d'{ process | node }
utilisation pour le traitement distribué, sinon les anti-modèles simples (à l'exception de certains tissus en effet obscurs, sans mémoire / sans état, mais néanmoins informatiques).Pour les analystes pédants, oui - ajoutez à cela (et nous pouvons déjà dire un mauvais état) les coûts - encore une fois - de tout-à-tout -k-moyens- traitement, ici à propos de
O( N^( 1 + 5 * 5 ) )
cela va, carN ~ len( data ) ~ 1.12E6+
, terriblement contre notre souhait d'en avoir traitement intelligent et rapide.Et alors?
Bien que les coûts de configuration ne soient pas négligés, les coûts de communication accrus désactiveront presque certainement toute amélioration de l'utilisation des tentatives esquissées ci-dessus pour passer d'un flux de
[SERIAL]
processus pur à une forme d' orchestration juste -[CONCURRENT]
ou vraie -[PARALLEL]
de certaines sous-unités de travail , en raison de l'augmentation des frais généraux liés à la nécessité d'implémenter (une paire en tandem) des topologies de passage de valeur à n'importe quelle valeur.Si ce n'était pas pour eux?
Eh bien, cela ressemble à un oxymore de science informatique - même si cela était possible, les coûts des distances précalculées de n'importe quel à n'importe quel (qui prendraient ces immenses
[TIME]
coûts de complexité du domaine "à l'avance" (Où? Comment? Y a-t-il des une autre latence inévitable, permettant un masquage de latence possible par une accumulation incrémentielle (inconnue jusqu'à présent) d'une matrice de distance complète à l'avenir?)) ne ferait que repositionner ces coûts principalement présents à un autre emplacement dans[TIME]
- et[SPACE]
-Domains, pas les réduire.La seule, à ce que je sache jusqu'à présent, est d'essayer, si le problème est possible de se reformuler dans un autre, une mode problème formulée par QUBO (réf .: Q uantum- U nconstrained- B inary- O ptimisation , la bonne nouvelle est que des outils pour le faire, une base de connaissances de première main et une expérience pratique de résolution de problèmes existent et se développent)
Les performances sont à couper le souffle - le problème formulé par QUBO a un
O(1)
solveur prometteur (!) En temps constant (dans[TIME]
-Domain) et quelque peu restreint dans[SPACE]
-Domain (où les astuces LLNL récemment annoncées peuvent aider à éviter ce monde physique, la mise en œuvre actuelle de QPU, la contrainte du problème tailles).la source
Ce n'est pas une réponse, mais ...
Si vous courez
(c'est-à-dire avec les Pandas seuls), vous remarquerez que vous utilisez déjà plusieurs cœurs. En effet,
sklearn
utilisejoblib
par défaut pour paralléliser le travail. Vous pouvez échanger le planificateur en faveur de Dask et peut-être obtenir plus d'efficacité sur le partage des données entre les threads, mais tant que le travail que vous faites est lié au processeur comme celui-ci, vous ne pourrez rien faire pour l'accélérer.En bref, c'est un problème d'algorithme: déterminez ce dont vous avez vraiment besoin pour calculer, avant d'essayer de considérer différents cadres pour le calculer.
la source
joblib
générés , qui n'ont rien à voir avec les threads, moins avec le partage? Merci pour votre aimable clarification des arguments.Je ne suis pas un expert
Dask
, mais je fournis le code suivant comme référence:la source