Comment puis-je utiliser le filetage en Python?

1281

J'essaie de comprendre le filetage en Python. J'ai regardé la documentation et les exemples, mais franchement, de nombreux exemples sont trop sophistiqués et j'ai du mal à les comprendre.

Comment montrez-vous clairement que les tâches sont divisées pour le multithread?

albruno
la source
31
Une bonne discussion générale autour de ce sujet se trouve dans Python's Hardest Problem de Jeff Knupp. En résumé, il semble que le filetage ne soit pas pour les débutants.
Matthew Walker
112
haha, j'ai tendance à penser que le filetage est pour tout le monde, mais les débutants ne sont pas pour le filetage :)))))
Bohdan
42
Juste pour signaler que les gens devraient lire toutes les réponses car les réponses ultérieures sont sans doute meilleures car de nouvelles fonctionnalités de langage sont mises à profit ...
Gwyn Evans
5
N'oubliez pas d'écrire votre logique de base en C et de l'appeler via ctypes pour vraiment profiter du threading Python.
aaa90210
4
Je voulais juste ajouter que PyPubSub est un excellent moyen d'envoyer et de recevoir des messages pour contrôler le flux de thread
ytpillai

Réponses:

1418

Depuis que cette question a été posée en 2010, il y a eu une réelle simplification dans la façon de faire du multithreading simple avec Python avec carte et pool .

Le code ci-dessous provient d'un article / blog que vous devriez absolument vérifier (pas d'affiliation) - Parallélisme en une seule ligne: un meilleur modèle pour les tâches de threading quotidiennes . Je vais résumer ci-dessous - cela finit par n'être que quelques lignes de code:

from multiprocessing.dummy import Pool as ThreadPool
pool = ThreadPool(4)
results = pool.map(my_function, my_array)

Quelle est la version multithread de:

results = []
for item in my_array:
    results.append(my_function(item))

La description

La carte est une petite fonction sympa et la clé pour injecter facilement du parallélisme dans votre code Python. Pour ceux qui ne sont pas familiers, la carte est quelque chose qui a été retiré des langages fonctionnels comme Lisp. Il s'agit d'une fonction qui mappe une autre fonction sur une séquence.

Map gère l'itération sur la séquence pour nous, applique la fonction et stocke tous les résultats dans une liste pratique à la fin.

Entrez la description de l'image ici


la mise en oeuvre

Les versions parallèles de la fonction de carte sont fournies par deux bibliothèques: le multiprocessing, et aussi son enfant étape peu connu, mais tout aussi fantastique: multiprocessing.dummy.

multiprocessing.dummyest exactement le même que le module multiprocessing, mais utilise des threads à la place ( une distinction importante - utilisez plusieurs processus pour les tâches gourmandes en CPU; threads pour (et pendant) les E / S ):

multiprocessing.dummy réplique l'API du multiprocessing, mais n'est rien de plus qu'un wrapper autour du module de thread.

import urllib2
from multiprocessing.dummy import Pool as ThreadPool

urls = [
  'http://www.python.org',
  'http://www.python.org/about/',
  'http://www.onlamp.com/pub/a/python/2003/04/17/metaclasses.html',
  'http://www.python.org/doc/',
  'http://www.python.org/download/',
  'http://www.python.org/getit/',
  'http://www.python.org/community/',
  'https://wiki.python.org/moin/',
]

# Make the Pool of workers
pool = ThreadPool(4)

# Open the URLs in their own threads
# and return the results
results = pool.map(urllib2.urlopen, urls)

# Close the pool and wait for the work to finish
pool.close()
pool.join()

Et les résultats du timing:

Single thread:   14.4 seconds
       4 Pool:   3.1 seconds
       8 Pool:   1.4 seconds
      13 Pool:   1.3 seconds

Passer plusieurs arguments (fonctionne comme ça uniquement dans Python 3.3 et versions ultérieures ):

Pour passer plusieurs tableaux:

results = pool.starmap(function, zip(list_a, list_b))

Ou pour passer une constante et un tableau:

results = pool.starmap(function, zip(itertools.repeat(constant), list_a))

Si vous utilisez une version antérieure de Python, vous pouvez passer plusieurs arguments via cette solution de contournement ).

(Merci à user136036 pour le commentaire utile.)

philshem
la source
90
Il ne manque que des votes car il est tellement fraîchement publié. Cette réponse fonctionne à merveille et démontre la fonctionnalité «carte» qui donne une syntaxe beaucoup plus facile à comprendre que les autres réponses ici.
inactif le
25
S'agit-il même de fils et non de processus? Il semble qu'il tente de multiprocessus! =
Multithread
72
Au fait, les gars, vous pouvez aussi écrire with Pool(8) as p: p.map( *whatever* )et vous débarrasser des lignes de comptabilité.
11
@BarafuAlbino: Aussi utile soit-il, il convient probablement de noter que cela ne fonctionne que dans Python 3.3+ .
fuglede
9
Comment pouvez-vous laisser cette réponse sans mentionner qu'elle n'est utile que pour les opérations d'E / S? Cela ne fonctionne que sur un seul thread qui est inutile dans la plupart des cas, et est en fait plus lent que de le faire normalement
Frobot
714

Voici un exemple simple: vous devez essayer quelques URL alternatives et renvoyer le contenu de la première à répondre.

import Queue
import threading
import urllib2

# Called by each thread
def get_url(q, url):
    q.put(urllib2.urlopen(url).read())

theurls = ["http://google.com", "http://yahoo.com"]

q = Queue.Queue()

for u in theurls:
    t = threading.Thread(target=get_url, args = (q,u))
    t.daemon = True
    t.start()

s = q.get()
print s

C'est un cas où le threading est utilisé comme une simple optimisation: chaque sous-thread attend une URL pour résoudre et répondre, afin de mettre son contenu dans la file d'attente; chaque thread est un démon (ne poursuivra pas le processus si le thread principal se termine - c'est plus courant qu'improbable); le thread principal démarre tous les sous-threads, fait un getsur la file d'attente pour attendre que l'un d'eux ait fait unput , puis émet les résultats et se termine (ce qui supprime tous les sous-threads qui pourraient encore être en cours d'exécution, car ce sont des threads démon).

L'utilisation correcte des threads en Python est invariablement connectée aux opérations d'E / S (puisque CPython n'utilise pas plusieurs cœurs pour exécuter les tâches liées au CPU de toute façon, la seule raison du threading n'est pas de bloquer le processus pendant qu'il y a une attente pour certaines E / S ). Les files d'attente sont presque toujours le meilleur moyen de regrouper le travail en threads et / ou de collecter les résultats du travail, en passant, et elles sont intrinsèquement threadsafe, de sorte qu'elles vous évitent de vous soucier des verrous, des conditions, des événements, des sémaphores et d'autres inter -filer les concepts de coordination / communication.

Alex Martelli
la source
10
Merci encore, MartelliBot. J'ai mis à jour l'exemple pour attendre que tous les URL répondent: import Queue, threading, urllib2 q = Queue.Queue () urls = '' ' a.com b.com c.com' ''. Split () urls_received = 0 def get_url (q, url): req = urllib2.Request (url) resp = urllib2.urlopen (req) q.put (resp.read ()) global urls_received urls_received + = 1 print urls_received for u in urls: t = threading.Thread (target = get_url, args = (q, u)) t.daemon = True t.start () while q.empty () and urls_received <len (urls): s = q.get () print s
htmldrum
3
@JRM: si vous regardez la réponse suivante ci-dessous, je pense qu'une meilleure façon d'attendre que les threads soient terminés serait d'utiliser la join()méthode, car cela ferait attendre le thread principal jusqu'à ce qu'ils soient terminés sans consommer le processeur en permanence vérification de la valeur. @Alex: merci, c'est exactement ce dont j'avais besoin pour comprendre comment utiliser les threads.
krs013
6
Pour python3, remplacez 'import urllib2' par 'import urllib.request as urllib2'. et mettez des parenthèses dans l'instruction print.
Harvey
5
Pour python 3, remplacez le Queuenom du module par queue. Le nom de la méthode est le même.
JSmyth
2
Je note que la solution imprimera seulement une des pages. Pour imprimer les deux pages à partir de la file d'attente, exécutez simplement la commande à nouveau: s = q.get() print s @ krs013 Vous n'avez pas besoin du joincar Queue.get () bloque.
Tom Anderson
256

REMARQUE : pour la parallélisation réelle en Python, vous devez utiliser le module de multitraitement pour bifurquer plusieurs processus qui s'exécutent en parallèle (en raison du verrouillage de l'interpréteur global, les threads Python fournissent l'entrelacement, mais ils sont en fait exécutés en série, pas en parallèle, et ne sont que utile lors de l'entrelacement d'opérations d'E / S).

Cependant, si vous recherchez simplement un entrelacement (ou effectuez des opérations d'E / S qui peuvent être parallélisées malgré le verrouillage de l'interpréteur global), le module de thread est le point de départ. Comme exemple très simple, considérons le problème de la sommation d'une large plage en sommant les sous-plages en parallèle:

import threading

class SummingThread(threading.Thread):
     def __init__(self,low,high):
         super(SummingThread, self).__init__()
         self.low=low
         self.high=high
         self.total=0

     def run(self):
         for i in range(self.low,self.high):
             self.total+=i


thread1 = SummingThread(0,500000)
thread2 = SummingThread(500000,1000000)
thread1.start() # This actually causes the thread to run
thread2.start()
thread1.join()  # This waits until the thread has completed
thread2.join()
# At this point, both threads have completed
result = thread1.total + thread2.total
print result

Notez que ce qui précède est un exemple très stupide, car il ne fait absolument aucune E / S et sera exécuté en série bien qu'entrelacé (avec la surcharge supplémentaire de changement de contexte) dans CPython en raison du verrouillage de l'interpréteur global.

Michael Aaron Safyan
la source
16
@Alex, je n'ai pas dit que c'était pratique, mais cela montre comment définir et générer des threads, ce qui, je pense, est ce que le PO souhaite.
Michael Aaron Safyan
6
Bien que cela montre comment définir et générer des threads, cela ne résume pas les sous-plages en parallèle. thread1s'exécute jusqu'à ce qu'il soit terminé pendant que le thread principal se bloque, puis la même chose se produit thread2, puis le thread principal reprend et imprime les valeurs accumulées.
martineau
N'est-ce pas super(SummingThread, self).__init__()? Comme dans stackoverflow.com/a/2197625/806988
James Andres
@JamesAndres, en supposant que personne n'hérite de "SummingThread", alors l'un ou l'autre fonctionne bien; dans un tel cas, super (SummingThread, self) n'est qu'un moyen sophistiqué de rechercher la classe suivante dans l'ordre de résolution de méthode (MRO), qui est threading.Thread (puis appeler ensuite init sur cela dans les deux cas). Vous avez raison, cependant, en utilisant super () est un meilleur style pour Python actuel. Super était relativement récent au moment où j'ai fourni cette réponse, appelant donc directement à la super classe plutôt que d'utiliser super (). Je vais le mettre à jour pour utiliser super, cependant.
Michael Aaron Safyan
14
AVERTISSEMENT: n'utilisez pas le multithreading dans des tâches comme celle-ci! Comme l'a montré Dave Beazley: dabeaz.com/python/NewGIL.pdf , 2 threads python sur 2 CPU effectuent une tâche lourde CPU 2 fois SLOWER que 1 thread sur 1 CPU et 1.5 fois SLOWER que 2 threads sur 1 CPU. Ce comportement bizarre est dû à une mauvaise coordination des efforts entre OS et Python. Un cas d'utilisation réel pour les threads est une tâche lourde d'E / S. Par exemple, lorsque vous effectuez une lecture / écriture sur le réseau, il est logique de placer un thread, en attendant que les données soient lues / écrites, en arrière-plan et de basculer le CPU sur un autre thread, qui doit traiter les données.
Boris Burkov
98

Comme d'autres l'ont mentionné, CPython peut utiliser des threads uniquement pour les attentes d'E / S en raison de GIL .

Si vous souhaitez bénéficier de plusieurs cœurs pour les tâches liées au processeur, utilisez le multitraitement :

from multiprocessing import Process

def f(name):
    print 'hello', name

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()
Kai
la source
33
pourriez-vous expliquer un peu ce que cela fait?
pandita
5
@pandita: le code crée un processus, puis le démarre. Alors maintenant, il se passe deux choses en même temps: la ligne principale du programme et le processus qui commence avec la cible, la ffonction. En parallèle, le programme principal n'attend plus que la fin du processus pour l' joinintégrer. Si la partie principale vient de sortir, le sous-processus peut ou non s'exécuter jusqu'à la fin, il joinest donc recommandé de faire un .
johntellsall
1
Une réponse étendue qui inclut la mapfonction est ici: stackoverflow.com/a/28463266/2327328
philshem
2
@philshem Soyez prudent b / c le lien que vous avez publié utilise un pool de threads (pas de processus) comme mentionné ici stackoverflow.com/questions/26432411/… . Cependant, cette réponse utilise un processus. Je suis nouveau dans ce domaine, mais il semble que (en raison de GIL) vous n'obtiendrez des gains de performances que dans des situations spécifiques lors de l'utilisation du multithreading en Python. Cependant, l'utilisation d'un pool de processus peut tirer parti d'un processeur multicœur en faisant travailler plus d'un cœur sur un processus.
user3731622
3
C'est la meilleure réponse pour réellement faire quelque chose d'utile et profiter de plusieurs cœurs de processeur
Frobot
92

Juste une note: une file d'attente n'est pas requise pour le filetage.

Ceci est l'exemple le plus simple que je pourrais imaginer qui montre 10 processus s'exécutant simultanément.

import threading
from random import randint
from time import sleep


def print_number(number):

    # Sleeps a random 1 to 10 seconds
    rand_int_var = randint(1, 10)
    sleep(rand_int_var)
    print "Thread " + str(number) + " slept for " + str(rand_int_var) + " seconds"

thread_list = []

for i in range(1, 10):

    # Instantiates the thread
    # (i) does not make a sequence, so (i,)
    t = threading.Thread(target=print_number, args=(i,))
    # Sticks the thread in a list so that it remains accessible
    thread_list.append(t)

# Starts threads
for thread in thread_list:
    thread.start()

# This blocks the calling thread until the thread whose join() method is called is terminated.
# From http://docs.python.org/2/library/threading.html#thread-objects
for thread in thread_list:
    thread.join()

# Demonstrates that the main process waited for threads to complete
print "Done"
Douglas Adams
la source
3
Ajouter la dernière citation à "Terminé pour le faire imprimer" Terminé "
iChux
1
J'aime mieux cet exemple que Martelli, c'est plus facile de jouer avec. Cependant, je recommanderais à printNumber de faire ce qui suit, pour clarifier un peu ce qui se passe: il devrait enregistrer le randint dans une variable avant de dormir dessus, puis l'impression devrait être modifiée pour dire "Thread" + str ( nombre) + "dormi" + theRandintVariable + "secondes"
Nickolai
Existe-t-il un moyen de savoir quand chaque thread est terminé, comme il se termine?
Matt
1
@Matt Il y a plusieurs façons de faire quelque chose comme ça, mais cela dépendra de vos besoins. Une façon serait de mettre à jour un singleton ou une autre variable accessible au public qui est surveillée dans une boucle while et mise à jour à la fin du thread.
Douglas Adams
2
Pas besoin de deuxième forboucle, vous pouvez appeler thread.start()en première boucle.
Mark Mishyn
49

La réponse d'Alex Martelli m'a aidé. Cependant, voici une version modifiée que je pensais plus utile (au moins pour moi).

Mise à jour: fonctionne à la fois en Python 2 et Python 3

try:
    # For Python 3
    import queue
    from urllib.request import urlopen
except:
    # For Python 2 
    import Queue as queue
    from urllib2 import urlopen

import threading

worker_data = ['http://google.com', 'http://yahoo.com', 'http://bing.com']

# Load up a queue with your data. This will handle locking
q = queue.Queue()
for url in worker_data:
    q.put(url)

# Define a worker function
def worker(url_queue):
    queue_full = True
    while queue_full:
        try:
            # Get your data off the queue, and do some work
            url = url_queue.get(False)
            data = urlopen(url).read()
            print(len(data))

        except queue.Empty:
            queue_full = False

# Create as many threads as you want
thread_count = 5
for i in range(thread_count):
    t = threading.Thread(target=worker, args = (q,))
    t.start()
JimJty
la source
6
Pourquoi ne pas rompre avec l'exception?
Stavros Korokithakis
1
vous pourriez, juste une préférence personnelle
JimJty
1
Je n'ai pas exécuté le code, mais n'avez-vous pas besoin de démonifier les threads? Je pense qu'après cette dernière boucle for, votre programme pourrait se fermer - du moins il le devrait parce que c'est ainsi que les threads devraient fonctionner. Je pense qu'une meilleure approche ne consiste pas à mettre les données des travailleurs dans la file d'attente, mais à placer la sortie dans une file d'attente, car vous pourriez alors avoir une boucle principale qui non seulement gère les informations entrant dans la file d'attente des travailleurs, mais maintenant, elle ne filme pas non plus, et vous savez qu'il ne sortira pas prématurément.
dylnmc
1
@dylnmc, c'est en dehors de mon cas d'utilisation (ma file d'attente d'entrée est prédéfinie). Si vous voulez suivre votre itinéraire, je vous suggère de regarder le céleri
JimJty
@JimJty savez-vous pourquoi j'obtiens cette erreur: import Queue ModuleNotFoundError: No module named 'Queue'j'utilise python 3.6.5 certains articles mentionnent qu'en python 3.6.5 c'est le cas queuemais même après l'avoir changé, cela ne fonctionne toujours pas
user9371654
25

Étant donné une fonction f,, enfilez-la comme ceci:

import threading
threading.Thread(target=f).start()

Pour passer des arguments à f

threading.Thread(target=f, args=(a,b,c)).start()
étoilé
la source
C'est très simple. Comment vous assurez-vous que les fils se ferment lorsque vous en avez terminé avec eux?
cameronroytaylor
Pour autant que je le comprends, lorsque la fonction se termine, l' Threadobjet est nettoyé. Voir les documents . Il existe une is_alive()méthode que vous pouvez utiliser pour vérifier un thread si vous en avez besoin.
Starfry
J'ai vu la is_aliveméthode, mais je n'ai pas pu comprendre comment l'appliquer au fil. J'ai essayé d'affecter thread1=threading.Thread(target=f).start()puis de vérifier avec thread1.is_alive(), mais thread1est rempli None, donc pas de chance là-bas. Savez-vous s'il existe un autre moyen d'accéder au fil?
cameronroytaylor
4
Vous devez affecter l'objet thread à une variable, puis le démarrer à l'aide de cette variable: thread1=threading.Thread(target=f)suivie de thread1.start(). Ensuite, vous pouvez le faire thread1.is_alive().
Starfry
1
Ça a marché. Et oui, tester avec des thread1.is_alive()retours Falsedès que la fonction se termine.
cameronroytaylor
25

J'ai trouvé cela très utile: créez autant de threads que de cœurs et laissez-les exécuter un (grand) nombre de tâches (dans ce cas, appeler un programme shell):

import Queue
import threading
import multiprocessing
import subprocess

q = Queue.Queue()
for i in range(30): # Put 30 tasks in the queue
    q.put(i)

def worker():
    while True:
        item = q.get()
        # Execute a task: call a shell program and wait until it completes
        subprocess.call("echo " + str(item), shell=True)
        q.task_done()

cpus = multiprocessing.cpu_count() # Detect number of cores
print("Creating %d threads" % cpus)
for i in range(cpus):
     t = threading.Thread(target=worker)
     t.daemon = True
     t.start()

q.join() # Block until all tasks are done
dauphin
la source
@shavenwarthog bien sûr, on peut ajuster la variable "cpus" en fonction de ses besoins. Quoi qu'il en soit, l'appel de sous-processus générera des sous-processus et ceux-ci seront alloués aux processeurs par le système d'exploitation (le "processus parent" de python ne signifie pas "même CPU" pour les sous-processus).
dauphin
2
vous avez raison, mon commentaire sur "les threads sont démarrés sur le même CPU que le processus parent" est faux. Merci pour la réponse!
johntellsall
1
peut-être intéressant de noter que contrairement au multithreading qui utilise le même espace mémoire, le multi-traitement ne peut pas partager des variables / données aussi facilement. +1 cependant.
fantastique
22

Python 3 a la possibilité de lancer des tâches parallèles . Cela facilite notre travail.

Il a un pool de threads et un pool de processus .

Ce qui suit donne un aperçu:

Exemple ThreadPoolExecutor ( source )

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

ProcessPoolExecutor ( source )

import concurrent.futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

if __name__ == '__main__':
    main()
Jeril
la source
18

Utilisation du nouveau module concurrent.futures flamboyant

def sqr(val):
    import time
    time.sleep(0.1)
    return val * val

def process_result(result):
    print(result)

def process_these_asap(tasks):
    import concurrent.futures

    with concurrent.futures.ProcessPoolExecutor() as executor:
        futures = []
        for task in tasks:
            futures.append(executor.submit(sqr, task))

        for future in concurrent.futures.as_completed(futures):
            process_result(future.result())
        # Or instead of all this just do:
        # results = executor.map(sqr, tasks)
        # list(map(process_result, results))

def main():
    tasks = list(range(10))
    print('Processing {} tasks'.format(len(tasks)))
    process_these_asap(tasks)
    print('Done')
    return 0

if __name__ == '__main__':
    import sys
    sys.exit(main())

L'approche de l'exécuteur peut sembler familière à tous ceux qui se sont déjà salis les mains avec Java auparavant.

Également sur une note secondaire: pour garder l'univers sain d'esprit, n'oubliez pas de fermer vos pools / exécuteurs si vous n'utilisez pas le withcontexte (ce qui est tellement génial qu'il le fait pour vous)

Shubham Chaudhary
la source
17

Pour moi, l'exemple parfait pour le filetage est la surveillance des événements asynchrones. Regardez ce code.

# thread_test.py
import threading
import time

class Monitor(threading.Thread):
    def __init__(self, mon):
        threading.Thread.__init__(self)
        self.mon = mon

    def run(self):
        while True:
            if self.mon[0] == 2:
                print "Mon = 2"
                self.mon[0] = 3;

Vous pouvez jouer avec ce code en ouvrant une session IPython et en faisant quelque chose comme:

>>> from thread_test import Monitor
>>> a = [0]
>>> mon = Monitor(a)
>>> mon.start()
>>> a[0] = 2
Mon = 2
>>>a[0] = 2
Mon = 2

Attends quelques minutes

>>> a[0] = 2
Mon = 2
dvreed77
la source
1
AttributeError: l'objet 'Monitor' n'a pas d'attribut 'stop'?
pandita
5
N'êtes-vous pas en train de dynamiter les cycles du processeur en attendant que votre événement se produise? Pas toujours une chose très pratique à faire.
nabab
3
Comme le dit magnat, cela sera constamment exécuté. Au minimum, vous pouvez ajouter un sommeil court, par exemple sommeil (0,1), ce qui réduirait probablement considérablement l'utilisation du processeur sur un exemple simple comme celui-ci.
fantastique
3
Ceci est un exemple horrible, gaspillant un noyau. Ajoutez un sommeil à tout le moins, mais la bonne solution consiste à utiliser un mécanisme de signalisation.
PureW
16

La plupart de la documentation et des didacticiels utilisent Python Threadinget le Queuemodule, et ils peuvent sembler écrasants pour les débutants.

Considérons peut-être le concurrent.futures.ThreadPoolExecutormodule de Python 3.

Combiné avec la withcompréhension des clauses et des listes, cela pourrait être un vrai charme.

from concurrent.futures import ThreadPoolExecutor, as_completed

def get_url(url):
    # Your actual program here. Using threading.Lock() if necessary
    return ""

# List of URLs to fetch
urls = ["url1", "url2"]

with ThreadPoolExecutor(max_workers = 5) as executor:

    # Create threads
    futures = {executor.submit(get_url, url) for url in urls}

    # as_completed() gives you the threads once finished
    for f in as_completed(futures):
        # Get the results
        rs = f.result()
Yibo
la source
15

J'ai vu beaucoup d'exemples ici où aucun travail réel n'était effectué, et ils étaient principalement liés au processeur. Voici un exemple de tâche liée au processeur qui calcule tous les nombres premiers entre 10 millions et 10,05 millions. J'ai utilisé les quatre méthodes ici:

import math
import timeit
import threading
import multiprocessing
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor


def time_stuff(fn):
    """
    Measure time of execution of a function
    """
    def wrapper(*args, **kwargs):
        t0 = timeit.default_timer()
        fn(*args, **kwargs)
        t1 = timeit.default_timer()
        print("{} seconds".format(t1 - t0))
    return wrapper

def find_primes_in(nmin, nmax):
    """
    Compute a list of prime numbers between the given minimum and maximum arguments
    """
    primes = []

    # Loop from minimum to maximum
    for current in range(nmin, nmax + 1):

        # Take the square root of the current number
        sqrt_n = int(math.sqrt(current))
        found = False

        # Check if the any number from 2 to the square root + 1 divides the current numnber under consideration
        for number in range(2, sqrt_n + 1):

            # If divisible we have found a factor, hence this is not a prime number, lets move to the next one
            if current % number == 0:
                found = True
                break

        # If not divisible, add this number to the list of primes that we have found so far
        if not found:
            primes.append(current)

    # I am merely printing the length of the array containing all the primes, but feel free to do what you want
    print(len(primes))

@time_stuff
def sequential_prime_finder(nmin, nmax):
    """
    Use the main process and main thread to compute everything in this case
    """
    find_primes_in(nmin, nmax)

@time_stuff
def threading_prime_finder(nmin, nmax):
    """
    If the minimum is 1000 and the maximum is 2000 and we have four workers,
    1000 - 1250 to worker 1
    1250 - 1500 to worker 2
    1500 - 1750 to worker 3
    1750 - 2000 to worker 4
    so let’s split the minimum and maximum values according to the number of workers
    """
    nrange = nmax - nmin
    threads = []
    for i in range(8):
        start = int(nmin + i * nrange/8)
        end = int(nmin + (i + 1) * nrange/8)

        # Start the thread with the minimum and maximum split up to compute
        # Parallel computation will not work here due to the GIL since this is a CPU-bound task
        t = threading.Thread(target = find_primes_in, args = (start, end))
        threads.append(t)
        t.start()

    # Don’t forget to wait for the threads to finish
    for t in threads:
        t.join()

@time_stuff
def processing_prime_finder(nmin, nmax):
    """
    Split the minimum, maximum interval similar to the threading method above, but use processes this time
    """
    nrange = nmax - nmin
    processes = []
    for i in range(8):
        start = int(nmin + i * nrange/8)
        end = int(nmin + (i + 1) * nrange/8)
        p = multiprocessing.Process(target = find_primes_in, args = (start, end))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

@time_stuff
def thread_executor_prime_finder(nmin, nmax):
    """
    Split the min max interval similar to the threading method, but use a thread pool executor this time.
    This method is slightly faster than using pure threading as the pools manage threads more efficiently.
    This method is still slow due to the GIL limitations since we are doing a CPU-bound task.
    """
    nrange = nmax - nmin
    with ThreadPoolExecutor(max_workers = 8) as e:
        for i in range(8):
            start = int(nmin + i * nrange/8)
            end = int(nmin + (i + 1) * nrange/8)
            e.submit(find_primes_in, start, end)

@time_stuff
def process_executor_prime_finder(nmin, nmax):
    """
    Split the min max interval similar to the threading method, but use the process pool executor.
    This is the fastest method recorded so far as it manages process efficiently + overcomes GIL limitations.
    RECOMMENDED METHOD FOR CPU-BOUND TASKS
    """
    nrange = nmax - nmin
    with ProcessPoolExecutor(max_workers = 8) as e:
        for i in range(8):
            start = int(nmin + i * nrange/8)
            end = int(nmin + (i + 1) * nrange/8)
            e.submit(find_primes_in, start, end)

def main():
    nmin = int(1e7)
    nmax = int(1.05e7)
    print("Sequential Prime Finder Starting")
    sequential_prime_finder(nmin, nmax)
    print("Threading Prime Finder Starting")
    threading_prime_finder(nmin, nmax)
    print("Processing Prime Finder Starting")
    processing_prime_finder(nmin, nmax)
    print("Thread Executor Prime Finder Starting")
    thread_executor_prime_finder(nmin, nmax)
    print("Process Executor Finder Starting")
    process_executor_prime_finder(nmin, nmax)

main()

Voici les résultats sur ma machine à quatre cœurs Mac OS X

Sequential Prime Finder Starting
9.708213827005238 seconds
Threading Prime Finder Starting
9.81836523200036 seconds
Processing Prime Finder Starting
3.2467174359990167 seconds
Thread Executor Prime Finder Starting
10.228896902000997 seconds
Process Executor Finder Starting
2.656402041000547 seconds
PirateApp
la source
1
@TheUnfunCat aucun exécuteur de processus n'est bien meilleur que le threading pour les tâches liées au processeur
PirateApp
1
Grande réponse mec. Je peux confirmer qu'en Python 3.6 sur Windows (au moins) ThreadPoolExecutor ne fait rien de bon pour les tâches gourmandes en CPU. Il n'utilise pas de cœurs pour le calcul. Alors que ProcessPoolExecutor copie les données dans TOUS les processus qu'il génère, c'est mortel pour les grandes matrices.
Anatoly Alekseev
1
Exemple très utile, mais je ne comprends pas comment cela a fonctionné. Nous avons besoin d' un if __name__ == '__main__':avant l'appel principal, sinon les pontes de mesure lui - même et imprime une tentative a été faite pour commencer un nouveau processus avant ... .
Stein
1
@Stein Je pense que ce n'est qu'un problème sous Windows, cependant.
AMC
12

Voici l'exemple très simple de l' importation CSV en utilisant le filetage. (L'inclusion dans la bibliothèque peut différer pour différents objectifs.)

Fonctions d'assistance:

from threading import Thread
from project import app
import csv


def import_handler(csv_file_name):
    thr = Thread(target=dump_async_csv_data, args=[csv_file_name])
    thr.start()

def dump_async_csv_data(csv_file_name):
    with app.app_context():
        with open(csv_file_name) as File:
            reader = csv.DictReader(File)
            for row in reader:
                # DB operation/query

Fonction pilote:

import_handler(csv_file_name)
Chirag Vora
la source
9

Je voudrais contribuer avec un exemple simple et les explications que j'ai trouvées utiles lorsque j'ai dû résoudre ce problème moi-même.

Dans cette réponse, vous trouverez des informations sur le GIL de Python (verrouillage de l'interpréteur global) et un exemple simple écrit au jour le jour à l'aide de multiprocessing.dummy ainsi que quelques tests de référence simples.

Verrou d'interprète global (GIL)

Python n'autorise pas le multi-threading dans le vrai sens du mot. Il a un package multi-thread, mais si vous voulez multi-thread pour accélérer votre code, ce n'est généralement pas une bonne idée de l'utiliser.

Python a une construction appelée le verrou d'interpréteur global (GIL). Le GIL s'assure qu'un seul de vos «threads» peut s'exécuter à la fois. Un thread acquiert le GIL, fait un petit travail, puis passe le GIL sur le thread suivant.

Cela se produit très rapidement, donc à l'œil humain, il peut sembler que vos threads s'exécutent en parallèle, mais ils se tournent vraiment à tour de rôle en utilisant le même cœur de processeur.

Tout ce passage GIL ajoute des frais généraux à l'exécution. Cela signifie que si vous souhaitez accélérer l'exécution de votre code, l'utilisation du package de thread n'est souvent pas une bonne idée.

Il y a des raisons d'utiliser le package de threads de Python. Si vous voulez exécuter certaines choses simultanément et que l'efficacité n'est pas un problème, c'est tout à fait correct et pratique. Ou si vous exécutez du code qui doit attendre quelque chose (comme des E / S), cela peut avoir beaucoup de sens. Mais la bibliothèque de threads ne vous permettra pas d'utiliser des cœurs de processeur supplémentaires.

Le multi-threading peut être externalisé vers le système d'exploitation (en effectuant le multi-traitement) et une application externe qui appelle votre code Python (par exemple, Spark ou Hadoop ), ou du code que votre code Python appelle (par exemple: vous pourriez demandez à votre code Python d'appeler une fonction C qui fait les choses coûteuses multi-thread).

Pourquoi c'est important

Parce que beaucoup de gens passent beaucoup de temps à essayer de trouver des goulots d'étranglement dans leur code multi-thread Python avant d'apprendre ce qu'est le GIL.

Une fois ces informations claires, voici mon code:

#!/bin/python
from multiprocessing.dummy import Pool
from subprocess import PIPE,Popen
import time
import os

# In the variable pool_size we define the "parallelness".
# For CPU-bound tasks, it doesn't make sense to create more Pool processes
# than you have cores to run them on.
#
# On the other hand, if you are using I/O-bound tasks, it may make sense
# to create a quite a few more Pool processes than cores, since the processes
# will probably spend most their time blocked (waiting for I/O to complete).
pool_size = 8

def do_ping(ip):
    if os.name == 'nt':
        print ("Using Windows Ping to " + ip)
        proc = Popen(['ping', ip], stdout=PIPE)
        return proc.communicate()[0]
    else:
        print ("Using Linux / Unix Ping to " + ip)
        proc = Popen(['ping', ip, '-c', '4'], stdout=PIPE)
        return proc.communicate()[0]


os.system('cls' if os.name=='nt' else 'clear')
print ("Running using threads\n")
start_time = time.time()
pool = Pool(pool_size)
website_names = ["www.google.com","www.facebook.com","www.pinterest.com","www.microsoft.com"]
result = {}
for website_name in website_names:
    result[website_name] = pool.apply_async(do_ping, args=(website_name,))
pool.close()
pool.join()
print ("\n--- Execution took {} seconds ---".format((time.time() - start_time)))

# Now we do the same without threading, just to compare time
print ("\nRunning NOT using threads\n")
start_time = time.time()
for website_name in website_names:
    do_ping(website_name)
print ("\n--- Execution took {} seconds ---".format((time.time() - start_time)))

# Here's one way to print the final output from the threads
output = {}
for key, value in result.items():
    output[key] = value.get()
print ("\nOutput aggregated in a Dictionary:")
print (output)
print ("\n")

print ("\nPretty printed output: ")
for key, value in output.items():
    print (key + "\n")
    print (value)
Pitto
la source
7

Voici le multi threading avec un exemple simple qui vous sera utile. Vous pouvez l'exécuter et comprendre facilement le fonctionnement du multi-threading en Python. J'ai utilisé un verrou pour empêcher l'accès aux autres threads jusqu'à ce que les threads précédents aient terminé leur travail. Par l'utilisation de cette ligne de code,

tLock = threading.BoundedSemaphore (valeur = 4)

vous pouvez autoriser un certain nombre de processus à la fois et conserver le reste des threads qui s'exécuteront plus tard ou après la fin des processus précédents.

import threading
import time

#tLock = threading.Lock()
tLock = threading.BoundedSemaphore(value=4)
def timer(name, delay, repeat):
    print  "\r\nTimer: ", name, " Started"
    tLock.acquire()
    print "\r\n", name, " has the acquired the lock"
    while repeat > 0:
        time.sleep(delay)
        print "\r\n", name, ": ", str(time.ctime(time.time()))
        repeat -= 1

    print "\r\n", name, " is releaseing the lock"
    tLock.release()
    print "\r\nTimer: ", name, " Completed"

def Main():
    t1 = threading.Thread(target=timer, args=("Timer1", 2, 5))
    t2 = threading.Thread(target=timer, args=("Timer2", 3, 5))
    t3 = threading.Thread(target=timer, args=("Timer3", 4, 5))
    t4 = threading.Thread(target=timer, args=("Timer4", 5, 5))
    t5 = threading.Thread(target=timer, args=("Timer5", 0.1, 5))

    t1.start()
    t2.start()
    t3.start()
    t4.start()
    t5.start()

    print "\r\nMain Complete"

if __name__ == "__main__":
    Main()
cSharma
la source
5

En empruntant à ce poste, nous savons comment choisir entre le multithreading, le multiprocessing et l'async / asyncioet leur utilisation.

Python 3 a une nouvelle bibliothèque intégrée pour concurrencer et paralléliser: concurrent.futures

Je vais donc démontrer à travers une expérience pour exécuter quatre tâches (c'est-à-dire la .sleep()méthode) de Threading-Poolmanière:

from concurrent.futures import ThreadPoolExecutor, as_completed
from time import sleep, time

def concurrent(max_worker=1):
    futures = []

    tick = time()
    with ThreadPoolExecutor(max_workers=max_worker) as executor:
        futures.append(executor.submit(sleep, 2))  # Two seconds sleep
        futures.append(executor.submit(sleep, 1))
        futures.append(executor.submit(sleep, 7))
        futures.append(executor.submit(sleep, 3))

        for future in as_completed(futures):
            if future.result() is not None:
                print(future.result())

    print('Total elapsed time by {} workers:'.format(max_worker), time()-tick)

concurrent(5)
concurrent(4)
concurrent(3)
concurrent(2)
concurrent(1)

Production:

Total elapsed time by 5 workers: 7.007831811904907
Total elapsed time by 4 workers: 7.007944107055664
Total elapsed time by 3 workers: 7.003149509429932
Total elapsed time by 2 workers: 8.004627466201782
Total elapsed time by 1 workers: 13.013478994369507

[ REMARQUE ]:

  • Comme vous pouvez le voir dans les résultats ci-dessus, le meilleur cas était de 3 travailleurs pour ces quatre tâches.
  • Si vous avez une tâche de processus au lieu de la liaison ou du blocage des E / S ( multiprocessingvs threading), vous pouvez changer la ThreadPoolExecutoren ProcessPoolExecutor.
Benyamin Jafari
la source
4

Aucune des solutions précédentes n'utilisait réellement plusieurs cœurs sur mon serveur GNU / Linux (où je n'ai pas de droits d'administrateur). Ils ont simplement fonctionné sur un seul cœur.

J'ai utilisé l' os.forkinterface de niveau inférieur pour générer plusieurs processus. Voici le code qui a fonctionné pour moi:

from os import fork

values = ['different', 'values', 'for', 'threads']

for i in range(len(values)):
    p = fork()
    if p == 0:
        my_function(values[i])
        break
David Schumann
la source
2
import threading
import requests

def send():

  r = requests.get('https://www.stackoverlow.com')

thread = []
t = threading.Thread(target=send())
thread.append(t)
t.start()
Skiller Dz
la source
1
@sP_ J'imagine parce qu'alors vous avez des objets thread afin que vous puissiez attendre qu'ils se terminent.
Aleksandar Makragić
1
t = threading.Thread (target = send ()) devrait être t = threading.Thread (target = send)
TRiNE
Je dévalise cette réponse car elle ne fournit pas d'explication sur la façon dont elle améliore les réponses existantes, en plus de contenir une grave inexactitude.
Jules