Interruptions de clavier avec le pool multitraitement de Python

136

Comment puis-je gérer les événements KeyboardInterrupt avec les pools de multitraitement de Python? Voici un exemple simple:

from multiprocessing import Pool
from time import sleep
from sys import exit

def slowly_square(i):
    sleep(1)
    return i*i

def go():
    pool = Pool(8)
    try:
        results = pool.map(slowly_square, range(40))
    except KeyboardInterrupt:
        # **** THIS PART NEVER EXECUTES. ****
        pool.terminate()
        print "You cancelled the program!"
        sys.exit(1)
    print "\nFinally, here are the results: ", results

if __name__ == "__main__":
    go()

Lors de l'exécution du code ci-dessus, le KeyboardInterruptest déclenché lorsque j'appuie sur ^C, mais le processus se bloque simplement à ce stade et je dois le tuer en externe.

Je veux pouvoir appuyer ^Cà tout moment et faire en sorte que tous les processus se terminent gracieusement.

Fragsworth
la source
J'ai résolu mon problème en utilisant psutil, vous pouvez voir la solution ici: stackoverflow.com/questions/32160054/…
Tiago Albineli Motta

Réponses:

137

Ceci est un bogue Python. Lors de l'attente d'une condition dans threading.Condition.wait (), KeyboardInterrupt n'est jamais envoyé. Repro:

import threading
cond = threading.Condition(threading.Lock())
cond.acquire()
cond.wait(None)
print "done"

L'exception KeyboardInterrupt ne sera pas délivrée tant que wait () ne sera pas retourné, et elle ne revient jamais, donc l'interruption ne se produit jamais. KeyboardInterrupt devrait presque certainement interrompre une condition d'attente.

Notez que cela ne se produit pas si un délai d'expiration est spécifié; cond.wait (1) recevra l'interruption immédiatement. Ainsi, une solution de contournement consiste à spécifier un délai d'expiration. Pour ce faire, remplacez

    results = pool.map(slowly_square, range(40))

avec

    results = pool.map_async(slowly_square, range(40)).get(9999999)

ou similaire.

Glenn Maynard
la source
3
Ce bogue est-il dans le tracker officiel de Python n'importe où? J'ai du mal à le trouver, mais je n'utilise probablement pas les meilleurs termes de recherche.
Joseph Garvin
18
Ce bogue a été classé comme [Problème 8296] [1]. [1]: bugs.python.org/issue8296
Andrey Vlasovskikh
1
Voici un hack qui corrige pool.imap () de la même manière, rendant Ctrl-C possible lors de l'itération sur imap. Attrapez l'exception et appelez pool.terminate () et votre programme se fermera. gist.github.com/626518
Alexander Ljungberg
6
Cela ne règle pas tout à fait les choses. Parfois, j'obtiens le comportement attendu lorsque j'appuie sur Ctrl + C, d'autres fois non. Je ne sais pas pourquoi, mais il semble que The KeyboardInterrupt soit reçu par l'un des processus au hasard, et je n'obtiens le comportement correct que si le processus parent est celui qui l'attrape.
Ryan C. Thompson
6
Cela ne fonctionne pas pour moi avec Python 3.6.1 sous Windows. J'obtiens des tonnes de traces de pile et d'autres déchets lorsque je fais Ctrl-C, c'est-à-dire comme sans une telle solution de contournement. En fait, aucune des solutions que j'ai essayées à partir de ce fil ne semble fonctionner ...
szx
56

D'après ce que j'ai trouvé récemment, la meilleure solution est de configurer les processus de travail pour ignorer complètement SIGINT et de confiner tout le code de nettoyage au processus parent. Cela résout le problème pour les processus de travail inactifs et occupés et ne nécessite aucun code de gestion des erreurs dans vos processus enfants.

import signal

...

def init_worker():
    signal.signal(signal.SIGINT, signal.SIG_IGN)

...

def main()
    pool = multiprocessing.Pool(size, init_worker)

    ...

    except KeyboardInterrupt:
        pool.terminate()
        pool.join()

Des explications et un exemple de code complet sont disponibles respectivement à l' adresse http://noswap.com/blog/python-multiprocessing-keyboardinterrupt/ et http://github.com/jreese/multiprocessing-keyboardinterrupt .

John Reese
la source
4
Salut John. Votre solution ne fait pas la même chose que ma solution, malheureusement compliquée. Il se cache derrière le time.sleep(10)processus principal. Si vous supprimez ce sommeil, ou si vous attendez que le processus tente de se joindre au pool, ce que vous devez faire pour garantir que les travaux sont terminés, vous souffrez toujours du même problème qui est le processus principal ne ne recevez pas le KeyboardInterrupt pendant qu'il attend une joinopération d' interrogation .
bboe
Dans le cas où j'utilisais ce code en production, time.sleep () faisait partie d'une boucle qui vérifierait l'état de chaque processus enfant, puis redémarrerait certains processus avec un délai si nécessaire. Plutôt que de rejoindre () qui attendrait la fin de tous les processus, il les vérifierait individuellement, garantissant que le processus maître restait réactif.
John Reese
2
Donc, c'était plus une attente chargée (peut-être avec de petites périodes de sommeil entre les vérifications) qui interrogeait l'achèvement du processus via une autre méthode plutôt que de rejoindre? Si tel est le cas, il serait peut-être préférable d'inclure ce code dans votre article de blog, car vous pouvez alors garantir que tous les travailleurs ont terminé avant d'essayer de rejoindre.
bboe
4
Cela ne marche pas. Seuls les enfants reçoivent le signal. Le parent ne le reçoit jamais, donc pool.terminate()n'est jamais exécuté. Faire ignorer le signal aux enfants ne fait rien. La réponse de @ Glenn résout le problème.
Cerin
1
Ma version de ceci est à gist.github.com/admackin/003dd646e5fadee8b8d6 ; il n'appelle .join()que sur interruption - il vérifie simplement manuellement le résultat de l' .apply_async()utilisation AsyncResult.ready()pour voir s'il est prêt, ce qui signifie que nous avons terminé proprement.
Andy MacKinlay
29

Pour certaines raisons, seules les exceptions héritées de la Exceptionclasse de base sont gérées normalement. Pour contourner ce problème, vous pouvez re-déclencher votre KeyboardInterrupten tant Exceptionqu'instance:

from multiprocessing import Pool
import time

class KeyboardInterruptError(Exception): pass

def f(x):
    try:
        time.sleep(x)
        return x
    except KeyboardInterrupt:
        raise KeyboardInterruptError()

def main():
    p = Pool(processes=4)
    try:
        print 'starting the pool map'
        print p.map(f, range(10))
        p.close()
        print 'pool map complete'
    except KeyboardInterrupt:
        print 'got ^C while pool mapping, terminating the pool'
        p.terminate()
        print 'pool is terminated'
    except Exception, e:
        print 'got exception: %r, terminating the pool' % (e,)
        p.terminate()
        print 'pool is terminated'
    finally:
        print 'joining pool processes'
        p.join()
        print 'join complete'
    print 'the end'

if __name__ == '__main__':
    main()

Normalement, vous obtiendrez la sortie suivante:

staring the pool map
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
pool map complete
joining pool processes
join complete
the end

Donc, si vous frappez ^C, vous obtiendrez:

staring the pool map
got ^C while pool mapping, terminating the pool
pool is terminated
joining pool processes
join complete
the end
Andrey Vlasovskikh
la source
2
Il semble que ce ne soit pas une solution complète. Si un KeyboardInterruptest arrivé alors qu'il multiprocessingeffectue son propre échange de données IPC, le try..catchne sera pas activé (évidemment).
Andrey Vlasovskikh
Vous pouvez le remplacer raise KeyboardInterruptErrorpar un fichier return. Vous devez simplement vous assurer que le processus enfant se termine dès que KeyboardInterrupt est reçu. La valeur de retour semble être ignorée, maintoujours le KeyboardInterrupt est reçu.
Bernhard
8

Habituellement, cette structure simple fonctionne pour Ctrl- Csur Pool:

def signal_handle(_signal, frame):
    print "Stopping the Jobs."

signal.signal(signal.SIGINT, signal_handle)

Comme indiqué dans quelques articles similaires:

Capturez l'interruption du clavier en Python sans essayer, sauf

igco
la source
1
Cela devrait également être effectué sur chacun des processus de travail et peut toujours échouer si KeyboardInterrupt est déclenché pendant l'initialisation de la bibliothèque multitraitement.
MarioVilas
7

La réponse votée n'aborde pas le problème central mais un effet secondaire similaire.

Jesse Noller, l'auteur de la bibliothèque multitraitement, explique comment gérer correctement CTRL + C lors de l'utilisation multiprocessing.Pooldans un ancien article de blog .

import signal
from multiprocessing import Pool


def initializer():
    """Ignore CTRL+C in the worker process."""
    signal.signal(signal.SIGINT, signal.SIG_IGN)


pool = Pool(initializer=initializer)

try:
    pool.map(perform_download, dowloads)
except KeyboardInterrupt:
    pool.terminate()
    pool.join()
noxdafox
la source
J'ai constaté que ProcessPoolExecutor avait également le même problème. Le seul correctif que j'ai pu trouver était d'appeler os.setpgrp()de l'intérieur du futur
portforwardpodcast
1
Bien sûr, la seule différence est que ProcessPoolExecutorne prend pas en charge les fonctions d'initialisation. Sous Unix, vous pouvez tirer parti de la forkstratégie en désactivant le sighandler sur le processus principal avant de créer le pool et en le réactivant par la suite. Dans Pebble , je mets en silence SIGINTles processus enfants par défaut. Je ne connais pas la raison pour laquelle ils ne font pas la même chose avec les pools Python. À la fin, l'utilisateur peut réinitialiser le SIGINTgestionnaire au cas où il / elle voudrait se blesser.
noxdafox
Cette solution semble également empêcher Ctrl-C d'interrompre le processus principal.
Paul Price
1
Je viens de tester sur Python 3.5 et cela fonctionne, quelle version de Python utilisez-vous? Quel OS?
noxdafox
5

Il semble qu'il y ait deux problèmes qui font des exceptions tout en multitraitement ennuyeux. Le premier (noté par Glenn) est que vous devez utiliser map_asyncavec un timeout au lieu de mappour obtenir une réponse immédiate (c'est-à-dire ne pas terminer le traitement de la liste entière). Le second (noté par Andrey) est que le multitraitement n'attrape pas les exceptions qui n'héritent pas de Exception(par exemple, SystemExit). Voici donc ma solution qui traite de ces deux éléments:

import sys
import functools
import traceback
import multiprocessing

def _poolFunctionWrapper(function, arg):
    """Run function under the pool

    Wrapper around function to catch exceptions that don't inherit from
    Exception (which aren't caught by multiprocessing, so that you end
    up hitting the timeout).
    """
    try:
        return function(arg)
    except:
        cls, exc, tb = sys.exc_info()
        if issubclass(cls, Exception):
            raise # No worries
        # Need to wrap the exception with something multiprocessing will recognise
        import traceback
        print "Unhandled exception %s (%s):\n%s" % (cls.__name__, exc, traceback.format_exc())
        raise Exception("Unhandled exception: %s (%s)" % (cls.__name__, exc))

def _runPool(pool, timeout, function, iterable):
    """Run the pool

    Wrapper around pool.map_async, to handle timeout.  This is required so as to
    trigger an immediate interrupt on the KeyboardInterrupt (Ctrl-C); see
    http://stackoverflow.com/questions/1408356/keyboard-interrupts-with-pythons-multiprocessing-pool

    Further wraps the function in _poolFunctionWrapper to catch exceptions
    that don't inherit from Exception.
    """
    return pool.map_async(functools.partial(_poolFunctionWrapper, function), iterable).get(timeout)

def myMap(function, iterable, numProcesses=1, timeout=9999):
    """Run the function on the iterable, optionally with multiprocessing"""
    if numProcesses > 1:
        pool = multiprocessing.Pool(processes=numProcesses, maxtasksperchild=1)
        mapFunc = functools.partial(_runPool, pool, timeout)
    else:
        pool = None
        mapFunc = map
    results = mapFunc(function, iterable)
    if pool is not None:
        pool.close()
        pool.join()
    return results
Prix ​​Paul
la source
1
Je n'ai remarqué aucune pénalité de performance, mais dans mon cas, la functiondurée de vie est assez longue (des centaines de secondes).
Paul Price
Ce n'est plus le cas, du moins d'après mes yeux et mon expérience. Si vous détectez l'exception de clavier dans les processus enfants individuels et que vous la rattrapez une fois de plus dans le processus principal, vous pouvez continuer à utiliser mapet tout va bien. @Linux Cli Aikfourni une solution ci-dessous qui produit ce comportement. L'utilisation map_asyncn'est pas toujours souhaitée si le thread principal dépend des résultats des processus enfants.
Code Doggo
4

J'ai trouvé, pour le moment, que la meilleure solution était de ne pas utiliser la fonctionnalité multiprocessing.pool mais plutôt de lancer votre propre fonctionnalité de pool. J'ai fourni un exemple montrant l'erreur avec apply_async ainsi qu'un exemple montrant comment éviter d'utiliser complètement la fonctionnalité de pool.

http://www.bryceboe.com/2010/08/26/python-multiprocessing-and-keyboardinterrupt/

bboe
la source
Fonctionne comme un charme. C'est une solution propre et non une sorte de hack (/ me pense) .btw, l'astuce avec .get (99999) comme proposé par d'autres nuit gravement aux performances.
Walter
Je n'ai remarqué aucune pénalité de performance due à l'utilisation d'un timeout, bien que j'aie utilisé 9999 au lieu de 999999. L'exception est quand une exception qui n'hérite pas de la classe Exception est déclenchée: alors vous devez attendre que le timeout soit frappé. La solution à cela est d'attraper toutes les exceptions (voir ma solution).
Paul Price
1

Je suis un novice en Python. Je cherchais partout une réponse et je suis tombé sur ceci et sur quelques autres blogs et vidéos YouTube. J'ai essayé de copier coller le code de l'auteur ci-dessus et de le reproduire sur mon python 2.7.13 sous Windows 7 64 bits. C'est proche de ce que je veux réaliser.

J'ai fait en sorte que mes processus enfants ignorent le ControlC et que le processus parent se termine. Il semble que le contournement du processus enfant évite ce problème pour moi.

#!/usr/bin/python

from multiprocessing import Pool
from time import sleep
from sys import exit


def slowly_square(i):
    try:
        print "<slowly_square> Sleeping and later running a square calculation..."
        sleep(1)
        return i * i
    except KeyboardInterrupt:
        print "<child processor> Don't care if you say CtrlC"
        pass


def go():
    pool = Pool(8)

    try:
        results = pool.map(slowly_square, range(40))
    except KeyboardInterrupt:
        pool.terminate()
        pool.close()
        print "You cancelled the program!"
        exit(1)
    print "Finally, here are the results", results


if __name__ == '__main__':
    go()

La partie commençant à pool.terminate()semble ne jamais s'exécuter.

Linux Cli Aik
la source
Je viens de comprendre cela aussi! Je pense honnêtement que c'est la meilleure solution pour un problème comme celui-ci. La solution acceptée s'impose map_asyncà l'utilisateur, ce que je n'aime pas particulièrement. Dans de nombreuses situations, comme la mienne, le thread principal doit attendre la fin des processus individuels. C'est l'une des raisons pour lesquelles mapexiste!
Code Doggo
1

Vous pouvez essayer d'utiliser la méthode apply_async d'un objet Pool, comme ceci:

import multiprocessing
import time
from datetime import datetime


def test_func(x):
    time.sleep(2)
    return x**2


def apply_multiprocessing(input_list, input_function):
    pool_size = 5
    pool = multiprocessing.Pool(processes=pool_size, maxtasksperchild=10)

    try:
        jobs = {}
        for value in input_list:
            jobs[value] = pool.apply_async(input_function, [value])

        results = {}
        for value, result in jobs.items():
            try:
                results[value] = result.get()
            except KeyboardInterrupt:
                print "Interrupted by user"
                pool.terminate()
                break
            except Exception as e:
                results[value] = e
        return results
    except Exception:
        raise
    finally:
        pool.close()
        pool.join()


if __name__ == "__main__":
    iterations = range(100)
    t0 = datetime.now()
    results1 = apply_multiprocessing(iterations, test_func)
    t1 = datetime.now()
    print results1
    print "Multi: {}".format(t1 - t0)

    t2 = datetime.now()
    results2 = {i: test_func(i) for i in iterations}
    t3 = datetime.now()
    print results2
    print "Non-multi: {}".format(t3 - t2)

Production:

100
Multiprocessing run time: 0:00:41.131000
100
Non-multiprocessing run time: 0:03:20.688000

Un avantage de cette méthode est que les résultats traités avant l'interruption seront renvoyés dans le dictionnaire des résultats:

>>> apply_multiprocessing(range(100), test_func)
Interrupted by user
{0: 0, 1: 1, 2: 4, 3: 9, 4: 16, 5: 25}
bparker856
la source
Exemple glorieux et complet
eMTy
-5

Curieusement, il semble que vous deviez également gérer KeyboardInterruptles enfants. Je me serais attendu à ce que cela fonctionne comme écrit ... essayez de changer slowly_squarepour:

def slowly_square(i):
    try:
        sleep(1)
        return i * i
    except KeyboardInterrupt:
        print 'You EVIL bastard!'
        return 0

Cela devrait fonctionner comme prévu.

D.Shawley
la source
1
J'ai essayé cela, et cela ne met pas fin à l'ensemble des tâches. Il met fin aux travaux en cours d'exécution, mais le script affecte toujours les travaux restants dans l'appel pool.map comme si tout était normal.
Fragsworth
c'est OK, mais vous risquez de perdre la trace des erreurs qui se produisent. renvoyer l'erreur avec un stacktrace peut fonctionner afin que le processus parent puisse dire qu'une erreur s'est produite, mais il ne se termine toujours pas immédiatement lorsque l'erreur se produit.
mehtunguh