Quelle est la meilleure façon d'exécuter une fonction de manière répétée toutes les x secondes?

284

Je veux exécuter à plusieurs reprises une fonction en Python toutes les 60 secondes pour toujours (tout comme un NSTimer dans Objective C). Ce code s'exécutera en tant que démon et revient à appeler le script python toutes les minutes à l'aide d'un cron, mais sans exiger qu'il soit configuré par l'utilisateur.

Dans cette question sur un cron implémenté en Python , la solution semble effectivement juste sleep () pendant x secondes. Je n'ai pas besoin de fonctionnalités aussi avancées, alors peut-être que quelque chose comme ça fonctionnerait

while True:
    # Code executed here
    time.sleep(60)

Y a-t-il des problèmes prévisibles avec ce code?

davidmytton
la source
84
Un point pédant, mais peut être critique, votre code ci-dessus ne s'exécute pas toutes les 60 secondes, il met un écart de 60 secondes entre les exécutions. Cela ne se produit que toutes les 60 secondes si votre code exécuté ne prend aucun temps.
Simon
4
time.sleep(60)peut également revenir plus tôt et plus tard
jfs
5
Je me demande toujours: y a-t-il des problèmes prévisibles avec ce code?
Banana
1
Le "problème prévisible" est que vous ne pouvez pas vous attendre à 60 itérations par heure en utilisant simplement time.sleep (60). Donc, si vous ajoutez un élément par itération et que vous conservez une liste de longueur définie ... la moyenne de cette liste ne représentera pas une "période" de temps cohérente; ainsi des fonctions telles que "moyenne mobile" peuvent référencer des points de données trop anciens, ce qui faussera votre indication.
litepresence
2
@Banana Oui, vous pouvez vous attendre à des problèmes, car votre script n'est pas exécuté EXACTEMENT toutes les 60 secondes. Par exemple. J'ai commencé à faire quelque chose comme ça pour diviser les flux vidéo et les télécharger, et j'ai fini par obtenir des strems 5 à 10 secondes de plus car la file d'attente multimédia est en mémoire tampon pendant que je traite les données à l'intérieur de la boucle. Cela dépend de vos données. Si la fonction est une sorte de chien de garde simple qui vous avertit, par exemple, lorsque votre disque est plein, vous ne devriez avoir aucun problème avec cela.Si vous vérifiez les alertes d'avertissement d'une centrale nucléaire, vous pouvez vous retrouver avec une ville complètement explosé x
DGoiko

Réponses:

229

Si votre programme n'a pas déjà de boucle d'événement, utilisez le module sched , qui implémente un planificateur d'événements à usage général.

import sched, time
s = sched.scheduler(time.time, time.sleep)
def do_something(sc): 
    print("Doing stuff...")
    # do your stuff
    s.enter(60, 1, do_something, (sc,))

s.enter(60, 1, do_something, (s,))
s.run()

Si vous utilisez déjà une bibliothèque de boucle d'événements comme asyncio, trio, tkinter, PyQt5, gobject, kivy, et bien d' autres - juste planifier la tâche en utilisant votre méthodes de bibliothèque de boucles d'événements existant, au lieu.

nosklo
la source
16
Le module sched permet de planifier des fonctions à exécuter après un certain temps, comment l'utilisez-vous pour répéter un appel de fonction toutes les x secondes sans utiliser time.sleep ()?
Baishampayan Ghose
2
@Baishampayan: Planifiez simplement une nouvelle course.
nosklo
3
Ensuite, apscheduler sur packages.python.org/APScheduler devrait également obtenir une mention à ce stade.
Daniel F
6
note: cette version peut dériver. Vous pouvez utiliser enterabs()pour l'éviter. Voici une version non dérivante pour comparaison .
jfs
8
@JavaSa: parce que "faites votre travail" n'est pas instantané et des erreurs time.sleeppeuvent s'accumuler ici. "exécuter toutes les X secondes" et "exécuter avec un retard de ~ X secondes à plusieurs reprises" ne sont pas les mêmes. Voir aussi ce commentaire
jfs
180

Verrouillez votre boucle temporelle sur l'horloge système comme ceci:

import time
starttime = time.time()
while True:
    print "tick"
    time.sleep(60.0 - ((time.time() - starttime) % 60.0))
Dave Rove
la source
22
+1. la vôtre et la twistedréponse sont les seules réponses qui exécutent une fonction toutes les xsecondes. Les autres exécutent la fonction avec un délai de xquelques secondes après chaque appel.
jfs
13
Si vous voulez ajouter du code à cela, ce qui a pris plus d'une seconde ... Cela retarderait le chronométrage et commencerait à prendre du retard. La réponse acceptée dans ce cas est correcte ... Tout le monde peut boucler une simple commande d'impression et le faire tourner toutes les secondes sans délai ...
Angry 84
5
Je préfère à from time import time, sleepcause des implications existentielles;)
Will
14
Fonctionne de manière fantastique. Il n'est pas nécessaire de soustraire votre starttimesi vous commencez par le synchroniser à un certain moment: cela time.sleep(60 - time.time() % 60)a bien fonctionné pour moi. Je l'ai utilisé comme time.sleep(1200 - time.time() % 1200)et il me donne des journaux sur le :00 :20 :40, exactement comme je le voulais.
TemporalWolf
2
@AntonSchigur pour éviter la dérive après plusieurs itérations. Une itération individuelle peut commencer un peu plus tôt ou plus tard selon sleep(), la timer()précision et le temps qu'il faut pour exécuter le corps de la boucle , mais sur des itérations moyennes toujours se produire sur les limites d' intervalle (même si certains sont ignorés): while keep_doing_it(): sleep(interval - timer() % interval). Comparez-le avec celui while keep_doing_it(): sleep(interval)où les erreurs peuvent s'accumuler après plusieurs itérations.
jfs
72

Vous voudrez peut-être considérer Twisted qui est une bibliothèque de mise en réseau Python qui implémente le modèle de réacteur .

from twisted.internet import task, reactor

timeout = 60.0 # Sixty seconds

def doWork():
    #do work here
    pass

l = task.LoopingCall(doWork)
l.start(timeout) # call every sixty seconds

reactor.run()

Alors que "while True: sleep (60)" fonctionnera probablement, Twisted implémente probablement déjà la plupart des fonctionnalités dont vous aurez éventuellement besoin (démonisation, journalisation ou gestion des exceptions comme indiqué par bobince) et sera probablement une solution plus robuste

Aaron Maenpaa
la source
Excellente réponse également, très précise sans dérive. Je me demande si cela met également le processeur en veille en attendant d'exécuter la tâche (alias pas occupé-en attente)?
smoothware
1
cela dérive au niveau de la milliseconde
Derek Eden
1
Que signifie «dérive au niveau de la milliseconde»?
Jean-Paul Calderone
Y a-t-il quand même une rupture de boucle, disons après 10 minutes? @Aaron Maenpaa
alper
67

Si vous voulez un moyen non bloquant d'exécuter périodiquement votre fonction, au lieu d'une boucle infinie bloquante, j'utiliserais un minuteur fileté. De cette façon, votre code peut continuer à fonctionner et effectuer d'autres tâches tout en ayant votre fonction appelée toutes les n secondes. J'utilise beaucoup cette technique pour imprimer des informations de progression sur de longues tâches gourmandes en CPU / disque / réseau.

Voici le code que j'ai publié dans une question similaire, avec les contrôles start () et stop ():

from threading import Timer

class RepeatedTimer(object):
    def __init__(self, interval, function, *args, **kwargs):
        self._timer     = None
        self.interval   = interval
        self.function   = function
        self.args       = args
        self.kwargs     = kwargs
        self.is_running = False
        self.start()

    def _run(self):
        self.is_running = False
        self.start()
        self.function(*self.args, **self.kwargs)

    def start(self):
        if not self.is_running:
            self._timer = Timer(self.interval, self._run)
            self._timer.start()
            self.is_running = True

    def stop(self):
        self._timer.cancel()
        self.is_running = False

Usage:

from time import sleep

def hello(name):
    print "Hello %s!" % name

print "starting..."
rt = RepeatedTimer(1, hello, "World") # it auto-starts, no need of rt.start()
try:
    sleep(5) # your long-running job goes here...
finally:
    rt.stop() # better in a try/finally block to make sure the program ends!

Fonctionnalités:

  • Bibliothèque standard uniquement, pas de dépendances externes
  • start()et stop()sont sûrs d'appeler plusieurs fois même si la minuterie a déjà démarré / arrêté
  • la fonction à appeler peut avoir des arguments positionnels et nommés
  • Vous pouvez changer à intervaltout moment, il sera effectif après la prochaine exécution. Pareil pour args, kwargset même function!
MestreLion
la source
Cette solution semble dériver avec le temps; J'avais besoin d'une version qui vise à appeler la fonction toutes les n secondes sans dérive. Je publierai une mise à jour dans une question distincte.
eraoul
Dans def _run(self)Je suis en train d'envelopper ma tête autour de pourquoi vous appelez self.start()avant self.function(). Peux-tu élaborer? Je pense qu'en appelant en start()premier, il en self.is_runningsera toujours Falseainsi, puis nous ferons toujours tourner un nouveau fil.
Rich Episcopo
1
Je pense que je suis allé au fond des choses. La solution de @ MestreLion exécute une fonction toutes les xsecondes (c.-à-d. T = 0, t = 1x, t = 2x, t = 3x, ...) où, sur les affiches originales, l'exemple de code exécute une fonction avec un intervalle de x seconde entre les deux. De plus, cette solution a, je crois, un bogue si elle intervalest plus courte que le temps nécessaire functionà son exécution. Dans ce cas, self._timersera écrasé dans la startfonction.
Rich Episcopo
Oui, @RichieEpiscopo, l'appel à .function()after .start()est d'exécuter la fonction à t = 0. Et je ne pense pas que ce sera un problème si cela functionprend plus de temps interval, mais oui, il pourrait y avoir des conditions de course sur le code.
MestreLion
C'est le seul moyen non bloquant que j'ai pu obtenir. Merci.
backslashN
35

Je pense que la manière la plus simple est:

import time

def executeSomething():
    #code here
    time.sleep(60)

while True:
    executeSomething()

De cette façon, votre code est exécuté, puis il attend 60 secondes puis il s'exécute à nouveau, attend, exécute, etc ... Pas besoin de compliquer les choses: D

Itxaka
la source
Le mot clé True doit être en majuscules
Sean Cain
39
En fait, ce n'est pas la réponse: le temps sleep () ne peut être utilisé que pour attendre X secondes après chaque exécution. Par exemple, si l'exécution de votre fonction prend 0,5 seconde et que vous utilisez time.sleep (1), cela signifie que votre fonction s'exécute toutes les 1,5 secondes, pas 1. Vous devez utiliser d'autres modules et / ou threads pour vous assurer que quelque chose fonctionne pendant Y fois dans chaque X seconde.
kommradHomer
1
@kommradHomer: La réponse de Dave Rove démontre que vous pouvez utiliser time.sleep()exécuter quelque chose toutes les X secondes
jfs
2
À mon avis, le code devrait appeler time.sleep()en while Trueboucle comme:def executeSomething(): print('10 sec left') ; while True: executeSomething(); time.sleep(10)
Leonard Lepadatu
22
import time, traceback

def every(delay, task):
  next_time = time.time() + delay
  while True:
    time.sleep(max(0, next_time - time.time()))
    try:
      task()
    except Exception:
      traceback.print_exc()
      # in production code you might want to have this instead of course:
      # logger.exception("Problem while executing repetitive task.")
    # skip tasks if we are behind schedule:
    next_time += (time.time() - next_time) // delay * delay + delay

def foo():
  print("foo", time.time())

every(5, foo)

Si vous souhaitez le faire sans bloquer votre code restant, vous pouvez l'utiliser pour le laisser s'exécuter dans son propre thread:

import threading
threading.Thread(target=lambda: every(5, foo)).start()

Cette solution combine plusieurs fonctionnalités rarement trouvées combinées dans les autres solutions:

  • Gestion des exceptions: Dans la mesure du possible à ce niveau, les exceptions sont gérées correctement, c'est-à-dire se connecter à des fins de débogage sans abandonner notre programme.
  • Pas de chaînage: l' implémentation commune de type chaîne (pour planifier l'événement suivant) que vous trouvez dans de nombreuses réponses est fragile dans la mesure où si quelque chose se passe mal dans le mécanisme de planification ( threading.Timerou autre), cela mettra fin à la chaîne. Aucune autre exécution n'aura lieu alors, même si la raison du problème est déjà corrigée. Une boucle simple et l'attente avec un simple sleep()est beaucoup plus robuste en comparaison.
  • Pas de dérive: ma solution garde une trace exacte des heures auxquelles elle est censée s'exécuter. Il n'y a pas de dérive en fonction du temps d'exécution (comme dans beaucoup d'autres solutions).
  • Ignorer: Ma solution ignorera les tâches si une exécution a pris trop de temps (par exemple, faites X toutes les cinq secondes, mais X a pris 6 secondes). Il s'agit du comportement cron standard (et pour une bonne raison). De nombreuses autres solutions exécutent ensuite la tâche plusieurs fois de suite sans aucun délai. Pour la plupart des cas (par exemple les tâches de nettoyage), cela n'est pas souhaité. S'il est souhaité, il suffit d' utiliser à la next_time += delayplace.
Alfe
la source
2
meilleure réponse pour ne pas dériver.
Sebastian Stark
1
@PirateApp Je ferais cela dans un fil différent. Vous pouvez le faire dans le même fil, mais vous finissez par programmer votre propre système de planification, ce qui est beaucoup trop complexe pour un commentaire.
Alfe
1
En Python, grâce au GIL, l'accès aux variables dans deux threads est parfaitement sûr. Et la simple lecture dans deux threads ne devrait jamais être un problème (pas également dans d'autres environnements threadés). Seule l'écriture à partir de deux threads différents dans un système sans GIL (par exemple en Java, C ++, etc.) nécessite une synchronisation explicite.
Alfe
1
@ user50473 Sans plus d'informations, j'aborderais d'abord la tâche du côté fileté. Un thread lit les données de temps en temps, puis s'endort jusqu'à ce qu'il soit à nouveau temps de le faire. La solution ci-dessus pourrait bien sûr être utilisée pour cela. Mais je pouvais imaginer un tas de raisons pour aller différemment. Alors bonne chance :)
Alfe
1
La mise en veille peut être remplacée par le filetage.Evitez d'attendre avec timeout pour être plus réactif à la sortie de l'application. stackoverflow.com/questions/29082268/…
themadmax
20

Voici une mise à jour du code de MestreLion qui évite les dérives au fil du temps.

La classe RepeatedTimer appelle ici la fonction donnée toutes les "intervalles" secondes comme demandé par l'OP; le calendrier ne dépend pas du temps nécessaire à l'exécution de la fonction. J'aime cette solution car elle n'a pas de dépendances de bibliothèques externes; c'est juste du python pur.

import threading 
import time

class RepeatedTimer(object):
  def __init__(self, interval, function, *args, **kwargs):
    self._timer = None
    self.interval = interval
    self.function = function
    self.args = args
    self.kwargs = kwargs
    self.is_running = False
    self.next_call = time.time()
    self.start()

  def _run(self):
    self.is_running = False
    self.start()
    self.function(*self.args, **self.kwargs)

  def start(self):
    if not self.is_running:
      self.next_call += self.interval
      self._timer = threading.Timer(self.next_call - time.time(), self._run)
      self._timer.start()
      self.is_running = True

  def stop(self):
    self._timer.cancel()
    self.is_running = False

Exemple d'utilisation (copié de la réponse de MestreLion):

from time import sleep

def hello(name):
    print "Hello %s!" % name

print "starting..."
rt = RepeatedTimer(1, hello, "World") # it auto-starts, no need of rt.start()
try:
    sleep(5) # your long-running job goes here...
finally:
    rt.stop() # better in a try/finally block to make sure the program ends!
eraoul
la source
5

J'ai fait face à un problème similaire il y a quelque temps. Peut-être que http://cronus.readthedocs.org pourrait vous aider?

Pour la v0.2, l'extrait de code suivant fonctionne

import cronus.beat as beat

beat.set_rate(2) # 2 Hz
while beat.true():
    # do some time consuming work here
    beat.sleep() # total loop duration would be 0.5 sec
Anay
la source
4

La principale différence entre cela et cron est qu'une exception va tuer le démon pour de bon. Vous voudrez peut-être envelopper avec un capteur et un enregistreur d'exceptions.

bobince
la source
4

Une réponse possible:

import time
t=time.time()

while True:
    if time.time()-t>10:
        #run your task here
        t=time.time()
sks
la source
1
C'est occupé à attendre donc un très mauvais.
Alfe
Bonne solution pour quelqu'un qui recherche une minuterie non bloquante.
Noel
3

J'ai fini par utiliser le module de planification . L'API est sympa.

import schedule
import time

def job():
    print("I'm working...")

schedule.every(10).minutes.do(job)
schedule.every().hour.do(job)
schedule.every().day.at("10:30").do(job)
schedule.every(5).to(10).minutes.do(job)
schedule.every().monday.do(job)
schedule.every().wednesday.at("13:15").do(job)
schedule.every().minute.at(":17").do(job)

while True:
    schedule.run_pending()
    time.sleep(1)
Statistiques d'apprentissage par l'exemple
la source
J'ai du mal à essayer d'utiliser ce module en particulier, j'ai besoin de débloquer le thread principal, j'ai vérifié la FAQ sur le site Web de documentation de la planification, mais je n'ai pas vraiment compris la solution de contournement fournie. Est-ce que quelqu'un sait où je peux trouver un exemple de travail qui ne bloque pas le thread principal?
5Daydreams
1

J'utilise la méthode Tkinter after (), qui ne "vole pas le jeu" (comme le module sched qui a été présenté précédemment), c'est-à-dire qu'elle permet à d'autres choses de fonctionner en parallèle:

import Tkinter

def do_something1():
  global n1
  n1 += 1
  if n1 == 6: # (Optional condition)
    print "* do_something1() is done *"; return
  # Do your stuff here
  # ...
  print "do_something1() "+str(n1)
  tk.after(1000, do_something1)

def do_something2(): 
  global n2
  n2 += 1
  if n2 == 6: # (Optional condition)
    print "* do_something2() is done *"; return
  # Do your stuff here
  # ...
  print "do_something2() "+str(n2)
  tk.after(500, do_something2)

tk = Tkinter.Tk(); 
n1 = 0; n2 = 0
do_something1()
do_something2()
tk.mainloop()

do_something1()et do_something2()peut fonctionner en parallèle et à n'importe quelle vitesse d'intervalle. Ici, le 2ème sera exécuté deux fois plus vite. Notez également que j'ai utilisé un simple compteur comme condition pour terminer l'une ou l'autre fonction. Vous pouvez utiliser n'importe quelle autre contition que vous aimez ou aucune si vous avez une fonction à exécuter jusqu'à la fin du programme (par exemple une horloge).

Apostolos
la source
Soyez prudent avec votre formulation: afterne permet pas aux choses de fonctionner en parallèle. Tkinter est un thread unique et ne peut faire qu'une seule chose à la fois. Si quelque chose planifié par afters'exécute, il ne s'exécute pas en parallèle avec le reste du code. Si les deux do_something1et do_something2sont planifiés pour s'exécuter en même temps, ils s'exécuteront séquentiellement, pas en parallèle.
Bryan Oakley
@Apostolos, votre solution consiste uniquement à utiliser la boucle principale tkinter au lieu de planifier la boucle principale, afin qu'elle fonctionne exactement de la même manière mais permet aux interfaces tkinter de continuer à répondre. Si vous n'utilisez pas tkinter pour d'autres choses, cela ne change rien en ce qui concerne la solution Sched. Vous pouvez utiliser deux ou plusieurs fonctions planifiées avec des intervalles différents dans la schedsolution et cela fonctionnera exactement de la même manière que la vôtre.
nosklo
Non, cela ne fonctionne pas de la même manière. J'ai expliqué cela. L'un "verrouille" le programme (c'est-à-dire arrête le flux, vous ne pouvez rien faire d'autre - pas même commencer un autre travail programmé comme vous le suggérez) jusqu'à ce qu'il se termine et l'autre laisse vos mains libres / libres (c'est-à-dire que vous pouvez faire d'autres choses après qu'il a commencé. Vous n'avez pas à attendre jusqu'à ce qu'il se termine. C'est une énorme différence. Si vous aviez essayé la méthode que j'ai présentée, vous l'auriez vu par vous-même. J'ai essayé la vôtre. Pourquoi ne pas Essayez le mien aussi?
Apostolos
1

Voici une version adaptée au code de MestreLion. En plus de la fonction d'origine, ce code:

1) ajouter first_interval utilisé pour déclencher la minuterie à un moment précis (l'appelant doit calculer le premier_intervalle et passer)

2) résoudre une condition de concurrence dans le code d'origine. Dans le code d'origine, si le thread de contrôle n'a pas réussi à annuler le minuteur en cours d'exécution ("Arrêtez le minuteur et annulez l'exécution de l'action du minuteur. Cela ne fonctionnera que si le minuteur est encore dans sa phase d'attente." Cité de https: // docs.python.org/2/library/threading.html ), le minuteur fonctionnera sans fin.

class RepeatedTimer(object):
def __init__(self, first_interval, interval, func, *args, **kwargs):
    self.timer      = None
    self.first_interval = first_interval
    self.interval   = interval
    self.func   = func
    self.args       = args
    self.kwargs     = kwargs
    self.running = False
    self.is_started = False

def first_start(self):
    try:
        # no race-condition here because only control thread will call this method
        # if already started will not start again
        if not self.is_started:
            self.is_started = True
            self.timer = Timer(self.first_interval, self.run)
            self.running = True
            self.timer.start()
    except Exception as e:
        log_print(syslog.LOG_ERR, "timer first_start failed %s %s"%(e.message, traceback.format_exc()))
        raise

def run(self):
    # if not stopped start again
    if self.running:
        self.timer = Timer(self.interval, self.run)
        self.timer.start()
    self.func(*self.args, **self.kwargs)

def stop(self):
    # cancel current timer in case failed it's still OK
    # if already stopped doesn't matter to stop again
    if self.timer:
        self.timer.cancel()
    self.running = False
dproc
la source
1

Cela semble beaucoup plus simple que la solution acceptée - y a-t-il des lacunes que je ne considère pas? Je suis venu ici à la recherche de pâtes à la copie simple et j'ai été déçu.

import threading, time

def print_every_n_seconds(n=2):
    while True:
        print(time.ctime())
        time.sleep(n)

thread = threading.Thread(target=print_every_n_seconds, daemon=True)
thread.start()

Qui sort de manière asynchrone.

#Tue Oct 16 17:29:40 2018
#Tue Oct 16 17:29:42 2018
#Tue Oct 16 17:29:44 2018

Cela a une dérive dans le sens où si la tâche en cours d'exécution prend un temps appréciable, alors l'intervalle devient 2 secondes + temps de tâche, donc si vous avez besoin d'une planification précise, ce n'est pas pour vous.

Notez que l' daemon=Trueindicateur signifie que ce thread n'empêchera pas l'application de s'arrêter. Par exemple, avait un problème où pytestse bloquerait indéfiniment après avoir exécuté des tests en attendant que cette tête cesse.

Adam Hughes
la source
Non, il imprime uniquement le premier datetime puis s'arrête ...
Alex Poca
Êtes-vous sûr - je viens de copier et coller dans le terminal. Il revient tout de suite mais l'impression continue en arrière-plan pour moi.
Adam Hughes
Il semble que je manque quelque chose ici. Je copier / coller le code dans test.py , et courir avec test.py python . Avec Python2.7, je dois supprimer daemon = True qui n'est pas reconnu et j'ai lu plusieurs impressions. Avec Python3.8, il s'arrête après la première impression et aucun processus n'est actif après sa fin. Suppression du démon = Vrai J'ai lu plusieurs impressions ...
Alex Poca
hmm étrange - je suis sur python 3.6.10 mais je ne sais pas pourquoi cela importerait
Adam Hughes
Encore une fois: Python3.4.2 (Debian GNU / Linux 8 (jessie)), a dû supprimer daemon = True pour qu'il puisse imprimer plusieurs fois. Avec le démon, j'obtiens une erreur de syntaxe. Les tests précédents avec Python2.7 et 3.8 étaient sur Ubuntu 19.10 Se pourrait-il que le démon soit traité différemment selon l'OS?
Alex Poca
0

J'utilise ceci pour provoquer 60 événements par heure avec la plupart des événements se produisant au même nombre de secondes après la minute entière:

import math
import time
import random

TICK = 60 # one minute tick size
TICK_TIMING = 59 # execute on 59th second of the tick
TICK_MINIMUM = 30 # minimum catch up tick size when lagging

def set_timing():

    now = time.time()
    elapsed = now - info['begin']
    minutes = math.floor(elapsed/TICK)
    tick_elapsed = now - info['completion_time']
    if (info['tick']+1) > minutes:
        wait = max(0,(TICK_TIMING-(time.time() % TICK)))
        print ('standard wait: %.2f' % wait)
        time.sleep(wait)
    elif tick_elapsed < TICK_MINIMUM:
        wait = TICK_MINIMUM-tick_elapsed
        print ('minimum wait: %.2f' % wait)
        time.sleep(wait)
    else:
        print ('skip set_timing(); no wait')
    drift = ((time.time() - info['begin']) - info['tick']*TICK -
        TICK_TIMING + info['begin']%TICK)
    print ('drift: %.6f' % drift)

info['tick'] = 0
info['begin'] = time.time()
info['completion_time'] = info['begin'] - TICK

while 1:

    set_timing()

    print('hello world')

    #random real world event
    time.sleep(random.random()*TICK_MINIMUM)

    info['tick'] += 1
    info['completion_time'] = time.time()

Selon les conditions réelles, vous pouvez obtenir des tiques de longueur:

60,60,62,58,60,60,120,30,30,60,60,60,60,60...etc.

mais au bout de 60 minutes, vous aurez 60 ticks; et la plupart se produiront avec le décalage correct à la minute que vous préférez.

Sur mon système, j'obtiens une dérive typique de <1 / 20ème de seconde jusqu'à ce que le besoin de correction apparaisse.

L'avantage de cette méthode est la résolution de la dérive d'horloge; ce qui peut entraîner des problèmes si vous faites des choses comme ajouter un élément par tick et que vous vous attendez à 60 éléments ajoutés par heure. Le fait de ne pas tenir compte de la dérive peut amener des indications secondaires comme les moyennes mobiles à considérer les données trop profondément dans le passé, ce qui entraîne une sortie défectueuse.

litepresence
la source
0

par exemple, afficher l'heure locale actuelle

import datetime
import glib
import logger

def get_local_time():
    current_time = datetime.datetime.now().strftime("%H:%M")
    logger.info("get_local_time(): %s",current_time)
    return str(current_time)

def display_local_time():
    logger.info("Current time is: %s", get_local_time())
    return True

# call every minute
glib.timeout_add(60*1000, display_local_time)
rise.riyo
la source
0
    ''' tracking number of times it prints'''
import threading

global timeInterval
count=0
def printit():
  threading.Timer(timeInterval, printit).start()
  print( "Hello, World!")
  global count
  count=count+1
  print(count)
printit

if __name__ == "__main__":
    timeInterval= int(input('Enter Time in Seconds:'))
    printit()
raviGupta
la source
Sur la base des entrées de l'utilisateur, il itérera cette méthode à chaque intervalle de temps.
raviGupta
0

Voici une autre solution sans utiliser de bibliothèques supplémentaires.

def delay_until(condition_fn, interval_in_sec, timeout_in_sec):
    """Delay using a boolean callable function.

    `condition_fn` is invoked every `interval_in_sec` until `timeout_in_sec`.
    It can break early if condition is met.

    Args:
        condition_fn     - a callable boolean function
        interval_in_sec  - wait time between calling `condition_fn`
        timeout_in_sec   - maximum time to run

    Returns: None
    """
    start = last_call = time.time()
    while time.time() - start < timeout_in_sec:
        if (time.time() - last_call) > interval_in_sec:
            if condition_fn() is True:
                break
            last_call = time.time()
Dat Nguyen
la source