J'ai beaucoup de mal à comprendre comment la file d'attente multiprocesseur fonctionne sur python et comment l'implémenter. Disons que j'ai deux modules python qui accèdent aux données à partir d'un fichier partagé, appelons ces deux modules un écrivain et un lecteur. Mon plan est que le lecteur et l'écrivain placent les requêtes dans deux files d'attente multiprocesseurs séparées, puis qu'un troisième processus affiche ces requêtes en boucle et les exécute en tant que telles.
Mon principal problème est que je ne sais vraiment pas comment implémenter correctement multiprocessing.queue, vous ne pouvez pas vraiment instancier l'objet pour chaque processus car ce seront des files d'attente séparées, comment vous assurer que tous les processus sont liés à une file d'attente partagée (ou dans ce cas, les files d'attente)
Réponses:
Ceci est un exemple simple de lecteur et d'écrivain partageant une seule file d'attente ... L'écrivain envoie un tas d'entiers au lecteur; lorsque l'écrivain est à court de nombres, il envoie «DONE», qui permet au lecteur de savoir qu'il doit sortir de la boucle de lecture.
from multiprocessing import Process, Queue import time import sys def reader_proc(queue): ## Read from the queue; this will be spawned as a separate Process while True: msg = queue.get() # Read from the queue and do nothing if (msg == 'DONE'): break def writer(count, queue): ## Write to the queue for ii in range(0, count): queue.put(ii) # Write 'count' numbers into the queue queue.put('DONE') if __name__=='__main__': pqueue = Queue() # writer() writes to pqueue from _this_ process for count in [10**4, 10**5, 10**6]: ### reader_proc() reads from pqueue as a separate process reader_p = Process(target=reader_proc, args=((pqueue),)) reader_p.daemon = True reader_p.start() # Launch reader_proc() as a separate python process _start = time.time() writer(count, pqueue) # Send a lot of stuff to reader() reader_p.join() # Wait for the reader to finish print("Sending {0} numbers to Queue() took {1} seconds".format(count, (time.time() - _start)))
la source
dans "
from queue import Queue
" il n'y a pas de module appeléqueue
, à la placemultiprocessing
devrait être utilisé. Par conséquent, il devrait ressembler à "from multiprocessing import Queue
"la source
multiprocessing.Queue
est correcte. La normaleQueue.Queue
est utilisée pour les threads python . Lorsque vous essayez d'utiliserQueue.Queue
avec le multitraitement, des copies de l'objet Queue seront créées dans chaque processus enfant et les processus enfants ne seront jamais mis à jour. Fondamentalement,Queue.Queue
fonctionne à l'aide d'un objet partagé global etmultiprocessing.Queue
fonctionne à l'aide d'IPC. Voir: stackoverflow.com/questions/925100/…Voici une utilisation très simple de
multiprocessing.Queue
etmultiprocessing.Process
qui permet aux appelants d'envoyer un "événement" plus des arguments à un processus séparé qui distribue l'événement à une méthode "do_" sur le processus. (Python 3.4+)import multiprocessing as mp import collections Msg = collections.namedtuple('Msg', ['event', 'args']) class BaseProcess(mp.Process): """A process backed by an internal queue for simple one-way message passing. """ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.queue = mp.Queue() def send(self, event, *args): """Puts the event and args as a `Msg` on the queue """ msg = Msg(event, args) self.queue.put(msg) def dispatch(self, msg): event, args = msg handler = getattr(self, "do_%s" % event, None) if not handler: raise NotImplementedError("Process has no handler for [%s]" % event) handler(*args) def run(self): while True: msg = self.queue.get() self.dispatch(msg)
Usage:
class MyProcess(BaseProcess): def do_helloworld(self, arg1, arg2): print(arg1, arg2) if __name__ == "__main__": process = MyProcess() process.start() process.send('helloworld', 'hello', 'world')
Le
send
se produit dans le processus parent, ledo_*
se produit dans le processus enfant.J'ai omis toute gestion d'exception qui interromprait évidemment la boucle d'exécution et quitterait le processus enfant. Vous pouvez également le personnaliser en remplaçant
run
pour contrôler le blocage ou quoi que ce soit d'autre.Ceci n'est vraiment utile que dans les situations où vous avez un seul processus de travail, mais je pense que c'est une réponse pertinente à cette question pour démontrer un scénario commun avec un peu plus d'orientation objet.
la source
J'ai jeté un coup d'œil à plusieurs réponses sur le débordement de pile et sur le Web tout en essayant de mettre en place un moyen de faire du multitraitement en utilisant des files d'attente pour faire circuler de grands cadres de données pandas. Il me semblait que chaque réponse était de réitérer le même type de solutions sans aucune considération de la multitude de cas extrêmes que l'on rencontrera certainement lors de la mise en place de calculs comme ceux-ci. Le problème est qu'il y a plusieurs choses en jeu en même temps. Le nombre de tâches, le nombre de travailleurs, la durée de chaque tâche et les exceptions possibles lors de l'exécution de la tâche. Tout cela rend la synchronisation délicate et la plupart des réponses ne traitent pas de la façon dont vous pouvez vous y prendre. C'est donc mon avis après avoir bidouillé pendant quelques heures, j'espère que ce sera assez générique pour que la plupart des gens le trouvent utile.
Quelques réflexions avant tout exemple de codage. Étant donné que
queue.Empty
ouqueue.qsize()
ou toute autre méthode similaire n'est pas fiable pour le contrôle de flux, tout code similairewhile True: try: task = pending_queue.get_nowait() except queue.Empty: break
est faux. Cela tuera le worker même si quelques millisecondes plus tard, une autre tâche apparaît dans la file d'attente. Le travailleur ne récupérera pas et après un certain temps TOUS les travailleurs disparaîtront car ils trouveront au hasard la file d'attente momentanément vide. Le résultat final sera que la fonction principale de multitraitement (celle avec la jointure () sur les processus) reviendra sans que toutes les tâches soient terminées. Agréable. Bonne chance pour le débogage si vous avez des milliers de tâches et que quelques-unes manquent.
L'autre problème est l'utilisation des valeurs sentinelles. De nombreuses personnes ont suggéré d'ajouter une valeur sentinelle dans la file d'attente pour marquer la fin de la file d'attente. Mais le signaler à qui exactement? S'il y a N nœuds de calcul, en supposant que N est le nombre de cœurs disponibles à donner ou à prendre, une seule valeur sentinelle marquera uniquement la fin de la file d'attente pour un seul opérateur. Tous les autres travailleurs resteront assis en attendant plus de travail quand il n'en restera plus. Les exemples typiques que j'ai vus sont
while True: task = pending_queue.get() if task == SOME_SENTINEL_VALUE: break
Un ouvrier recevra la valeur sentinelle tandis que le reste attendra indéfiniment. Aucun message que j'ai rencontré n'a mentionné que vous deviez soumettre la valeur sentinelle à la file d'attente AU MOINS autant de fois que vous avez des ouvriers pour que TOUS l'obtiennent.
L'autre problème est la gestion des exceptions lors de l'exécution de la tâche. Encore une fois, ceux-ci doivent être capturés et gérés. De plus, si vous avez une
completed_tasks
file d'attente, vous devez compter indépendamment de manière déterministe le nombre d'éléments dans la file d'attente avant de décider que le travail est terminé. Encore une fois, le fait de s'appuyer sur la taille des files d'attente est voué à l'échec et renvoie des résultats inattendus.Dans l'exemple ci-dessous, la
par_proc()
fonction recevra une liste de tâches comprenant les fonctions avec lesquelles ces tâches doivent être exécutées à côté des arguments et valeurs nommés.import multiprocessing as mp import dill as pickle import queue import time import psutil SENTINEL = None def do_work(tasks_pending, tasks_completed): # Get the current worker's name worker_name = mp.current_process().name while True: try: task = tasks_pending.get_nowait() except queue.Empty: print(worker_name + ' found an empty queue. Sleeping for a while before checking again...') time.sleep(0.01) else: try: if task == SENTINEL: print(worker_name + ' no more work left to be done. Exiting...') break print(worker_name + ' received some work... ') time_start = time.perf_counter() work_func = pickle.loads(task['func']) result = work_func(**task['task']) tasks_completed.put({work_func.__name__: result}) time_end = time.perf_counter() - time_start print(worker_name + ' done in {} seconds'.format(round(time_end, 5))) except Exception as e: print(worker_name + ' task failed. ' + str(e)) tasks_completed.put({work_func.__name__: None}) def par_proc(job_list, num_cpus=None): # Get the number of cores if not num_cpus: num_cpus = psutil.cpu_count(logical=False) print('* Parallel processing') print('* Running on {} cores'.format(num_cpus)) # Set-up the queues for sending and receiving data to/from the workers tasks_pending = mp.Queue() tasks_completed = mp.Queue() # Gather processes and results here processes = [] results = [] # Count tasks num_tasks = 0 # Add the tasks to the queue for job in job_list: for task in job['tasks']: expanded_job = {} num_tasks = num_tasks + 1 expanded_job.update({'func': pickle.dumps(job['func'])}) expanded_job.update({'task': task}) tasks_pending.put(expanded_job) # Use as many workers as there are cores (usually chokes the system so better use less) num_workers = num_cpus # We need as many sentinels as there are worker processes so that ALL processes exit when there is no more # work left to be done. for c in range(num_workers): tasks_pending.put(SENTINEL) print('* Number of tasks: {}'.format(num_tasks)) # Set-up and start the workers for c in range(num_workers): p = mp.Process(target=do_work, args=(tasks_pending, tasks_completed)) p.name = 'worker' + str(c) processes.append(p) p.start() # Gather the results completed_tasks_counter = 0 while completed_tasks_counter < num_tasks: results.append(tasks_completed.get()) completed_tasks_counter = completed_tasks_counter + 1 for p in processes: p.join() return results
Et voici un test pour exécuter le code ci-dessus contre
def test_parallel_processing(): def heavy_duty1(arg1, arg2, arg3): return arg1 + arg2 + arg3 def heavy_duty2(arg1, arg2, arg3): return arg1 * arg2 * arg3 task_list = [ {'func': heavy_duty1, 'tasks': [{'arg1': 1, 'arg2': 2, 'arg3': 3}, {'arg1': 1, 'arg2': 3, 'arg3': 5}]}, {'func': heavy_duty2, 'tasks': [{'arg1': 1, 'arg2': 2, 'arg3': 3}, {'arg1': 1, 'arg2': 3, 'arg3': 5}]}, ] results = par_proc(task_list) job1 = sum([y for x in results if 'heavy_duty1' in x.keys() for y in list(x.values())]) job2 = sum([y for x in results if 'heavy_duty2' in x.keys() for y in list(x.values())]) assert job1 == 15 assert job2 == 21
plus un autre avec quelques exceptions
def test_parallel_processing_exceptions(): def heavy_duty1_raises(arg1, arg2, arg3): raise ValueError('Exception raised') return arg1 + arg2 + arg3 def heavy_duty2(arg1, arg2, arg3): return arg1 * arg2 * arg3 task_list = [ {'func': heavy_duty1_raises, 'tasks': [{'arg1': 1, 'arg2': 2, 'arg3': 3}, {'arg1': 1, 'arg2': 3, 'arg3': 5}]}, {'func': heavy_duty2, 'tasks': [{'arg1': 1, 'arg2': 2, 'arg3': 3}, {'arg1': 1, 'arg2': 3, 'arg3': 5}]}, ] results = par_proc(task_list) job1 = sum([y for x in results if 'heavy_duty1' in x.keys() for y in list(x.values())]) job2 = sum([y for x in results if 'heavy_duty2' in x.keys() for y in list(x.values())]) assert not job1 assert job2 == 21
J'espère que c'est utile.
la source
Nous avons implémenté deux versions de celui-ci, l'une un simple pool multi- thread qui peut exécuter de nombreux types d'appels, ce qui nous facilite la vie, et la deuxième version qui utilise des processus , qui est moins flexible en termes d'appels et nécessite un appel supplémentaire à l'aneth.
La définition de Frozen_pool sur true gèrera l'exécution jusqu'à ce que finish_pool_queue soit appelé dans l'une ou l'autre des classes.
Version du fil:
''' Created on Nov 4, 2019 @author: Kevin ''' from threading import Lock, Thread from Queue import Queue import traceback from helium.loaders.loader_retailers import print_info from time import sleep import signal import os class ThreadPool(object): def __init__(self, queue_threads, *args, **kwargs): self.frozen_pool = kwargs.get('frozen_pool', False) self.print_queue = kwargs.get('print_queue', True) self.pool_results = [] self.lock = Lock() self.queue_threads = queue_threads self.queue = Queue() self.threads = [] for i in range(self.queue_threads): t = Thread(target=self.make_pool_call) t.daemon = True t.start() self.threads.append(t) def make_pool_call(self): while True: if self.frozen_pool: #print '--> Queue is frozen' sleep(1) continue item = self.queue.get() if item is None: break call = item.get('call', None) args = item.get('args', []) kwargs = item.get('kwargs', {}) keep_results = item.get('keep_results', False) try: result = call(*args, **kwargs) if keep_results: self.lock.acquire() self.pool_results.append((item, result)) self.lock.release() except Exception as e: self.lock.acquire() print e traceback.print_exc() self.lock.release() os.kill(os.getpid(), signal.SIGUSR1) self.queue.task_done() def finish_pool_queue(self): self.frozen_pool = False while self.queue.unfinished_tasks > 0: if self.print_queue: print_info('--> Thread pool... %s' % self.queue.unfinished_tasks) sleep(5) self.queue.join() for i in range(self.queue_threads): self.queue.put(None) for t in self.threads: t.join() del self.threads[:] def get_pool_results(self): return self.pool_results def clear_pool_results(self): del self.pool_results[:]
Version du processus:
''' Created on Nov 4, 2019 @author: Kevin ''' import traceback from helium.loaders.loader_retailers import print_info from time import sleep import signal import os from multiprocessing import Queue, Process, Value, Array, JoinableQueue, Lock,\ RawArray, Manager from dill import dill import ctypes from helium.misc.utils import ignore_exception from mem_top import mem_top import gc class ProcessPool(object): def __init__(self, queue_processes, *args, **kwargs): self.frozen_pool = Value(ctypes.c_bool, kwargs.get('frozen_pool', False)) self.print_queue = kwargs.get('print_queue', True) self.manager = Manager() self.pool_results = self.manager.list() self.queue_processes = queue_processes self.queue = JoinableQueue() self.processes = [] for i in range(self.queue_processes): p = Process(target=self.make_pool_call) p.start() self.processes.append(p) print 'Processes', self.queue_processes def make_pool_call(self): while True: if self.frozen_pool.value: sleep(1) continue item_pickled = self.queue.get() if item_pickled is None: #print '--> Ending' self.queue.task_done() break item = dill.loads(item_pickled) call = item.get('call', None) args = item.get('args', []) kwargs = item.get('kwargs', {}) keep_results = item.get('keep_results', False) try: result = call(*args, **kwargs) if keep_results: self.pool_results.append(dill.dumps((item, result))) else: del call, args, kwargs, keep_results, item, result except Exception as e: print e traceback.print_exc() os.kill(os.getpid(), signal.SIGUSR1) self.queue.task_done() def finish_pool_queue(self, callable=None): self.frozen_pool.value = False while self.queue._unfinished_tasks.get_value() > 0: if self.print_queue: print_info('--> Process pool... %s' % (self.queue._unfinished_tasks.get_value())) if callable: callable() sleep(5) for i in range(self.queue_processes): self.queue.put(None) self.queue.join() self.queue.close() for p in self.processes: with ignore_exception: p.join(10) with ignore_exception: p.terminate() with ignore_exception: del self.processes[:] def get_pool_results(self): return self.pool_results def clear_pool_results(self): del self.pool_results[:]
Appelez soit avec:
tp = ThreadPool(queue_threads=2) tp.queue.put({'call': test, 'args': [random.randint(0, 100)]}) tp.finish_pool_queue()
ou
pp = ProcessPool(queue_processes=2) pp.queue.put(dill.dumps({'call': test, 'args': [random.randint(0, 100)]})) pp.queue.put(dill.dumps({'call': test, 'args': [random.randint(0, 100)]})) pp.finish_pool_queue()
la source
Je viens de faire un exemple simple et général pour démontrer le passage d'un message sur une file d'attente entre 2 programmes autonomes. Il ne répond pas directement à la question du PO mais devrait être suffisamment clair pour indiquer le concept.
Serveur:
multiprocessing-queue-manager-server.py
import asyncio import concurrent.futures import multiprocessing import multiprocessing.managers import queue import sys import threading from typing import Any, AnyStr, Dict, Union class QueueManager(multiprocessing.managers.BaseManager): def get_queue(self, ident: Union[AnyStr, int, type(None)] = None) -> multiprocessing.Queue: pass def get_queue(ident: Union[AnyStr, int, type(None)] = None) -> multiprocessing.Queue: global q if not ident in q: q[ident] = multiprocessing.Queue() return q[ident] q: Dict[Union[AnyStr, int, type(None)], multiprocessing.Queue] = dict() delattr(QueueManager, 'get_queue') def init_queue_manager_server(): if not hasattr(QueueManager, 'get_queue'): QueueManager.register('get_queue', get_queue) def serve(no: int, term_ev: threading.Event): manager: QueueManager with QueueManager(authkey=QueueManager.__name__.encode()) as manager: print(f"Server address {no}: {manager.address}") while not term_ev.is_set(): try: item: Any = manager.get_queue().get(timeout=0.1) print(f"Client {no}: {item} from {manager.address}") except queue.Empty: continue async def main(n: int): init_queue_manager_server() term_ev: threading.Event = threading.Event() executor: concurrent.futures.ThreadPoolExecutor = concurrent.futures.ThreadPoolExecutor() i: int for i in range(n): asyncio.ensure_future(asyncio.get_running_loop().run_in_executor(executor, serve, i, term_ev)) # Gracefully shut down try: await asyncio.get_running_loop().create_future() except asyncio.CancelledError: term_ev.set() executor.shutdown() raise if __name__ == '__main__': asyncio.run(main(int(sys.argv[1])))
Client:
multiprocessing-queue-manager-client.py
import multiprocessing import multiprocessing.managers import os import sys from typing import AnyStr, Union class QueueManager(multiprocessing.managers.BaseManager): def get_queue(self, ident: Union[AnyStr, int, type(None)] = None) -> multiprocessing.Queue: pass delattr(QueueManager, 'get_queue') def init_queue_manager_client(): if not hasattr(QueueManager, 'get_queue'): QueueManager.register('get_queue') def main(): init_queue_manager_client() manager: QueueManager = QueueManager(sys.argv[1], authkey=QueueManager.__name__.encode()) manager.connect() message = f"A message from {os.getpid()}" print(f"Message to send: {message}") manager.get_queue().put(message) if __name__ == '__main__': main()
Usage
Serveur:
N
est un entier indiquant le nombre de serveurs à créer. Copiez l'un des<server-address-N>
résultats du serveur et faites-en le premier argument de chacunmultiprocessing-queue-manager-client.py
.Client:
python3 multiprocessing-queue-manager-client.py <server-address-1>
Résultat
Serveur:
Client 1: <item> from <server-address-1>
Gist: https://gist.github.com/89062d639e40110c61c2f88018a8b0e5
UPD : a créé un package ici .
Serveur:
import ipcq with ipcq.QueueManagerServer(address=ipcq.Address.DEFAULT, authkey=ipcq.AuthKey.DEFAULT) as server: server.get_queue().get()
Client:
import ipcq client = ipcq.QueueManagerClient(address=ipcq.Address.DEFAULT, authkey=ipcq.AuthKey.DEFAULT) client.get_queue().put('a message')
la source