Python: Comment puis-je exécuter des fonctions python en parallèle?

109

J'ai fait des recherches en premier et je n'ai pas trouvé de réponse à ma question. J'essaie d'exécuter plusieurs fonctions en parallèle en Python.

J'ai quelque chose comme ça:

files.py

import common #common is a util class that handles all the IO stuff

dir1 = 'C:\folder1'
dir2 = 'C:\folder2'
filename = 'test.txt'
addFiles = [25, 5, 15, 35, 45, 25, 5, 15, 35, 45]

def func1():
   c = common.Common()
   for i in range(len(addFiles)):
       c.createFiles(addFiles[i], filename, dir1)
       c.getFiles(dir1)
       time.sleep(10)
       c.removeFiles(addFiles[i], dir1)
       c.getFiles(dir1)

def func2():
   c = common.Common()
   for i in range(len(addFiles)):
       c.createFiles(addFiles[i], filename, dir2)
       c.getFiles(dir2)
       time.sleep(10)
       c.removeFiles(addFiles[i], dir2)
       c.getFiles(dir2)

Je veux appeler func1 et func2 et les faire fonctionner en même temps. Les fonctions n'interagissent pas entre elles ou sur le même objet. Pour le moment, je dois attendre la fin de func1 avant de démarrer func2. Comment faire quelque chose comme ci-dessous:

process.py

from files import func1, func2

runBothFunc(func1(), func2())

Je veux être en mesure de créer les deux répertoires assez près en même temps parce que chaque minute, je compte le nombre de fichiers créés. Si le répertoire n'est pas là, cela gâchera mon timing.

lmcadory
la source
1
Vous voudrez peut-être reconcevoir ceci; si vous comptez le nombre de fichiers / dossiers chaque minute, vous créez une condition de concurrence. Qu'en est-il du fait que chaque fonction mette à jour un compteur ou utilise un fichier de verrouillage pour s'assurer que le processus périodique ne met pas à jour le compteur tant que les deux fonctions n'ont pas fini de s'exécuter?

Réponses:

164

Vous pouvez utiliser threadingoumultiprocessing .

En raison des particularités de CPython , il threadingest peu probable d'atteindre un véritable parallélisme. Pour cette raison,multiprocessing est généralement un meilleur pari.

Voici un exemple complet:

from multiprocessing import Process

def func1():
  print 'func1: starting'
  for i in xrange(10000000): pass
  print 'func1: finishing'

def func2():
  print 'func2: starting'
  for i in xrange(10000000): pass
  print 'func2: finishing'

if __name__ == '__main__':
  p1 = Process(target=func1)
  p1.start()
  p2 = Process(target=func2)
  p2.start()
  p1.join()
  p2.join()

Les mécanismes de démarrage / d'assemblage de processus enfants peuvent facilement être encapsulés dans une fonction le long de votre runBothFunc:

def runInParallel(*fns):
  proc = []
  for fn in fns:
    p = Process(target=fn)
    p.start()
    proc.append(p)
  for p in proc:
    p.join()

runInParallel(func1, func2)
NPE
la source
4
J'ai utilisé votre code mais les fonctions n'ont toujours pas démarré en même temps.
lmcadory
4
@Lamar McAdory: Veuillez expliquer ce que vous entendez exactement par «en même temps», en donnant peut-être un exemple concret de ce que vous avez fait, de ce que vous attendiez et de ce qui s'est réellement passé.
NPE
4
@Lamar: Vous ne pouvez jamais avoir aucune garantie de "exactement le même temps" et penser que vous pouvez est tout simplement faux. Selon le nombre de processeurs dont vous disposez, la charge de la machine, le moment où beaucoup de choses se passent sur l'ordinateur auront toutes une influence sur l'heure à laquelle les threads / processus démarrent. De plus, étant donné que les processus sont démarrés juste après la création, la surcharge de création d'un processus doit également être calculée dans le décalage horaire que vous voyez.
Martin
1
est-il possible d'obtenir une liste des résultats de chaque fonction? disons que chaque fonction renvoie une valeur différente, les valeurs peuvent-elles être ajoutées à une liste pouvant être utilisée plus tard? peut-être ajouter le résultat à une liste globale?
pelos
1
Si mes fonctions prennent des paramètres et lorsque je passe des paramètres en les appelant à partir de processus séparés, elles ne s'exécutent pas simultanément. Pouvez-vous s'il vous plaît aider
user2910372
18

Cela peut être fait élégamment avec Ray , un système qui vous permet de paralléliser et de distribuer facilement votre code Python.

Pour paralléliser votre exemple, vous devez définir vos fonctions avec le @ray.remotedécorateur, puis les appeler avec .remote.

import ray

ray.init()

dir1 = 'C:\\folder1'
dir2 = 'C:\\folder2'
filename = 'test.txt'
addFiles = [25, 5, 15, 35, 45, 25, 5, 15, 35, 45]

# Define the functions. 
# You need to pass every global variable used by the function as an argument.
# This is needed because each remote function runs in a different process,
# and thus it does not have access to the global variables defined in 
# the current process.
@ray.remote
def func1(filename, addFiles, dir):
    # func1() code here...

@ray.remote
def func2(filename, addFiles, dir):
    # func2() code here...

# Start two tasks in the background and wait for them to finish.
ray.get([func1.remote(filename, addFiles, dir1), func2.remote(filename, addFiles, dir2)]) 

Si vous passez le même argument aux deux fonctions et que l'argument est volumineux, un moyen plus efficace de le faire consiste à utiliser ray.put(). Cela évite que le gros argument soit sérialisé deux fois et d'en créer deux copies mémoire:

largeData_id = ray.put(largeData)

ray.get([func1(largeData_id), func2(largeData_id)])

Si func1()et func2()retournez des résultats, vous devez réécrire le code comme suit:

ret_id1 = func1.remote(filename, addFiles, dir1)
ret_id2 = func1.remote(filename, addFiles, dir2)
ret1, ret2 = ray.get([ret_id1, ret_id2])

L'utilisation de Ray présente un certain nombre d'avantages par rapport au module multiprocesseur . En particulier, le même code s'exécutera sur une seule machine ainsi que sur un cluster de machines. Pour plus d'avantages de Ray, consultez cet article connexe .

Ion Stoica
la source
18

Si vos fonctions font principalement du travail d' E / S (et moins de travail sur le processeur) et que vous avez Python 3.2+, vous pouvez utiliser un ThreadPoolExecutor :

from concurrent.futures import ThreadPoolExecutor

def run_io_tasks_in_parallel(tasks):
    with ThreadPoolExecutor() as executor:
        running_tasks = [executor.submit(task) for task in tasks]
        for running_task in running_tasks:
            running_task.result()

run_io_tasks_in_parallel([
    lambda: print('IO task 1 running!'),
    lambda: print('IO task 2 running!'),
])

Si vos fonctions font principalement du travail sur le processeur (et moins de travail d'E / S) et que vous avez Python 2.6+, vous pouvez utiliser le module multiprocesseur :

from multiprocessing import Process

def run_cpu_tasks_in_parallel(tasks):
    running_tasks = [Process(target=task) for task in tasks]
    for running_task in running_tasks:
        running_task.start()
    for running_task in running_tasks:
        running_task.join()

run_cpu_tasks_in_parallel([
    lambda: print('CPU task 1 running!'),
    lambda: print('CPU task 2 running!'),
])
David Foster
la source
C'est une bonne réponse. Comment identifier à partir du résultat des tâches liées aux E / S à l'aide de concurrent.futures laquelle a terminé? Fondamentalement, au lieu des fonctions lamba si nous avons des fonctions normales, comment identifier le résultat mappé à la fonction appelée?
Tragaknight
Nevermind j'ai trouvé un moyen - au lieu de ce run_cpu_tasks_in_parallel ([lambda: print ('CPU task 1 running!'), Lambda: print ('CPU task 2 running!'),]) Utiliser ceci - results = run_io_tasks_in_parallel ([lambda: {'is_something1': func1 ()}, lambda: {'is_something2': func2 ()},])
Tragaknight
5

Si vous êtes un utilisateur de Windows et que vous utilisez python 3, cet article vous aidera à faire de la programmation parallèle en python.Lorsque vous exécutez la programmation en pool d'une bibliothèque multiprocesseur habituelle, vous obtiendrez une erreur concernant la fonction principale de votre programme. C'est parce que Windows n'a pas de fonctionnalité fork (). Le post ci-dessous donne une solution au problème mentionné.

http://python.6.x6.nabble.com/Multiprocessing-Pool-woes-td5047050.html

Depuis que j'utilisais le python 3, j'ai changé le programme un peu comme ceci:

from types import FunctionType
import marshal

def _applicable(*args, **kwargs):
  name = kwargs['__pw_name']
  code = marshal.loads(kwargs['__pw_code'])
  gbls = globals() #gbls = marshal.loads(kwargs['__pw_gbls'])
  defs = marshal.loads(kwargs['__pw_defs'])
  clsr = marshal.loads(kwargs['__pw_clsr'])
  fdct = marshal.loads(kwargs['__pw_fdct'])
  func = FunctionType(code, gbls, name, defs, clsr)
  func.fdct = fdct
  del kwargs['__pw_name']
  del kwargs['__pw_code']
  del kwargs['__pw_defs']
  del kwargs['__pw_clsr']
  del kwargs['__pw_fdct']
  return func(*args, **kwargs)

def make_applicable(f, *args, **kwargs):
  if not isinstance(f, FunctionType): raise ValueError('argument must be a function')
  kwargs['__pw_name'] = f.__name__  # edited
  kwargs['__pw_code'] = marshal.dumps(f.__code__)   # edited
  kwargs['__pw_defs'] = marshal.dumps(f.__defaults__)  # edited
  kwargs['__pw_clsr'] = marshal.dumps(f.__closure__)  # edited
  kwargs['__pw_fdct'] = marshal.dumps(f.__dict__)   # edited
  return _applicable, args, kwargs

def _mappable(x):
  x,name,code,defs,clsr,fdct = x
  code = marshal.loads(code)
  gbls = globals() #gbls = marshal.loads(gbls)
  defs = marshal.loads(defs)
  clsr = marshal.loads(clsr)
  fdct = marshal.loads(fdct)
  func = FunctionType(code, gbls, name, defs, clsr)
  func.fdct = fdct
  return func(x)

def make_mappable(f, iterable):
  if not isinstance(f, FunctionType): raise ValueError('argument must be a function')
  name = f.__name__    # edited
  code = marshal.dumps(f.__code__)   # edited
  defs = marshal.dumps(f.__defaults__)  # edited
  clsr = marshal.dumps(f.__closure__)  # edited
  fdct = marshal.dumps(f.__dict__)  # edited
  return _mappable, ((i,name,code,defs,clsr,fdct) for i in iterable)

Après cette fonction, le code de problème ci-dessus est également modifié un peu comme ceci:

from multiprocessing import Pool
from poolable import make_applicable, make_mappable

def cube(x):
  return x**3

if __name__ == "__main__":
  pool    = Pool(processes=2)
  results = [pool.apply_async(*make_applicable(cube,x)) for x in range(1,7)]
  print([result.get(timeout=10) for result in results])

Et j'ai eu la sortie comme:

[1, 8, 27, 64, 125, 216]

Je pense que cet article peut être utile pour certains utilisateurs de Windows.

Arun Sooraj
la source
4

Il n'y a aucun moyen de garantir que deux fonctions s'exécuteront en synchronisation l'une avec l'autre, ce qui semble être ce que vous voulez faire.

Le mieux que vous puissiez faire est de diviser la fonction en plusieurs étapes, puis d'attendre que les deux se terminent aux points de synchronisation critiques en utilisant Process.join des mentions de réponse comme @ aix.

C'est mieux que time.sleep(10)parce que vous ne pouvez pas garantir les horaires exacts. En attendant explicitement, vous dites que les fonctions doivent être exécutées en exécutant cette étape avant de passer à la suivante, au lieu de supposer que cela sera fait dans les 10 ms, ce qui n'est pas garanti en fonction de ce qui se passe sur la machine.

Davy8
la source
1

Il semble que vous ayez une seule fonction que vous devez appeler sur deux paramètres différents. Cela peut être fait avec élégance en utilisant une combinaison de concurrent.futureset mapavec Python 3.2+

import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def sleep_secs(seconds):
  time.sleep(seconds)
  print(f'{seconds} has been processed')

secs_list = [2,4, 6, 8, 10, 12]

Maintenant, si votre opération est liée aux E / S, vous pouvez utiliser le ThreadPoolExecutorcomme tel:

with ThreadPoolExecutor() as executor:
  results = executor.map(sleep_secs, secs_list)

Notez comment mapest utilisé ici pourmap votre fonction à la liste des arguments.

Maintenant, si votre fonction est liée au processeur, vous pouvez utiliser ProcessPoolExecutor

with ProcessPoolExecutor() as executor:
  results = executor.map(sleep_secs, secs_list)

Si vous n'êtes pas sûr, vous pouvez simplement essayer les deux et voir lequel vous donne les meilleurs résultats.

Enfin, si vous souhaitez imprimer vos résultats, vous pouvez simplement le faire:

with ThreadPoolExecutor() as executor:
  results = executor.map(sleep_secs, secs_list)
  for result in results:
    print(result)
BICube
la source