Comment paralléliser une simple boucle Python?

256

C'est probablement une question triviale, mais comment paralléliser la boucle suivante en python?

# setup output lists
output1 = list()
output2 = list()
output3 = list()

for j in range(0, 10):
    # calc individual parameter value
    parameter = j * offset
    # call the calculation
    out1, out2, out3 = calc_stuff(parameter = parameter)

    # put results into correct output list
    output1.append(out1)
    output2.append(out2)
    output3.append(out3)

Je sais comment démarrer des threads uniques en Python mais je ne sais pas comment "collecter" les résultats.

Plusieurs processus seraient également très bien - ce qui est le plus simple dans ce cas. J'utilise actuellement Linux, mais le code devrait également fonctionner sur Windows et Mac.

Quelle est la façon la plus simple de paralléliser ce code?

moi-même
la source

Réponses:

192

L'utilisation de plusieurs threads sur CPython ne vous donnera pas de meilleures performances pour le code pur Python en raison du verrou d'interpréteur global (GIL). Je suggère d'utiliser le multiprocessingmodule à la place:

pool = multiprocessing.Pool(4)
out1, out2, out3 = zip(*pool.map(calc_stuff, range(0, 10 * offset, offset)))

Notez que cela ne fonctionnera pas dans l'interpréteur interactif.

Pour éviter le FUD habituel autour du GIL: Il n'y aurait de toute façon aucun avantage à utiliser des threads pour cet exemple. Vous voulez utiliser des processus ici, pas des threads, car ils évitent tout un tas de problèmes.

Sven Marnach
la source
47
Puisque c'est la réponse choisie, est-il possible d'avoir un exemple plus complet? Quels sont les arguments de calc_stuff?
Eduardo Pignatelli
2
@EduardoPignatelli Veuillez simplement lire la documentation du multiprocessingmodule pour des exemples plus complets. Pool.map()fonctionne essentiellement comme map(), mais en parallèle.
Sven Marnach
3
Existe-t-il un moyen d'ajouter simplement une barre de chargement tqdm à cette structure de code? J'ai utilisé tqdm (pool.imap (calc_stuff, range (0, 10 * offset, offset))) mais je n'ai pas de graphique de barre de chargement complet.
user8188120
@ user8188120 Je n'ai jamais entendu parler de tqdm auparavant, donc désolé, je ne peux pas m'en empêcher.
Sven Marnach
Pour une barre de chargement tqdm, voir cette question: stackoverflow.com/questions/41920124/…
Johannes
67

Pour paralléliser une boucle for simple, joblib apporte beaucoup de valeur à l'utilisation brute du multitraitement. Non seulement la syntaxe courte, mais aussi des choses comme le regroupement transparent des itérations lorsqu'elles sont très rapides (pour supprimer la surcharge) ou la capture du traçage du processus enfant, pour avoir un meilleur rapport d'erreurs.

Avertissement: je suis l'auteur original de joblib.

Gael Varoquaux
la source
1
J'ai essayé joblib avec jupyter, ça ne marche pas. Après l'appel différé en parallèle, la page a cessé de fonctionner.
Jie
1
Salut, j'ai un problème avec joblib ( stackoverflow.com/questions/52166572/… ), avez-vous une idée de la cause? Merci beaucoup.
Ting Sun
On dirait quelque chose que je veux essayer! Est-il possible de l'utiliser avec une double boucle par exemple pour i dans la plage (10): pour j dans la plage (20)
CutePoison
51

Quelle est la façon la plus simple de paralléliser ce code?

J'aime beaucoup concurrent.futurespour cela, disponible en Python3 depuis la version 3.2 - et via backport vers 2.6 et 2.7 sur PyPi .

Vous pouvez utiliser des threads ou des processus et utiliser exactement la même interface.

Multiprocessing

Mettez ceci dans un fichier - futuretest.py:

import concurrent.futures
import time, random               # add some random sleep time

offset = 2                        # you don't supply these so
def calc_stuff(parameter=None):   # these are examples.
    sleep_time = random.choice([0, 1, 2, 3, 4, 5])
    time.sleep(sleep_time)
    return parameter / 2, sleep_time, parameter * parameter

def procedure(j):                 # just factoring out the
    parameter = j * offset        # procedure
    # call the calculation
    return calc_stuff(parameter=parameter)

def main():
    output1 = list()
    output2 = list()
    output3 = list()
    start = time.time()           # let's see how long this takes

    # we can swap out ProcessPoolExecutor for ThreadPoolExecutor
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for out1, out2, out3 in executor.map(procedure, range(0, 10)):
            # put results into correct output list
            output1.append(out1)
            output2.append(out2)
            output3.append(out3)
    finish = time.time()
    # these kinds of format strings are only available on Python 3.6:
    # time to upgrade!
    print(f'original inputs: {repr(output1)}')
    print(f'total time to execute {sum(output2)} = sum({repr(output2)})')
    print(f'time saved by parallelizing: {sum(output2) - (finish-start)}')
    print(f'returned in order given: {repr(output3)}')

if __name__ == '__main__':
    main()

Et voici la sortie:

$ python3 -m futuretest
original inputs: [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
total time to execute 33 = sum([0, 3, 3, 4, 3, 5, 1, 5, 5, 4])
time saved by parallellizing: 27.68999981880188
returned in order given: [0, 4, 16, 36, 64, 100, 144, 196, 256, 324]

Multithreading

Maintenant , changez ProcessPoolExecutorà ThreadPoolExecutor, et exécutez à nouveau le module:

$ python3 -m futuretest
original inputs: [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
total time to execute 19 = sum([0, 2, 3, 5, 2, 0, 0, 3, 3, 1])
time saved by parallellizing: 13.992000102996826
returned in order given: [0, 4, 16, 36, 64, 100, 144, 196, 256, 324]

Vous avez maintenant effectué à la fois le multithreading et le multitraitement!

Remarque sur les performances et l'utilisation des deux ensemble.

L'échantillonnage est beaucoup trop petit pour comparer les résultats.

Cependant, je soupçonne que le multithreading sera plus rapide que le multitraitement en général, en particulier sous Windows, car Windows ne prend pas en charge le forking, donc chaque nouveau processus doit prendre du temps pour se lancer. Sur Linux ou Mac, ils seront probablement plus proches.

Vous pouvez imbriquer plusieurs threads dans plusieurs processus, mais il est recommandé de ne pas utiliser plusieurs threads pour dériver plusieurs processus.

Aaron Hall
la source
ThreadPoolExecutor contourne-t-il les limitations imposées par GIL? aussi vous ne seriez pas besoin de se joindre () afin d'attendre les exécuteurs pour terminer ou est - ce soin apporté implicitement de l' intérieur du gestionnaire de contexte
PirateApp
1
Non et non, oui à "géré implicitement"
Aaron Hall
Pour une raison quelconque, lors de l'extension du problème, le multithreading est extrêmement rapide, mais le multitraitement engendre un tas de processus bloqués (sous macOS). Une idée pourquoi cela pourrait être? Le processus ne contient que des boucles imbriquées et des mathématiques, rien d'exotique.
komodovaran_
@komodovaran_ Un processus est un processus Python complet, un par chacun, tandis qu'un thread est juste un thread d'exécution avec sa propre pile qui partage le processus, son bytecode et tout ce qu'il a en mémoire avec tous les autres threads - cela aide-t-il ?
Aaron Hall
49
from joblib import Parallel, delayed
import multiprocessing

inputs = range(10) 
def processInput(i):
    return i * i

num_cores = multiprocessing.cpu_count()

results = Parallel(n_jobs=num_cores)(delayed(processInput)(i) for i in inputs)
print(results)

Ce qui précède fonctionne à merveille sur ma machine (Ubuntu, le paquet joblib a été préinstallé, mais peut être installé via pip install joblib).

Tiré de https://blog.dominodatalab.com/simple-parallelization/

Tyrex
la source
3
J'ai essayé votre code mais sur mon système, la version séquentielle de ce code prend environ une demi-minute et la version parallèle ci-dessus prend 4 minutes. Pourquoi
shaifali Gupta
3
Merci pour votre réponse! Je pense que c'est la façon la plus élégante de le faire en 2019.
Heikki Pulkkinen
2
le multiprocessing n'est pas valide pour Python 3.x, donc cela ne fonctionne pas pour moi.
EngrStudent
2
@EngrStudent Vous ne savez pas ce que vous entendez par "non valide". Cela fonctionne pour Python 3.6.x pour moi.
Tyrex
@tyrex merci pour le partage! ce paquet joblib est génial et l'exemple fonctionne pour moi. Cependant, dans un contexte plus complexe, j'ai malheureusement eu un bug. github.com/joblib/joblib/issues/949
Open Food Broker
13

L'utilisation de Ray présente plusieurs avantages :

  • Vous pouvez paralléliser plusieurs machines en plus de plusieurs cœurs (avec le même code).
  • Gestion efficace des données numériques via la mémoire partagée (et la sérialisation sans copie).
  • Débit de tâches élevé avec planification distribuée.
  • Tolérance aux pannes.

Dans votre cas, vous pouvez démarrer Ray et définir une fonction à distance

import ray

ray.init()

@ray.remote(num_return_vals=3)
def calc_stuff(parameter=None):
    # Do something.
    return 1, 2, 3

puis l'invoquer en parallèle

output1, output2, output3 = [], [], []

# Launch the tasks.
for j in range(10):
    id1, id2, id3 = calc_stuff.remote(parameter=j)
    output1.append(id1)
    output2.append(id2)
    output3.append(id3)

# Block until the results have finished and get the results.
output1 = ray.get(output1)
output2 = ray.get(output2)
output3 = ray.get(output3)

Pour exécuter le même exemple sur un cluster, la seule ligne qui changerait serait l'appel à ray.init (). La documentation pertinente peut être trouvée ici .

Notez que j'aide à développer Ray.

Robert Nishihara
la source
1
Pour tous ceux qui envisagent d'utiliser Ray, il peut être pertinent de savoir qu'il ne prend pas en charge Windows nativement. Certains hacks pour le faire fonctionner dans Windows en utilisant WSL (Windows Subsystem for Linux) sont possibles, bien que ce ne soit pas prêt à l'emploi si vous souhaitez utiliser Windows.
OscarVanL
9

C'est la façon la plus simple de le faire!

Vous pouvez utiliser asyncio . (La documentation peut être trouvée ici ). Il est utilisé comme base pour plusieurs cadres asynchrones Python qui fournissent des serveurs réseau et Web hautes performances, des bibliothèques de connexion à la base de données, des files d'attente de tâches distribuées, etc. .

import asyncio

def background(f):
    def wrapped(*args, **kwargs):
        return asyncio.get_event_loop().run_in_executor(None, f, *args, **kwargs)

    return wrapped

@background
def your_function(argument):
    #code

Maintenant, cette fonction sera exécutée en parallèle à chaque appel sans mettre le programme principal en état d'attente. Vous pouvez également l'utiliser pour paralléliser la boucle. Lorsqu'il est appelé pour une boucle for, bien que la boucle soit séquentielle, mais chaque itération s'exécute en parallèle avec le programme principal dès que l'interpréteur y arrive. Par exemple:

@background
def your_function(argument):
    time.sleep(5)
    print('function finished for '+str(argument))


for i in range(10):
    your_function(i)


print('loop finished')

Cela produit la sortie suivante:

loop finished
function finished for 4
function finished for 8
function finished for 0
function finished for 3
function finished for 6
function finished for 2
function finished for 5
function finished for 7
function finished for 9
function finished for 1
Utilisateur5
la source
Je pense qu'il y a une faute de frappe wrapped()et qu'elle devrait être **kwargsau lieu de*kwargs
jakub-olczyk
Oups! Mon erreur. Corrigée!
User5
6

pourquoi n'utilisez-vous pas de threads et un mutex pour protéger une liste globale?

import os
import re
import time
import sys
import thread

from threading import Thread

class thread_it(Thread):
    def __init__ (self,param):
        Thread.__init__(self)
        self.param = param
    def run(self):
        mutex.acquire()
        output.append(calc_stuff(self.param))
        mutex.release()   


threads = []
output = []
mutex = thread.allocate_lock()

for j in range(0, 10):
    current = thread_it(j * offset)
    threads.append(current)
    current.start()

for t in threads:
    t.join()

#here you have output list filled with data

gardez à l'esprit, vous serez aussi rapide que votre fil le plus lent

jackdoe
la source
2
Je sais que c'est une très vieille réponse, donc c'est une déception d'obtenir un downvote aléatoire de nulle part. Je n'ai voté que parce que les threads ne paralléliseront rien. Les threads en Python sont liés à un seul thread s'exécutant sur l'interpréteur à la fois en raison du verrouillage global de l'interpréteur, ils prennent donc en charge la programmation simultanée, mais pas en parallèle comme OP le demande.
skrrgwasme
3
@skrrgwasme Je sais que vous le savez, mais lorsque vous utilisez les mots "ils ne parallèleront rien", cela pourrait induire les lecteurs en erreur. Si les opérations prennent du temps parce qu'elles sont liées aux entrées-sorties ou qu'elles dorment en attendant un événement, alors l'interpréteur est libéré pour exécuter les autres threads, ce qui entraînera l'augmentation de vitesse que les gens espèrent dans ces cas. Seuls les threads liés au CPU sont vraiment affectés par ce que dit skrrgwasme.
Jonathan Hartley
5

J'ai trouvé joblibest très utile avec moi. Veuillez voir l'exemple suivant:

from joblib import Parallel, delayed
def yourfunction(k):   
    s=3.14*k*k
    print "Area of a circle with a radius ", k, " is:", s

element_run = Parallel(n_jobs=-1)(delayed(yourfunction)(k) for k in range(1,10))

n_jobs = -1: utiliser tous les cœurs disponibles

miuxu
la source
14
Vous savez, il est préférable de vérifier les réponses déjà existantes avant de poster les vôtres. Cette réponse propose également d'utiliser joblib.
sanyash
2

Disons que nous avons une fonction asynchrone

async def work_async(self, student_name: str, code: str, loop):
"""
Some async function
"""
    # Do some async procesing    

Cela doit être exécuté sur un grand tableau. Certains attributs sont transmis au programme et certains sont utilisés à partir de la propriété de l'élément de dictionnaire dans le tableau.

async def process_students(self, student_name: str, loop):
    market = sys.argv[2]
    subjects = [...] #Some large array
    batchsize = 5
    for i in range(0, len(subjects), batchsize):
        batch = subjects[i:i+batchsize]
        await asyncio.gather(*(self.work_async(student_name,
                                           sub['Code'],
                                           loop)
                       for sub in batch))
Amit Teli
la source
1

Jetez un oeil à ceci;

http://docs.python.org/library/queue.html

Ce n'est peut-être pas la bonne façon de le faire, mais je ferais quelque chose comme;

Code réel;

from multiprocessing import Process, JoinableQueue as Queue 

class CustomWorker(Process):
    def __init__(self,workQueue, out1,out2,out3):
        Process.__init__(self)
        self.input=workQueue
        self.out1=out1
        self.out2=out2
        self.out3=out3
    def run(self):
            while True:
                try:
                    value = self.input.get()
                    #value modifier
                    temp1,temp2,temp3 = self.calc_stuff(value)
                    self.out1.put(temp1)
                    self.out2.put(temp2)
                    self.out3.put(temp3)
                    self.input.task_done()
                except Queue.Empty:
                    return
                   #Catch things better here
    def calc_stuff(self,param):
        out1 = param * 2
        out2 = param * 4
        out3 = param * 8
        return out1,out2,out3
def Main():
    inputQueue = Queue()
    for i in range(10):
        inputQueue.put(i)
    out1 = Queue()
    out2 = Queue()
    out3 = Queue()
    processes = []
    for x in range(2):
          p = CustomWorker(inputQueue,out1,out2,out3)
          p.daemon = True
          p.start()
          processes.append(p)
    inputQueue.join()
    while(not out1.empty()):
        print out1.get()
        print out2.get()
        print out3.get()
if __name__ == '__main__':
    Main()

J'espère que cela pourra aider.

MerreM
la source
1

Cela pourrait être utile lors de l'implémentation du multitraitement et de l'informatique parallèle / distribuée en Python.

Tutoriel YouTube sur l'utilisation du package Techila

Techila est un middleware informatique distribué, qui s'intègre directement à Python à l'aide du package techila. La fonction pêche dans le package peut être utile pour paralléliser des structures de boucles. (L'extrait de code suivant provient des forums de la communauté Techila )

techila.peach(funcname = 'theheavyalgorithm', # Function that will be called on the compute nodes/ Workers
    files = 'theheavyalgorithm.py', # Python-file that will be sourced on Workers
    jobs = jobcount # Number of Jobs in the Project
    )
Tee
la source
1
Bien que ce lien puisse répondre à la question, il est préférable d'inclure les parties essentielles de la réponse ici et de fournir le lien de référence. Les réponses de lien uniquement peuvent devenir invalides si la page liée change.
SL Barth - Rétablir Monica
2
@SLBarth merci pour les commentaires. J'ai ajouté un petit exemple de code à la réponse.
TEe
1

merci @iuryxavier

from multiprocessing import Pool
from multiprocessing import cpu_count


def add_1(x):
    return x + 1

if __name__ == "__main__":
    pool = Pool(cpu_count())
    results = pool.map(add_1, range(10**12))
    pool.close()  # 'TERM'
    pool.join()   # 'KILL'
Felipe de Macêdo
la source
2
-1. Ceci est une réponse en code uniquement. Je suggère d'ajouter une explication qui indique aux lecteurs ce que fait le code que vous avez publié, et peut-être où ils peuvent trouver des informations supplémentaires.
starbeamrainbowlabs
-1

exemple très simple de traitement parallèle est

from multiprocessing import Process

output1 = list()
output2 = list()
output3 = list()

def yourfunction():
    for j in range(0, 10):
        # calc individual parameter value
        parameter = j * offset
        # call the calculation
        out1, out2, out3 = calc_stuff(parameter=parameter)

        # put results into correct output list
        output1.append(out1)
        output2.append(out2)
        output3.append(out3)

if __name__ == '__main__':
    p = Process(target=pa.yourfunction, args=('bob',))
    p.start()
    p.join()
Adil Warsi
la source
3
Il n'y a pas de parallélisme dans la boucle for ici, vous générez simplement un processus qui exécute la boucle entière; ce n'est PAS ce que le PO voulait.
facuq