Délai d'expiration d'un appel de fonction

300

J'appelle une fonction en Python qui je sais peut bloquer et me forcer à redémarrer le script.

Comment appeler la fonction ou comment l'envelopper pour que si cela prend plus de 5 secondes, le script l'annule et fasse autre chose?

Teifion
la source

Réponses:

227

Vous pouvez utiliser le package de signaux si vous exécutez sous UNIX:

In [1]: import signal

# Register an handler for the timeout
In [2]: def handler(signum, frame):
   ...:     print("Forever is over!")
   ...:     raise Exception("end of time")
   ...: 

# This function *may* run for an indetermined time...
In [3]: def loop_forever():
   ...:     import time
   ...:     while 1:
   ...:         print("sec")
   ...:         time.sleep(1)
   ...:         
   ...:         

# Register the signal function handler
In [4]: signal.signal(signal.SIGALRM, handler)
Out[4]: 0

# Define a timeout for your function
In [5]: signal.alarm(10)
Out[5]: 0

In [6]: try:
   ...:     loop_forever()
   ...: except Exception, exc: 
   ...:     print(exc)
   ....: 
sec
sec
sec
sec
sec
sec
sec
sec
Forever is over!
end of time

# Cancel the timer if the function returned before timeout
# (ok, mine won't but yours maybe will :)
In [7]: signal.alarm(0)
Out[7]: 0

10 secondes après l'appel alarm.alarm(10), le gestionnaire est appelé. Cela déclenche une exception que vous pouvez intercepter à partir du code Python standard.

Ce module ne fonctionne pas bien avec les threads (mais alors, qui le fait?)

Notez que puisque nous levons une exception lorsque le délai d'expiration se produit, elle peut finir par être interceptée et ignorée à l'intérieur de la fonction, par exemple d'une de ces fonctions:

def loop_forever():
    while 1:
        print('sec')
        try:
            time.sleep(10)
        except:
            continue
piro
la source
5
J'utilise Python 2.5.4. Il y a une telle erreur: Traceback (dernier appel le plus récent): Fichier "aa.py", ligne 85, dans func signal.signal (signal.SIGALRM, gestionnaire) AttributeError: l'objet 'module' n'a pas d'attribut 'SIGALRM'
flypen
11
@flypen c'est parce que signal.alarmet les éléments associés SIGALRMne sont pas disponibles sur les plates-formes Windows.
Double AA
2
S'il y a beaucoup de processus et que chaque appel signal.signal--- fonctionnera-t-il tous correctement? Est-ce que chaque signal.signalappel n'en annulera pas un "simultané"?
brownien
1
Avertissement pour ceux qui souhaitent utiliser ceci avec une extension C: Le gestionnaire de signal Python ne sera pas appelé jusqu'à ce que la fonction C retourne le contrôle à l'interpréteur Python. Pour ce cas d'utilisation, utilisez la réponse d'ATOzTOA: stackoverflow.com/a/14924210/1286628
wkschwartz
13
J'appuie l'avertissement sur les threads. signal.alarm ne fonctionne que sur le thread principal. J'ai essayé de l'utiliser dans les vues Django - échec immédiat avec verbiage sur le thread principal uniquement.
JL Peyret
154

Vous pouvez utiliser multiprocessing.Processpour faire exactement cela.

Code

import multiprocessing
import time

# bar
def bar():
    for i in range(100):
        print "Tick"
        time.sleep(1)

if __name__ == '__main__':
    # Start bar as a process
    p = multiprocessing.Process(target=bar)
    p.start()

    # Wait for 10 seconds or until process finishes
    p.join(10)

    # If thread is still active
    if p.is_alive():
        print "running... let's kill it..."

        # Terminate
        p.terminate()
        p.join()
ATOzTOA
la source
36
Comment puis-je obtenir la valeur de retour de la méthode cible?
bad_keypoints
4
Cela ne semble pas fonctionner si la fonction appelée est bloquée sur un bloc d'E / S.
sudo
4
@bad_keypoints Voir cette réponse: stackoverflow.com/a/10415215/1384471 Fondamentalement, vous passez une liste dans laquelle vous mettez la réponse.
Peter
1
@sudo puis supprimez le join(). qui rend votre x nombre de sous - processus simultanés étant en cours d' exécution jusqu'à ce les terminer leur travail, ou un montant défini dans join(10). Si vous avez une E / S bloquante pour 10 processus, en utilisant join (10), vous les avez configurés pour les attendre tous au maximum 10 pour CHAQUE processus qui a démarré. Utilisez l'indicateur démon comme cet exemple stackoverflow.com/a/27420072/2480481 . Bien sûr, vous pouvez passer le drapeau daemon=Truedirectement pour multiprocessing.Process()fonctionner.
m3nda
2
@ATOzTOA le problème avec cette solution, au moins pour mes besoins, est qu'elle ne permet potentiellement pas aux enfants de nettoyer les marches après eux-mêmes. Extrait de la documentation de la fonction d'arrêtterminate() ... Note that exit handlers and finally clauses, etc., will not be executed. Note that descendant processes of the process will not be terminated – they will simply become orphaned.
abalcerek
78

Comment appeler la fonction ou comment l'envelopper pour que si elle prend plus de 5 secondes, le script l'annule?

J'ai posté un résumé qui résout cette question / problème avec un décorateur et un threading.Timer. Le voici avec une panne.

Importations et configurations pour la compatibilité

Il a été testé avec Python 2 et 3. Il devrait également fonctionner sous Unix / Linux et Windows.

D'abord les importations. Ceux-ci tentent de garder le code cohérent quelle que soit la version de Python:

from __future__ import print_function
import sys
import threading
from time import sleep
try:
    import thread
except ImportError:
    import _thread as thread

Utilisez un code indépendant de la version:

try:
    range, _print = xrange, print
    def print(*args, **kwargs): 
        flush = kwargs.pop('flush', False)
        _print(*args, **kwargs)
        if flush:
            kwargs.get('file', sys.stdout).flush()            
except NameError:
    pass

Maintenant, nous avons importé nos fonctionnalités de la bibliothèque standard.

exit_after décorateur

Ensuite, nous avons besoin d'une fonction pour terminer le à main()partir du thread enfant:

def quit_function(fn_name):
    # print to stderr, unbuffered in Python 2.
    print('{0} took too long'.format(fn_name), file=sys.stderr)
    sys.stderr.flush() # Python 3 stderr is likely buffered.
    thread.interrupt_main() # raises KeyboardInterrupt

Et voici le décorateur lui-même:

def exit_after(s):
    '''
    use as decorator to exit process if 
    function takes longer than s seconds
    '''
    def outer(fn):
        def inner(*args, **kwargs):
            timer = threading.Timer(s, quit_function, args=[fn.__name__])
            timer.start()
            try:
                result = fn(*args, **kwargs)
            finally:
                timer.cancel()
            return result
        return inner
    return outer

Usage

Et voici l'utilisation qui répond directement à votre question sur la sortie après 5 secondes!:

@exit_after(5)
def countdown(n):
    print('countdown started', flush=True)
    for i in range(n, -1, -1):
        print(i, end=', ', flush=True)
        sleep(1)
    print('countdown finished')

Démo:

>>> countdown(3)
countdown started
3, 2, 1, 0, countdown finished
>>> countdown(10)
countdown started
10, 9, 8, 7, 6, countdown took too long
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 11, in inner
  File "<stdin>", line 6, in countdown
KeyboardInterrupt

Le deuxième appel de fonction ne se terminera pas, à la place, le processus devrait se terminer avec une trace!

KeyboardInterrupt n'arrête pas toujours un fil endormi

Notez que le sommeil ne sera pas toujours interrompu par une interruption clavier, sur Python 2 sous Windows, par exemple:

@exit_after(1)
def sleep10():
    sleep(10)
    print('slept 10 seconds')

>>> sleep10()
sleep10 took too long         # Note that it hangs here about 9 more seconds
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 11, in inner
  File "<stdin>", line 3, in sleep10
KeyboardInterrupt

il n'est pas non plus susceptible d'interrompre le code en cours d'exécution dans les extensions à moins qu'il ne le vérifie explicitement PyErr_CheckSignals(), voir Cython, Python et KeyboardInterrupt ignorés

J'éviterais de dormir un thread plus d'une seconde, en tout cas - c'est une éon en temps de processeur.

Comment appeler la fonction ou comment l'envelopper pour que si cela prend plus de 5 secondes, le script l'annule et fasse autre chose?

Pour l'attraper et faire autre chose, vous pouvez attraper le KeyboardInterrupt.

>>> try:
...     countdown(10)
... except KeyboardInterrupt:
...     print('do something else')
... 
countdown started
10, 9, 8, 7, 6, countdown took too long
do something else
Aaron Hall
la source
Je n'ai pas encore lu l'intégralité de votre article, mais je me suis simplement demandé: et si flush valait 0? Cela serait interprété comme faux dans la déclaration if ci-dessous, non?
Koenraad van Duin
2
Pourquoi dois-je appeler thread.interrupt_main(), pourquoi ne puis-je pas lever directement une exception?
Anirban Nag 'tintinmj'
Avez-vous des idées pour vous envelopper multiprocessing.connection.Client? - Essayer de résoudre: stackoverflow.com/questions/57817955/…
wwii
51

J'ai une proposition différente qui est une fonction pure (avec la même API que la suggestion de filetage) et semble fonctionner correctement (basée sur les suggestions de ce fil)

def timeout(func, args=(), kwargs={}, timeout_duration=1, default=None):
    import signal

    class TimeoutError(Exception):
        pass

    def handler(signum, frame):
        raise TimeoutError()

    # set the timeout handler
    signal.signal(signal.SIGALRM, handler) 
    signal.alarm(timeout_duration)
    try:
        result = func(*args, **kwargs)
    except TimeoutError as exc:
        result = default
    finally:
        signal.alarm(0)

    return result
Alex
la source
3
Vous devez également restaurer le gestionnaire de signal d'origine. Voir stackoverflow.com/questions/492519/…
Martin Konecny
9
Une dernière remarque: la méthode du signal Unix ne fonctionne que si vous l'appliquez dans le thread principal. Son application dans un sous-thread lève une exception et ne fonctionnera pas.
Martin Konecny
12
Ce n'est pas la meilleure solution car elle ne fonctionne que sur Linux.
max
17
Max, pas vrai - fonctionne sur tout unix compatible POSIX. Je pense que votre commentaire devrait être plus précis, ne fonctionne pas sur Windows.
Chris Johnson
6
Vous devez éviter de définir kwargs sur un dict vide. Un piège Python commun est que les arguments par défaut sur les fonctions sont mutables. Ce dictionnaire sera donc partagé entre tous les appels à timeout. Il est préférable de définir la valeur par défaut sur Noneet, sur la première ligne de la fonction, d'ajouter kwargs = kwargs or {}. Args est correct car les tuples ne sont pas modifiables.
scottmrogowski
32

J'ai parcouru ce fil lors de la recherche d'un appel de délai d'attente sur les tests unitaires. Je n'ai rien trouvé de simple dans les réponses ou les packages tiers, j'ai donc écrit le décorateur ci-dessous, vous pouvez le déposer directement dans le code:

import multiprocessing.pool
import functools

def timeout(max_timeout):
    """Timeout decorator, parameter in seconds."""
    def timeout_decorator(item):
        """Wrap the original function."""
        @functools.wraps(item)
        def func_wrapper(*args, **kwargs):
            """Closure for function."""
            pool = multiprocessing.pool.ThreadPool(processes=1)
            async_result = pool.apply_async(item, args, kwargs)
            # raises a TimeoutError if execution exceeds max_timeout
            return async_result.get(max_timeout)
        return func_wrapper
    return timeout_decorator

Ensuite, c'est aussi simple que cela pour expirer un test ou une fonction que vous aimez:

@timeout(5.0)  # if execution takes longer than 5 seconds, raise a TimeoutError
def test_base_regression(self):
    ...
Riches
la source
14
Soyez prudent car cela ne met pas fin à la fonction une fois le délai d'expiration atteint!
Sylvain
Notez que sous Windows, cela engendre un processus entièrement nouveau - qui prendra du temps pour expirer, peut-être beaucoup si les dépendances prennent beaucoup de temps à configurer.
Aaron Hall
1
Oui, cela nécessite quelques ajustements. Il laisse des fils pour toujours.
sudo
2
IDK si c'est la meilleure façon, mais vous pouvez essayer / catch à l' Exceptionintérieur de func_wrapper et faire pool.close()après la capture pour vous assurer que le thread meurt toujours après quoi qu'il arrive . Ensuite, vous pouvez lancer TimeoutErrorou ce que vous voulez après. Semble fonctionner pour moi.
sudo
2
C'est utile, mais une fois que je l'ai fait plusieurs fois, je comprends RuntimeError: can't start new thread. Cela fonctionnera-t-il toujours si je l'ignore ou est-ce que je peux faire autre chose pour contourner cela? Merci d'avance!
Benjie
20

Le stopitpackage, trouvé sur pypi, semble bien gérer les délais d'attente.

J'aime le @stopit.threading_timeoutabledécorateur, qui ajoute un timeoutparamètre à la fonction décorée, qui fait ce que vous attendez, il arrête la fonction.

Découvrez-le sur pypi: https://pypi.python.org/pypi/stopit

egeland
la source
1
Il est très pratique et sans fil! Merci et en plus! C'est la meilleure option que j'ai trouvée jusqu'à présent et encore meilleure que la réponse acceptée !!
Yahya
Revendications de bibliothèque, certaines fonctionnalités ne fonctionnent pas sous Windows.
Stefan Simik
16

Il y a beaucoup de suggestions, mais aucune n'utilise simult.futures, ce qui, à mon avis, est la façon la plus lisible de gérer cela.

from concurrent.futures import ProcessPoolExecutor

# Warning: this does not terminate function if timeout
def timeout_five(fnc, *args, **kwargs):
    with ProcessPoolExecutor() as p:
        f = p.submit(fnc, *args, **kwargs)
        return f.result(timeout=5)

Super simple à lire et à entretenir.

Nous créons un pool, soumettons un seul processus, puis attendons jusqu'à 5 secondes avant de déclencher une TimeoutError que vous pouvez intercepter et gérer comme bon vous semble.

Natif de python 3.2+ et rétroporté à 2.7 (pip install futures).

Basculer entre les threads et les processus est aussi simple que de le remplacer ProcessPoolExecutorparThreadPoolExecutor .

Si vous souhaitez mettre fin au processus à l'expiration du délai, je vous suggère d'examiner Pebble .

Brian
la source
2
Que signifie «Avertissement: cela ne met pas fin à la fonction en cas de dépassement de délai»?
Scott Stafford
5
@ScottStafford Les processus / threads ne s'arrêtent pas simplement parce qu'une TimeoutError a été déclenchée. Ainsi, le processus ou le thread tentera toujours de s'exécuter jusqu'à la fin et ne vous redonnera pas automatiquement le contrôle à votre expiration.
Brian
Est-ce que cela me permettrait d'enregistrer des résultats intermédiaires à ce moment-là? Par exemple, si j'ai une fonction récursive que j'ai définie le délai d'expiration à 5, et pendant ce temps, j'ai des résultats partiels, comment puis-je écrire la fonction pour retourner les résultats partiels sur le délai d'expiration?
SumNeuron
J'utilise exactement cela, mais j'ai 1000 tâches, chacune est autorisée 5 secondes avant le délai d'expiration. Mon problème est que les cœurs sont obstrués sur des tâches qui ne se terminent jamais parce que le délai d'attente n'est appliqué que sur le total des tâches et non sur des tâches individuelles. concurrent.futures ne fournit pas de solution à cet afaik.
Bastiaan
12

Générateur de délai d'expiration du projet PyPi , facile à utiliser et fiable ( https://pypi.org/project/timeout-decorator/ )

installation :

pip install timeout-decorator

Utilisation :

import time
import timeout_decorator

@timeout_decorator.timeout(5)
def mytest():
    print "Start"
    for i in range(1,10):
        time.sleep(1)
        print "%d seconds have passed" % i

if __name__ == '__main__':
    mytest()
Gil
la source
2
J'apprécie la solution claire. Mais quelqu'un pourrait-il expliquer comment cette bibliothèque fonctionne, en particulier lorsqu'il s'agit de multithreading. Personnellement, je crains d'utiliser un machanisme inconnu pour gérer les fils ou les signaux.
wsysuper
@wsysuper the lib a 2 modes d'opérations: ouvrir un nouveau thread ou un nouveau sous-processus (qui suppose être thread-safe)
Gil
cela a très bien fonctionné pour moi!
Florian Heigl Il y a
6

Je suis l'auteur de wrapt_timeout_decorator

La plupart des solutions présentées ici fonctionnent à merveille sous Linux à première vue - parce que nous avons fork () et signaux () - mais sur Windows, les choses semblent un peu différentes. Et quand il s'agit de sous-threads sur Linux, vous ne pouvez plus utiliser de signaux.

Pour générer un processus sous Windows, il doit être sélectionnable - et de nombreuses fonctions décorées ou méthodes de classe ne le sont pas.

Vous devez donc utiliser un meilleur pickler comme l'aneth et le multiprocess (pas le pickle et le multiprocessing) - c'est pourquoi vous ne pouvez pas utiliser ProcessPoolExecutor (ou seulement avec des fonctionnalités limitées).

Pour le délai lui-même - Vous devez définir ce que signifie le délai d'attente - car sous Windows, il faudra un temps considérable (et non déterminable) pour lancer le processus. Cela peut être délicat sur de courts délais. Supposons que la génération du processus prenne environ 0,5 seconde (facilement !!!). Si vous donnez un délai de 0,2 seconde, que devrait-il se passer? La fonction doit-elle expirer après 0,5 + 0,2 seconde (alors laissez la méthode s'exécuter pendant 0,2 seconde)? Ou le processus appelé devrait-il expirer après 0,2 seconde (dans ce cas, la fonction décorée expire TOUJOURS, car pendant ce temps, elle n'est même pas générée)?

Les décorateurs imbriqués peuvent également être méchants et vous ne pouvez pas utiliser de signaux dans un sous-fil. Si vous voulez créer un décorateur multiplateforme véritablement universel, tout cela doit être pris en considération (et testé).

D'autres problèmes transmettent des exceptions à l'appelant, ainsi que des problèmes de journalisation (s'ils sont utilisés dans la fonction décorée - la journalisation dans des fichiers dans un autre processus n'est PAS prise en charge)

J'ai essayé de couvrir tous les cas marginaux, vous pouvez examiner le package wrapt_timeout_decorator, ou au moins tester vos propres solutions en vous inspirant des tests non utilisés.

@Alexis Eggermont - malheureusement, je n'ai pas assez de points à commenter - peut-être que quelqu'un d'autre peut vous en informer - je pense avoir résolu votre problème d'importation.

bitranox
la source
3

timeout-decorator ne fonctionne pas sur le système Windows car Windows ne prend pas en charge signal bien.

Si vous utilisez timeout-decorator dans le système Windows, vous obtiendrez les éléments suivants

AttributeError: module 'signal' has no attribute 'SIGALRM'

Certains ont suggéré d'utiliser use_signals=False mais n'ont pas fonctionné pour moi.

L'auteur @bitranox a créé le package suivant:

pip install https://github.com/bitranox/wrapt-timeout-decorator/archive/master.zip

Échantillon de code:

import time
from wrapt_timeout_decorator import *

@timeout(5)
def mytest(message):
    print(message)
    for i in range(1,10):
        time.sleep(1)
        print('{} seconds have passed'.format(i))

def main():
    mytest('starting')


if __name__ == '__main__':
    main()

Donne l'exception suivante:

TimeoutError: Function mytest timed out after 5 seconds
comme si
la source
Cela ressemble à une très bonne solution. Étrangement, la ligne from wrapt_timeout_decorator import * semble tuer certaines de mes autres importations. Par exemple ModuleNotFoundError: No module named 'google.appengine', j'obtiens , mais je n'obtiens pas cette erreur si je n'importe pas wrapt_timeout_decorator
Alexis Eggermont
@AlexisEggermont J'étais sur le point de l'utiliser avec Appengine ... donc je suis très curieux de savoir si cette erreur a persisté?
PascalVKooten
2

Nous pouvons utiliser des signaux pour la même chose. Je pense que l'exemple ci-dessous vous sera utile. C'est très simple par rapport aux threads.

import signal

def timeout(signum, frame):
    raise myException

#this is an infinite loop, never ending under normal circumstances
def main():
    print 'Starting Main ',
    while 1:
        print 'in main ',

#SIGALRM is only usable on a unix platform
signal.signal(signal.SIGALRM, timeout)

#change 5 to however many seconds you need
signal.alarm(5)

try:
    main()
except myException:
    print "whoops"
AR
la source
1
Il serait préférable de choisir une exception spécifique et de ne l'attraper que. Nus try: ... except: ...sont toujours une mauvaise idée.
hivert
Je suis d'accord avec toi hivert.
AR
bien que je comprenne la raison, en tant qu'administrateur système / intégrateur, je suis en désaccord - le code python est connu pour négliger la gestion des erreurs, et gérer la seule chose à laquelle vous vous attendez n'est pas assez bon pour un logiciel de qualité. vous pouvez gérer les 5 choses que vous prévoyez ET une stratégie générique pour d'autres choses. "Traceback, None" n'est pas une stratégie, c'est une insulte.
Florian Heigl Il y a
2
#!/usr/bin/python2
import sys, subprocess, threading
proc = subprocess.Popen(sys.argv[2:])
timer = threading.Timer(float(sys.argv[1]), proc.terminate)
timer.start()
proc.wait()
timer.cancel()
exit(proc.returncode)
Hal Canary
la source
7
Bien que ce code puisse répondre à la question, fournir un contexte supplémentaire sur la façon et / ou la raison pour laquelle il résout le problème améliorerait la valeur à long terme de la réponse
Dan Cornilescu
1

J'avais besoin de interruptions chronométrées emboîtables (que SIGALARM ne peut pas faire) qui ne seront pas bloquées par time.sleep (ce que l'approche basée sur les threads ne peut pas faire). J'ai fini par copier et modifier légèrement le code d'ici: http://code.activestate.com/recipes/577600-queue-for-managing-multiple-sigalrm-alarms-concurr/

Le code lui-même:

#!/usr/bin/python

# lightly modified version of http://code.activestate.com/recipes/577600-queue-for-managing-multiple-sigalrm-alarms-concurr/


"""alarm.py: Permits multiple SIGALRM events to be queued.

Uses a `heapq` to store the objects to be called when an alarm signal is
raised, so that the next alarm is always at the top of the heap.
"""

import heapq
import signal
from time import time

__version__ = '$Revision: 2539 $'.split()[1]

alarmlist = []

__new_alarm = lambda t, f, a, k: (t + time(), f, a, k)
__next_alarm = lambda: int(round(alarmlist[0][0] - time())) if alarmlist else None
__set_alarm = lambda: signal.alarm(max(__next_alarm(), 1))


class TimeoutError(Exception):
    def __init__(self, message, id_=None):
        self.message = message
        self.id_ = id_


class Timeout:
    ''' id_ allows for nested timeouts. '''
    def __init__(self, id_=None, seconds=1, error_message='Timeout'):
        self.seconds = seconds
        self.error_message = error_message
        self.id_ = id_
    def handle_timeout(self):
        raise TimeoutError(self.error_message, self.id_)
    def __enter__(self):
        self.this_alarm = alarm(self.seconds, self.handle_timeout)
    def __exit__(self, type, value, traceback):
        try:
            cancel(self.this_alarm) 
        except ValueError:
            pass


def __clear_alarm():
    """Clear an existing alarm.

    If the alarm signal was set to a callable other than our own, queue the
    previous alarm settings.
    """
    oldsec = signal.alarm(0)
    oldfunc = signal.signal(signal.SIGALRM, __alarm_handler)
    if oldsec > 0 and oldfunc != __alarm_handler:
        heapq.heappush(alarmlist, (__new_alarm(oldsec, oldfunc, [], {})))


def __alarm_handler(*zargs):
    """Handle an alarm by calling any due heap entries and resetting the alarm.

    Note that multiple heap entries might get called, especially if calling an
    entry takes a lot of time.
    """
    try:
        nextt = __next_alarm()
        while nextt is not None and nextt <= 0:
            (tm, func, args, keys) = heapq.heappop(alarmlist)
            func(*args, **keys)
            nextt = __next_alarm()
    finally:
        if alarmlist: __set_alarm()


def alarm(sec, func, *args, **keys):
    """Set an alarm.

    When the alarm is raised in `sec` seconds, the handler will call `func`,
    passing `args` and `keys`. Return the heap entry (which is just a big
    tuple), so that it can be cancelled by calling `cancel()`.
    """
    __clear_alarm()
    try:
        newalarm = __new_alarm(sec, func, args, keys)
        heapq.heappush(alarmlist, newalarm)
        return newalarm
    finally:
        __set_alarm()


def cancel(alarm):
    """Cancel an alarm by passing the heap entry returned by `alarm()`.

    It is an error to try to cancel an alarm which has already occurred.
    """
    __clear_alarm()
    try:
        alarmlist.remove(alarm)
        heapq.heapify(alarmlist)
    finally:
        if alarmlist: __set_alarm()

et un exemple d'utilisation:

import alarm
from time import sleep

try:
    with alarm.Timeout(id_='a', seconds=5):
        try:
            with alarm.Timeout(id_='b', seconds=2):
                sleep(3)
        except alarm.TimeoutError as e:
            print 'raised', e.id_
        sleep(30)
except alarm.TimeoutError as e:
    print 'raised', e.id_
else:
    print 'nope.'
James
la source
Cela utilise également le signal et ne fonctionnera donc pas s'il est appelé à partir d'un thread.
garg10mai
0

Voici une légère amélioration de la solution basée sur les threads donnée.

Le code ci-dessous prend en charge les exceptions :

def runFunctionCatchExceptions(func, *args, **kwargs):
    try:
        result = func(*args, **kwargs)
    except Exception, message:
        return ["exception", message]

    return ["RESULT", result]


def runFunctionWithTimeout(func, args=(), kwargs={}, timeout_duration=10, default=None):
    import threading
    class InterruptableThread(threading.Thread):
        def __init__(self):
            threading.Thread.__init__(self)
            self.result = default
        def run(self):
            self.result = runFunctionCatchExceptions(func, *args, **kwargs)
    it = InterruptableThread()
    it.start()
    it.join(timeout_duration)
    if it.isAlive():
        return default

    if it.result[0] == "exception":
        raise it.result[1]

    return it.result[1]

L'invoquer avec un délai de 5 secondes:

result = timeout(remote_calculate, (myarg,), timeout_duration=5)
diemacht
la source
1
Cela déclenchera une nouvelle exception masquant le retraçage d'origine. Voir ma version ci-dessous ...
Meitham
1
Ceci est également dangereux, comme si dans runFunctionCatchExceptions()certaines fonctions Python l'obtention de GIL était appelée. Par exemple , le suivant ne serait jamais, ou très longtemps, le retour si elle est appelée dans la fonction: eval(2**9999999999**9999999999). Voir stackoverflow.com/questions/22138190/…
Mikko Ohtamaa