Comment faire de la programmation parallèle en Python?

141

Pour C ++, nous pouvons utiliser OpenMP pour faire de la programmation parallèle; cependant, OpenMP ne fonctionnera pas pour Python. Que dois-je faire si je souhaite mettre en parallèle certaines parties de mon programme python?

La structure du code peut être considérée comme:

solve1(A)
solve2(B)

solve1et solve2sont deux fonctions indépendantes. Comment exécuter ce type de code en parallèle plutôt qu'en séquence afin de réduire le temps d'exécution? J'espère que quelqu'un pourra m'aider. Merci d'avance. Le code est:

def solve(Q, G, n):
    i = 0
    tol = 10 ** -4

    while i < 1000:
        inneropt, partition, x = setinner(Q, G, n)
        outeropt = setouter(Q, G, n)

        if (outeropt - inneropt) / (1 + abs(outeropt) + abs(inneropt)) < tol:
            break

        node1 = partition[0]
        node2 = partition[1]

        G = updateGraph(G, node1, node2)

        if i == 999:
            print "Maximum iteration reaches"
    print inneropt

Où setinner et setouter sont deux fonctions indépendantes. C'est là que je veux mettre en parallèle ...

ilovecp3
la source
31
Jetez un œil au multitraitement . Remarque: les threads de Python ne conviennent pas aux tâches liées au processeur, uniquement aux tâches liées aux E / S.
9000
4
@ 9000 +100 internets pour mentionner les tâches dépendantes du CPU et des E / S.
Hyperboreus
@ 9000 En fait, les threads ne conviennent pas du tout aux tâches liées au processeur pour autant que je sache! Les processus sont la voie à suivre pour effectuer de véritables tâches liées au processeur.
Omar Al-Ithawi
6
@OmarIthawi: pourquoi, les threads fonctionnent bien si vous avez beaucoup de cœurs de processeur (comme d'habitude maintenant). Ensuite, votre processus peut exécuter plusieurs threads chargeant tous ces cœurs en parallèle et partageant implicitement des données communes entre eux (c'est-à-dire sans avoir de zone de mémoire partagée explicite ou de messagerie inter-processus).
9000
1
@ user2134774: Eh bien, oui, mon deuxième commentaire n'a guère de sens. Probablement les seules extensions C qui libèrent le GIL peuvent en bénéficier; par exemple, certaines parties de NumPy et Pandas le font. Dans d'autres cas, c'est faux (mais je ne peux pas le modifier maintenant).
9000 le

Réponses:

162

Vous pouvez utiliser le module multitraitement . Dans ce cas, je pourrais utiliser un pool de traitement:

from multiprocessing import Pool
pool = Pool()
result1 = pool.apply_async(solve1, [A])    # evaluate "solve1(A)" asynchronously
result2 = pool.apply_async(solve2, [B])    # evaluate "solve2(B)" asynchronously
answer1 = result1.get(timeout=10)
answer2 = result2.get(timeout=10)

Cela engendrera des processus qui peuvent effectuer un travail générique pour vous. Comme nous n'avons pas réussi processes, il engendrera un processus pour chaque cœur de processeur de votre machine. Chaque cœur de processeur peut exécuter un processus simultanément.

Si vous souhaitez mapper une liste à une seule fonction, procédez comme suit:

args = [A, B]
results = pool.map(solve1, args)

N'utilisez pas de threads car le GIL verrouille toutes les opérations sur les objets python.

Matt Williamson
la source
1
pool.mapn'accepte aussi les dictionnaires comme args? Ou seulement de simples listes?
The Bndr
Juste des listes je pense. Mais vous pouvez simplement passer dans dict.items () qui sera une liste de tuples de valeur clé
Matt Williamson
Malheureusement, cela se termine par une erreur `` type unhashable: 'list' '
The Bndr
en plus de mon dernier commentaire: `dict.items ()` work. L'erreur se pose, car j'ai dû changer la gestion de la variable insight le processus-funktion. Malheureusement, le message d'erreur n'a pas été très utile ... Alors: merci pour votre indice. :-)
The Bndr
2
Quel est le délai d'expiration ici?
gamma du
26

Cela peut être fait très élégamment avec Ray .

Pour paralléliser votre exemple, vous devez définir vos fonctions avec le @ray.remotedécorateur, puis les appeler avec .remote.

import ray

ray.init()

# Define the functions.

@ray.remote
def solve1(a):
    return 1

@ray.remote
def solve2(b):
    return 2

# Start two tasks in the background.
x_id = solve1.remote(0)
y_id = solve2.remote(1)

# Block until the tasks are done and get the results.
x, y = ray.get([x_id, y_id])

Il y a un certain nombre d'avantages à cela par rapport au module multiprocesseur .

  1. Le même code s'exécutera sur une machine multicœur ainsi que sur un cluster de machines.
  2. Les processus partagent efficacement les données grâce à la mémoire partagée et à la sérialisation sans copie .
  3. Les messages d'erreur sont bien propagés.
  4. Ces appels de fonction peuvent être composés ensemble, par exemple,

    @ray.remote
    def f(x):
        return x + 1
    
    x_id = f.remote(1)
    y_id = f.remote(x_id)
    z_id = f.remote(y_id)
    ray.get(z_id)  # returns 4
  5. En plus d'appeler des fonctions à distance, les classes peuvent être instanciées à distance en tant qu'acteurs .

Notez que Ray est un framework que j'ai aidé à développer.

Robert Nishihara
la source
je reçois toujours une erreur qui dit "Impossible de trouver une version qui répond à l'exigence ray (à partir des versions
:)
2
Habituellement, ce type d'erreur signifie que vous devez mettre à niveau pip. Je suggérerais d'essayer pip install --upgrade pip. Si vous devez utiliser sudodu tout, il est possible que la version de pipcelle que vous utilisez pour installer rayne soit pas la même que celle qui est mise à niveau. Vous pouvez vérifier avec pip --version. De plus, Windows n'est actuellement pas pris en charge, donc si vous êtes sous Windows, c'est probablement le problème.
Robert Nishihara
1
Juste une note, c'est principalement pour distribuer des travaux simultanés sur plusieurs machines.
Matt Williamson
2
Il est en fait optimisé à la fois pour le cas d'une seule machine et pour le paramètre de cluster. Un grand nombre des décisions de conception (par exemple, la mémoire partagée, la sérialisation sans copie) visent à bien prendre en charge des machines uniques.
Robert Nishihara
2
Ce serait formidable si la documentation le soulignait davantage. J'ai eu le sentiment en lisant les documents qu'il n'était pas vraiment destiné au cas d'une seule machine.
Sledge
4

La solution, comme d'autres l'ont dit, consiste à utiliser plusieurs processus. Le cadre le plus approprié dépend cependant de nombreux facteurs. En plus de ceux déjà mentionnés, il y a aussi charm4py et mpi4py (je suis le développeur de charm4py).

Il existe un moyen plus efficace d'implémenter l'exemple ci-dessus que d'utiliser l'abstraction du pool de nœuds de calcul. La boucle principale envoie les mêmes paramètres (y compris le graphique complet G) à plusieurs reprises aux travailleurs dans chacune des 1000 itérations. Étant donné qu'au moins un travailleur résidera sur un processus différent, cela implique de copier et d'envoyer les arguments aux autres processus. Cela peut être très coûteux selon la taille des objets. Au lieu de cela, il est logique que les travailleurs stockent l'état et envoient simplement les informations mises à jour.

Par exemple, dans charm4py, cela peut être fait comme ceci:

class Worker(Chare):

    def __init__(self, Q, G, n):
        self.G = G
        ...

    def setinner(self, node1, node2):
        self.updateGraph(node1, node2)
        ...


def solve(Q, G, n):
    # create 2 workers, each on a different process, passing the initial state
    worker_a = Chare(Worker, onPE=0, args=[Q, G, n])
    worker_b = Chare(Worker, onPE=1, args=[Q, G, n])
    while i < 1000:
        result_a = worker_a.setinner(node1, node2, ret=True)  # execute setinner on worker A
        result_b = worker_b.setouter(node1, node2, ret=True)  # execute setouter on worker B

        inneropt, partition, x = result_a.get()  # wait for result from worker A
        outeropt = result_b.get()  # wait for result from worker B
        ...

Notez que pour cet exemple, nous n'avons vraiment besoin que d'un seul travailleur. La boucle principale pourrait exécuter l'une des fonctions et demander au travailleur d'exécuter l'autre. Mais mon code aide à illustrer plusieurs choses:

  1. L'ouvrier A s'exécute dans le processus 0 (identique à la boucle principale). Pendant que l' result_a.get()attente du résultat est bloquée, le travailleur A effectue le calcul dans le même processus.
  2. Les arguments sont automatiquement passés par référence au travailleur A, car il est dans le même processus (aucune copie n'est impliquée).
Juan Galvez
la source
2

Dans certains cas, il est possible de paralléliser automatiquement les boucles à l'aide de Numba , bien que cela ne fonctionne qu'avec un petit sous-ensemble de Python:

from numba import njit, prange

@njit(parallel=True)
def prange_test(A):
    s = 0
    # Without "parallel=True" in the jit-decorator
    # the prange statement is equivalent to range
    for i in prange(A.shape[0]):
        s += A[i]
    return s

Malheureusement, il semble que Numba ne fonctionne qu'avec des tableaux Numpy, mais pas avec d'autres objets Python. En théorie, il pourrait également être possible de compiler Python en C ++ , puis de le paralléliser automatiquement à l'aide du compilateur Intel C ++ , même si je n'ai pas encore essayé cela.

Vert Anderson
la source
2

Vous pouvez utiliser la joblibbibliothèque pour effectuer des calculs parallèles et des multiprocesseurs.

from joblib import Parallel, delayed

Vous pouvez simplement créer une fonction fooque vous souhaitez exécuter en parallèle et basée sur le morceau de code suivant, implémenter le traitement parallèle:

output = Parallel(n_jobs=num_cores)(delayed(foo)(i) for i in input)

num_corespeut être obtenu de la multiprocessingbibliothèque comme suit:

import multiprocessing

num_cores = multiprocessing.cpu_count()

Si vous avez une fonction avec plus d'un argument d'entrée et que vous souhaitez simplement parcourir l'un des arguments par une liste, vous pouvez utiliser la partialfonction de la functoolsbibliothèque comme suit:

from joblib import Parallel, delayed
import multiprocessing
from functools import partial
def foo(arg1, arg2, arg3, arg4):
    '''
    body of the function
    '''
    return output
input = [11,32,44,55,23,0,100,...] # arbitrary list
num_cores = multiprocessing.cpu_count()
foo_ = partial(foo, arg2=arg2, arg3=arg3, arg4=arg4)
# arg1 is being fetched from input list
output = Parallel(n_jobs=num_cores)(delayed(foo_)(i) for i in input)

Vous pouvez trouver une explication complète du multitraitement python et R avec quelques exemples ici .

vahab najari
la source