Threading pool similaire au pool multiprocessing?

347

Existe-t-il une classe Pool pour les threads de travail , similaire à la classe Pool du module de multitraitement ?

J'aime par exemple la manière simple de paralléliser une fonction de carte

def long_running_func(p):
    c_func_no_gil(p)

p = multiprocessing.Pool(4)
xs = p.map(long_running_func, range(100))

cependant je voudrais le faire sans la surcharge de créer de nouveaux processus.

Je connais le GIL. Cependant, dans mon cas d'utilisation, la fonction sera une fonction C liée aux IO pour laquelle le wrapper python libérera le GIL avant l'appel de la fonction réelle.

Dois-je écrire mon propre pool de threads?

Martin
la source
Voici quelque chose qui semble prometteur dans le livre de recettes
Pool
1
Intégré Aujourd'hui , il est: from multiprocessing.pool import ThreadPool.
martineau
Pouvez-vous nous en dire plus I know about the GIL. However, in my usecase, the function will be an IO-bound C function for which the python wrapper will release the GIL before the actual function call.?
mrgloom

Réponses:

448

Je viens de découvrir qu'il existe en fait une interface de pool basée sur les threads dans le multiprocessingmodule, mais elle est quelque peu cachée et n'est pas correctement documentée.

Il peut être importé via

from multiprocessing.pool import ThreadPool

Il est implémenté à l'aide d'une classe Process factice enveloppant un thread python. Cette classe de processus basée sur les threads peut être trouvée dans multiprocessing.dummylaquelle est brièvement mentionnée dans la documentation . Ce module factice fournit censément l'ensemble de l'interface de multitraitement basée sur les threads.

Martin
la source
5
C'est génial. J'ai eu un problème lors de la création de ThreadPools en dehors du thread principal, vous pouvez les utiliser à partir d'un thread enfant une fois créé. J'ai mis un problème pour cela: bugs.python.org/issue10015
Olson
82
Je ne comprends pas pourquoi cette classe n'a pas de documentation. De telles classes auxiliaires sont si importantes de nos jours.
Wernight
18
@Wernight: il n'est pas public principalement parce que personne n'a proposé un correctif qui le fournit (ou quelque chose de similaire) comme threading.ThreadPool, y compris la documentation et les tests. Ce serait en effet une bonne batterie à inclure dans la bibliothèque standard, mais cela n'arrivera pas si personne ne l'écrit. Un bel avantage de cette implémentation existante dans le multitraitement, c'est qu'elle devrait rendre tout patch de threading beaucoup plus facile à écrire ( docs.python.org/devguide )
ncoghlan
3
@ daniel.gindi: multiprocessing.dummy.Pool/ multiprocessing.pool.ThreadPoolsont la même chose et sont tous deux des pools de threads. Ils imitent l' interface d'un pool de processus, mais ils sont entièrement implémentés en termes de threading. Relisez les documents, vous les avez à l'envers.
ShadowRanger
9
@ daniel.gindi: Lire la suite : " multiprocessing.dummyréplique l'API de multiprocessingmais n'est rien de plus qu'un wrapper autour du threadingmodule." multiprocessingen général, il s'agit de processus, mais pour permettre de basculer entre les processus et les threads, ils ont (principalement) répliqué l' multiprocessingAPI dans multiprocessing.dummy, mais soutenu par des threads, pas des processus. L'objectif est de vous permettre import multiprocessing.dummy as multiprocessingde changer le code basé sur les processus en code basé sur les threads.
ShadowRanger
236

En Python 3, vous pouvez utiliser concurrent.futures.ThreadPoolExecutor, c'est-à-dire:

executor = ThreadPoolExecutor(max_workers=10)
a = executor.submit(my_function)

Voir la documentation pour plus d'informations et d'exemples.

Adrian Adamiak
la source
6
pour utiliser le module de contrats à terme sudo pip install futures
rétrosportés
c'est le moyen le plus efficace et le plus rapide pour le traitement multiple
Haritsinh Gohil
2
Quelle est la différence entre l'utilisation de ThreadPoolExecutoret multiprocessing.dummy.Pool?
Jay
2
de concurrent.futures import ThreadPoolExecutor
stackOverlord
63

Oui, et il semble avoir (plus ou moins) la même API.

import multiprocessing

def worker(lnk):
    ....    
def start_process():
    .....
....

if(PROCESS):
    pool = multiprocessing.Pool(processes=POOL_SIZE, initializer=start_process)
else:
    pool = multiprocessing.pool.ThreadPool(processes=POOL_SIZE, 
                                           initializer=start_process)

pool.map(worker, inputs)
....
warfares
la source
9
Le chemin d'importation pour ThreadPoolest différent de Pool. L'importation correcte est from multiprocessing.pool import ThreadPool.
Marigold
2
Étrangement, ce n'est pas une API documentée, et multiprocessing.pool n'est mentionné que brièvement comme fournissant AsyncResult. Mais il est disponible en 2.x et 3.x.
Marvin
2
C'est ce que je cherchais. C'est juste une seule ligne d'importation et un petit changement à ma ligne de pool existante et cela fonctionne parfaitement.
Danegraphics
39

Pour quelque chose de très simple et léger (légèrement modifié d' ici ):

from Queue import Queue
from threading import Thread


class Worker(Thread):
    """Thread executing tasks from a given tasks queue"""
    def __init__(self, tasks):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon = True
        self.start()

    def run(self):
        while True:
            func, args, kargs = self.tasks.get()
            try:
                func(*args, **kargs)
            except Exception, e:
                print e
            finally:
                self.tasks.task_done()


class ThreadPool:
    """Pool of threads consuming tasks from a queue"""
    def __init__(self, num_threads):
        self.tasks = Queue(num_threads)
        for _ in range(num_threads):
            Worker(self.tasks)

    def add_task(self, func, *args, **kargs):
        """Add a task to the queue"""
        self.tasks.put((func, args, kargs))

    def wait_completion(self):
        """Wait for completion of all the tasks in the queue"""
        self.tasks.join()

if __name__ == '__main__':
    from random import randrange
    from time import sleep

    delays = [randrange(1, 10) for i in range(100)]

    def wait_delay(d):
        print 'sleeping for (%d)sec' % d
        sleep(d)

    pool = ThreadPool(20)

    for i, d in enumerate(delays):
        pool.add_task(wait_delay, d)

    pool.wait_completion()

Pour prendre en charge les rappels à la fin de la tâche, vous pouvez simplement ajouter le rappel au tuple de la tâche.

dgorissen
la source
comment les threads peuvent-ils se joindre s'ils bouclent inconditionnellement à l'infini?
Joseph Garvin
@JosephGarvin Je l'ai testé, et les threads continuent de bloquer sur une file d'attente vide (puisque l'appel à Queue.get()est bloquant) jusqu'à la fin du programme, après quoi ils se terminent automatiquement.
forumulateur
@JosephGarvin, bonne question. Queue.join()va réellement rejoindre la file d'attente des tâches, pas les threads de travail. Ainsi, lorsque la file d'attente est vide, wait_completionretourne, le programme se termine et les threads sont récoltés par le système d'exploitation.
randomir
Si tout ce code est enveloppé dans une fonction soignée, il ne semble pas arrêter les threads même lorsque la file d'attente est vide et pool.wait_completion()revient. Le résultat est que les threads continuent de se construire.
ubiquibacon
17

Salut pour utiliser le pool de threads en Python, vous pouvez utiliser cette bibliothèque:

from multiprocessing.dummy import Pool as ThreadPool

puis pour utilisation, cette bibliothèque fait comme ça:

pool = ThreadPool(threads)
results = pool.map(service, tasks)
pool.close()
pool.join()
return results

Les threads sont le nombre de threads que vous souhaitez et les tâches sont une liste des tâches les plus mappées au service.

Manochehr Rasouli
la source
Merci, c'est une excellente suggestion! À partir de la documentation: multiprocessing.dummy réplique l'API du multiprocessing mais n'est rien de plus qu'un wrapper autour du module de thread. Une correction - je pense que vous voulez dire que l'api de la piscine est (fonction, itérable)
layser
2
Nous avons raté les .close()et .join()appels et que les causes .map()à la fin avant que tous les fils sont terminés. Juste un avertissement.
Anatoly Scherbakov
8

Voici le résultat que j'ai finalement utilisé. C'est une version modifiée des classes par dgorissen ci-dessus.

Fichier: threadpool.py

from queue import Queue, Empty
import threading
from threading import Thread


class Worker(Thread):
    _TIMEOUT = 2
    """ Thread executing tasks from a given tasks queue. Thread is signalable, 
        to exit
    """
    def __init__(self, tasks, th_num):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon, self.th_num = True, th_num
        self.done = threading.Event()
        self.start()

    def run(self):       
        while not self.done.is_set():
            try:
                func, args, kwargs = self.tasks.get(block=True,
                                                   timeout=self._TIMEOUT)
                try:
                    func(*args, **kwargs)
                except Exception as e:
                    print(e)
                finally:
                    self.tasks.task_done()
            except Empty as e:
                pass
        return

    def signal_exit(self):
        """ Signal to thread to exit """
        self.done.set()


class ThreadPool:
    """Pool of threads consuming tasks from a queue"""
    def __init__(self, num_threads, tasks=[]):
        self.tasks = Queue(num_threads)
        self.workers = []
        self.done = False
        self._init_workers(num_threads)
        for task in tasks:
            self.tasks.put(task)

    def _init_workers(self, num_threads):
        for i in range(num_threads):
            self.workers.append(Worker(self.tasks, i))

    def add_task(self, func, *args, **kwargs):
        """Add a task to the queue"""
        self.tasks.put((func, args, kwargs))

    def _close_all_threads(self):
        """ Signal all threads to exit and lose the references to them """
        for workr in self.workers:
            workr.signal_exit()
        self.workers = []

    def wait_completion(self):
        """Wait for completion of all the tasks in the queue"""
        self.tasks.join()

    def __del__(self):
        self._close_all_threads()


def create_task(func, *args, **kwargs):
    return (func, args, kwargs)

Pour utiliser la piscine

from random import randrange
from time import sleep

delays = [randrange(1, 10) for i in range(30)]

def wait_delay(d):
    print('sleeping for (%d)sec' % d)
    sleep(d)

pool = ThreadPool(20)
for i, d in enumerate(delays):
    pool.add_task(wait_delay, d)
pool.wait_completion()
forumulateur
la source
Annotation pour les autres lecteurs: Ce code est Python 3 (shebang #!/usr/bin/python3)
Daniel Marschall
Pourquoi utilisez-vous for i, d in enumerate(delays):puis ignorez la ivaleur?
martineau
@martineau - probablement juste une relique du développement où ils voulaient probablement imprimer ipendant une course.
n1k31t4
Pourquoi create_tasky a-t-il? Pourquoi est-ce?
MrR
Je ne peux pas croire et répondre avec 4 votes sur SO est la façon de faire ThreadPooling en Python. Le Threadpool dans la distribution officielle de python est toujours cassé? Qu'est-ce que je rate?
MrR
2

Les frais généraux liés à la création de nouveaux processus sont minimes, surtout lorsqu'il ne s'agit que de 4 d'entre eux. Je doute que ce soit un point chaud de performance de votre application. Restez simple, optimisez où vous devez et où les résultats du profilage pointent.

unbeli
la source
5
Si l'interrogateur est sous Windows (ce que je ne pense pas avoir précisé), je pense que la rotation du processus peut être une dépense importante. C'est du moins sur les projets que je fais récemment. :-)
Brandon Rhodes
1

Il n'y a pas de pool basé sur les threads. Cependant, il peut être très rapide d'implémenter une file d'attente producteur / consommateur avec la Queueclasse.

De: https://docs.python.org/2/library/queue.html

from threading import Thread
from Queue import Queue
def worker():
    while True:
        item = q.get()
        do_work(item)
        q.task_done()

q = Queue()
for i in range(num_worker_threads):
     t = Thread(target=worker)
     t.daemon = True
     t.start()

for item in source():
    q.put(item)

q.join()       # block until all tasks are done
Yann Ramin
la source
3
Ce n'est plus le cas avec le concurrent.futuresmodule.
Thanatos
11
Je ne pense plus que ce soit vrai du tout. from multiprocessing.pool import ThreadPool
Randall Hunt
0

une autre façon peut être d'ajouter le processus au pool de files d'attente

import concurrent.futures
with concurrent.futures.ThreadPoolExecutor(max_workers=cpus) as executor:
    for i in range(0, len(list_of_files) - 1):
        a = executor.submit(loop_files2, i, list_of_files2, mt_list, temp_path, mt_dicto)
pelos
la source