Lecture non bloquante sur un sous-processus.PIPE en python

507

J'utilise le module de sous processus pour démarrer un sous-processus et connecter à son flux de sortie (stdout). Je veux pouvoir exécuter des lectures non bloquantes sur sa sortie standard. Existe-t-il un moyen de rendre .readline non bloquant ou de vérifier s'il y a des données sur le flux avant d'invoquer .readline? Je voudrais que ce soit portable ou au moins fonctionner sous Windows et Linux.

voici comment je le fais pour l'instant (il bloque .readlinesi aucune donnée n'est disponible):

p = subprocess.Popen('myprogram.exe', stdout = subprocess.PIPE)
output_str = p.stdout.readline()
Mathieu Pagé
la source
14
(Venant de google?) Tous les PIPE seront bloqués lorsque l'un des tampons des PIPE sera rempli et non lu. par exemple stdout deadlock lorsque stderr est rempli. Ne passez jamais un TUYAU que vous n'avez pas l'intention de lire.
Nasser Al-Wohaibi
@ NasserAl-Wohaibi cela signifie-t-il qu'il vaut mieux alors toujours créer des fichiers?
Charlie Parker
quelque chose que j'ai été curieux de comprendre, c'est pourquoi son blocage en premier lieu ... Je demande parce que j'ai vu le commentaire:To avoid deadlocks: careful to: add \n to output, flush output, use readline() rather than read()
Charlie Parker
Il est, "par conception", en attente de recevoir des entrées.
Mathieu Pagé
en relation: stackoverflow.com/q/19880190/240515
user240515

Réponses:

403

fcntl, select, asyncprocNe sera pas utile dans ce cas.

Un moyen fiable de lire un flux sans blocage quel que soit le système d'exploitation consiste à utiliser Queue.get_nowait():

import sys
from subprocess import PIPE, Popen
from threading  import Thread

try:
    from queue import Queue, Empty
except ImportError:
    from Queue import Queue, Empty  # python 2.x

ON_POSIX = 'posix' in sys.builtin_module_names

def enqueue_output(out, queue):
    for line in iter(out.readline, b''):
        queue.put(line)
    out.close()

p = Popen(['myprogram.exe'], stdout=PIPE, bufsize=1, close_fds=ON_POSIX)
q = Queue()
t = Thread(target=enqueue_output, args=(p.stdout, q))
t.daemon = True # thread dies with the program
t.start()

# ... do other things here

# read line without blocking
try:  line = q.get_nowait() # or q.get(timeout=.1)
except Empty:
    print('no output yet')
else: # got line
    # ... do something with line
jfs
la source
6
Oui, cela fonctionne pour moi, j'en ai retiré beaucoup cependant. Il comprend de bonnes pratiques mais pas toujours nécessaires. Python 3.x 2.X compat et close_fds peuvent être omis, cela fonctionnera toujours. Mais soyez conscient de ce que tout fait et ne le copiez pas aveuglément, même si cela fonctionne! (En fait, la solution la plus simple est d'utiliser un thread et de faire une ligne de lecture comme Seb l'a fait, les files d'attente sont juste un moyen facile d'obtenir les données, il y en a d'autres, les threads sont la réponse!)
Aki
3
À l'intérieur du thread, l'appel à out.readlinebloquer le thread et le thread principal, et je dois attendre que readline revienne avant que tout le reste continue. Un moyen simple de contourner cela? (Je lis plusieurs lignes de mon processus, qui est également un autre fichier .py qui fait des bases de données et des choses)
Justin
3
@Justin: 'out.readline' ne bloque pas le thread principal, il est exécuté dans un autre thread.
jfs
4
si je n'arrive pas à arrêter le sous-processus, par exemple. en raison d'exceptions? le thread stdout-reader ne mourra pas et python se bloquera, même si le thread principal est sorti, n'est-ce pas? comment contourner cela? python 2.x ne prend pas en charge la suppression des threads, ce qui est pire, ne prend pas en charge leur interruption. :( (évidemment, il faut gérer les exceptions pour s'assurer que le sous-processus est arrêté, mais au cas où il ne le ferait pas, que pouvez-vous faire?)
n611x007
3
J'ai créé des wrappers amicaux de ceci dans le paquet shelljob pypi.python.org/pypi/shelljob
edA-qa mort-ora-y
77

J'ai souvent eu un problème similaire; Les programmes Python que j'écris doivent souvent avoir la possibilité d'exécuter certaines fonctionnalités principales tout en acceptant simultanément les entrées utilisateur de la ligne de commande (stdin). Le simple fait de mettre la fonctionnalité de gestion des entrées utilisateur dans un autre thread ne résout pas le problème car il se readline()bloque et n'a pas de délai d'expiration. Si la fonctionnalité principale est terminée et qu'il n'est plus nécessaire d'attendre une entrée utilisateur supplémentaire, je veux généralement que mon programme se termine, mais il ne peut pas car readline()il bloque toujours dans l'autre thread en attente d'une ligne. Une solution que j'ai trouvée à ce problème est de faire de stdin un fichier non bloquant en utilisant le module fcntl:

import fcntl
import os
import sys

# make stdin a non-blocking file
fd = sys.stdin.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)

# user input handling thread
while mainThreadIsRunning:
      try: input = sys.stdin.readline()
      except: continue
      handleInput(input)

À mon avis, c'est un peu plus propre que d'utiliser les modules de sélection ou de signal pour résoudre ce problème, mais là encore, cela ne fonctionne que sous UNIX ...

Jesse
la source
1
Selon les documents, fcntl () peut recevoir soit un descripteur de fichier, soit un objet qui a la méthode .fileno ().
Denilson Sá Maia
10
La réponse de Jesse n'est pas correcte. Selon Guido, readline ne fonctionne pas correctement avec le mode non bloquant, et il ne le sera pas avant Python 3000. bugs.python.org/issue1175#msg56041 Si vous souhaitez utiliser fcntl pour mettre le fichier en mode non bloquant, vous devez utiliser os.read () de niveau inférieur et séparer les lignes vous-même. Mélanger fcntl avec des appels de haut niveau qui effectuent la mise en mémoire tampon de la ligne pose problème.
anonnn
2
L'utilisation de readline semble incorrecte dans Python 2. Voir la réponse de anonnn stackoverflow.com/questions/375427/…
Catalin Iacob
10
Veuillez ne pas utiliser de boucles occupées. Utilisez poll () avec un délai d'attente pour attendre les données.
Ivo Danihelka
@Stefano qu'est-ce qui est buffer_sizedéfini?
chat
39

Python 3.4 introduit une nouvelle API provisoire pour le asynciomodule d' E / S asynchrone .

L'approche est similaire à la twistedréponse basée sur @Bryan Ward - définissez un protocole et ses méthodes sont appelées dès que les données sont prêtes:

#!/usr/bin/env python3
import asyncio
import os

class SubprocessProtocol(asyncio.SubprocessProtocol):
    def pipe_data_received(self, fd, data):
        if fd == 1: # got stdout data (bytes)
            print(data)

    def connection_lost(self, exc):
        loop.stop() # end loop.run_forever()

if os.name == 'nt':
    loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(loop.subprocess_exec(SubprocessProtocol, 
        "myprogram.exe", "arg1", "arg2"))
    loop.run_forever()
finally:
    loop.close()

Voir "Sous-processus" dans la documentation .

Il existe une interface de haut niveau asyncio.create_subprocess_exec()qui renvoie des Processobjets qui permet de lire une ligne de manière asynchrone en utilisant StreamReader.readline()coroutine (avec la syntaxe async/ awaitPython 3.5+ ):

#!/usr/bin/env python3.5
import asyncio
import locale
import sys
from asyncio.subprocess import PIPE
from contextlib import closing

async def readline_and_kill(*args):
    # start child process
    process = await asyncio.create_subprocess_exec(*args, stdout=PIPE)

    # read line (sequence of bytes ending with b'\n') asynchronously
    async for line in process.stdout:
        print("got line:", line.decode(locale.getpreferredencoding(False)))
        break
    process.kill()
    return await process.wait() # wait for the child process to exit


if sys.platform == "win32":
    loop = asyncio.ProactorEventLoop()
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()

with closing(loop):
    sys.exit(loop.run_until_complete(readline_and_kill(
        "myprogram.exe", "arg1", "arg2")))

readline_and_kill() effectue les tâches suivantes:

  • démarrer le sous-processus, rediriger sa sortie standard vers un tuyau
  • lire une ligne de la sortie standard du sous-processus de manière asynchrone
  • tuer le sous-processus
  • attendez qu'il sorte

Chaque étape peut être limitée par des secondes d'expiration si nécessaire.

jfs
la source
Lorsque j'essaie quelque chose comme ça en utilisant des coroutines python 3.4, je n'obtiens la sortie qu'une fois le script complet exécuté. Je voudrais voir une ligne de sortie imprimée, dès que le sous-processus imprime une ligne. Voici ce que j'ai: pastebin.com/qPssFGep .
flutefreak7
1
@ flutefreak7: les problèmes de mise en mémoire tampon ne sont pas liés à la question actuelle. Suivez le lien pour des solutions possibles.
jfs
Merci! Résolu le problème de mon script en utilisant simplement print(text, flush=True)afin que le texte imprimé soit immédiatement disponible pour l'appelant readline. Lorsque je l'ai testé avec l'exécutable basé sur Fortran que je veux en fait emballer / surveiller, il ne met pas en mémoire tampon sa sortie, donc il se comporte comme prévu.
flutefreak7
Est-il possible de permettre au sous-processus de persister et d'effectuer d'autres opérations de lecture / écriture. readline_and_kill, dans votre deuxième script, fonctionne de manière très similaire subprocess.comunicateà la fin du processus après une opération de lecture / écriture. Je vois également que vous utilisez un seul tuyau stdout, que le sous-processus gère comme non bloquant. J'essaie d'utiliser les deux stdoutet stderr je trouve que je finis par bloquer .
Carel
@Carel le code dans la réponse fonctionne comme prévu comme décrit explicitement dans la réponse. Il est possible d'implémenter d'autres comportements si vous le souhaitez. Les deux canaux sont également non bloquants s'ils sont utilisés, voici un exemple de lecture simultanée à partir des deux canaux .
jfs
19

Essayez le module asyncproc . Par exemple:

import os
from asyncproc import Process
myProc = Process("myprogram.app")

while True:
    # check to see if process has ended
    poll = myProc.wait(os.WNOHANG)
    if poll != None:
        break
    # print any new output
    out = myProc.read()
    if out != "":
        print out

Le module s'occupe de tout le filetage comme suggéré par S.Lott.

Noé
la source
1
Absolument brillant. Beaucoup plus facile que le module de sous-processus brut. Fonctionne parfaitement pour moi sur Ubuntu.
Cerin
12
asyncproc ne fonctionne pas sur Windows et Windows ne prend pas en charge os.WNOHANG :-(
Bryan Oakley
26
asyncproc est GPL, ce qui limite encore son utilisation :-(
Bryan Oakley
Merci. Une petite chose: il semble que remplacer les tabulations par 8 espaces dans asyncproc.py soit la voie à suivre :)
benjaoming
Il ne semble pas que vous puissiez obtenir le code retour du processus que vous avez lancé via le module asyncproc; seule la sortie qu'il a générée.
grayaii
17

Vous pouvez le faire très facilement dans Twisted . En fonction de votre base de code existante, cela peut ne pas être aussi simple à utiliser, mais si vous créez une application tordue, des choses comme celle-ci deviennent presque triviales. Vous créez une ProcessProtocolclasse et remplacez la outReceived()méthode. Twisted (selon le réacteur utilisé) n'est généralement qu'une grosse select()boucle avec des rappels installés pour gérer les données de différents descripteurs de fichiers (souvent des sockets réseau). La outReceived()méthode consiste donc simplement à installer un rappel pour gérer les données provenant de STDOUT. Un exemple simple illustrant ce comportement est le suivant:

from twisted.internet import protocol, reactor

class MyProcessProtocol(protocol.ProcessProtocol):

    def outReceived(self, data):
        print data

proc = MyProcessProtocol()
reactor.spawnProcess(proc, './myprogram', ['./myprogram', 'arg1', 'arg2', 'arg3'])
reactor.run()

La documentation Twisted contient de bonnes informations à ce sujet.

Si vous construisez l'intégralité de votre application autour de Twisted, cela rend la communication asynchrone avec d'autres processus, locaux ou distants, vraiment élégante comme celle-ci. D'un autre côté, si votre programme n'est pas construit sur Twisted, cela ne sera pas vraiment utile. Espérons que cela puisse être utile à d'autres lecteurs, même si cela ne s'applique pas à votre application particulière.

Bryan Ward
la source
pas bien. selectne devrait pas fonctionner sur les fenêtres avec des descripteurs de fichiers, selon les documents
n611x007
2
@naxa Je ne pense pas que ce à quoi select()il fait référence soit le même que vous. Je suppose cela parce que Twistedfonctionne sur Windows ...
notbad.jpeg
1
«Tordu (selon le réacteur utilisé) n'est généralement qu'une grosse boucle select ()» signifie qu'il y a plusieurs réacteurs à choisir. Celui- select()ci est le plus portable sur unix et unix-like, mais il y a aussi deux réacteurs disponibles pour Windows: twistedmatrix.com/documents/current/core/howto/…
clacke
14

Utilisez select & read (1).

import subprocess     #no new requirements
def readAllSoFar(proc, retVal=''): 
  while (select.select([proc.stdout],[],[],0)[0]!=[]):   
    retVal+=proc.stdout.read(1)
  return retVal
p = subprocess.Popen(['/bin/ls'], stdout=subprocess.PIPE)
while not p.poll():
  print (readAllSoFar(p))

Pour readline () - comme:

lines = ['']
while not p.poll():
  lines = readAllSoFar(p, lines[-1]).split('\n')
  for a in range(len(lines)-1):
    print a
lines = readAllSoFar(p, lines[-1]).split('\n')
for a in range(len(lines)-1):
  print a
Andy Jackson
la source
6
pas bien. selectne devrait pas fonctionner sur les fenêtres avec des descripteurs de fichiers, selon les documents
n611x007
OMG. Lire mégaoctets, ou peut-être gigaoctets un caractère à la fois ... c'est la pire idée que j'ai vue depuis longtemps ... inutile de mentionner, ce code ne fonctionne pas, car proc.stdout.read()peu importe la taille de l'argument un appel bloquant.
wvxvw
OSError: [WinError 10093] Either the application has not called WSAStartup, or WSAStartup failed
nmz787
8

Une solution consiste à créer un autre processus pour effectuer votre lecture du processus, ou à créer un thread du processus avec un délai d'expiration.

Voici la version filetée d'une fonction de temporisation:

http://code.activestate.com/recipes/473878/

Cependant, avez-vous besoin de lire la sortie standard à mesure qu'elle arrive? Une autre solution peut être de vider la sortie dans un fichier et d'attendre que le processus se termine en utilisant p.wait () .

f = open('myprogram_output.txt','w')
p = subprocess.Popen('myprogram.exe', stdout=f)
p.wait()
f.close()


str = open('myprogram_output.txt','r').read()
monkut
la source
il semble que le thread de recpie ne sortirait pas après le délai d'expiration et le tuer dépend de pouvoir tuer le sous-processus (sg. sinon sans rapport à cet égard) qu'il lit (une chose que vous devriez pouvoir mais juste au cas où vous ne le pouvez pas ..) .
n611x007
7

Avertissement: cela ne fonctionne que pour la tornade

Pour ce faire, définissez le fd sur non bloquant, puis utilisez ioloop pour enregistrer les rappels. J'ai emballé cela dans un œuf appelé tornado_subprocess et vous pouvez l'installer via PyPI:

easy_install tornado_subprocess

vous pouvez maintenant faire quelque chose comme ceci:

import tornado_subprocess
import tornado.ioloop

    def print_res( status, stdout, stderr ) :
    print status, stdout, stderr
    if status == 0:
        print "OK:"
        print stdout
    else:
        print "ERROR:"
        print stderr

t = tornado_subprocess.Subprocess( print_res, timeout=30, args=[ "cat", "/etc/passwd" ] )
t.start()
tornado.ioloop.IOLoop.instance().start()

vous pouvez également l'utiliser avec un RequestHandler

class MyHandler(tornado.web.RequestHandler):
    def on_done(self, status, stdout, stderr):
        self.write( stdout )
        self.finish()

    @tornado.web.asynchronous
    def get(self):
        t = tornado_subprocess.Subprocess( self.on_done, timeout=30, args=[ "cat", "/etc/passwd" ] )
        t.start()
Vukasin Toroman
la source
Merci pour la fonctionnalité intéressante! Juste pour clarifier, pourquoi ne pouvons-nous pas simplement utiliser threading.Threadpour créer de nouveaux processus non bloquants? Je l'ai utilisé dans l' on_messageinstance Websocket de Tornado, et cela a bien fonctionné.
VisioN
1
l'enfilage est surtout déconseillé dans les tornades. ils conviennent parfaitement aux petites fonctions de courte durée. Vous pouvez lire à ce sujet ici: stackoverflow.com/questions/7846323/tornado-web-and-threads github.com/facebook/tornado/wiki/Threading-and-concurrency
Vukasin Toroman
@VukasinToroman, vous m'avez vraiment sauvé ici avec ça. merci beaucoup pour le module tornado_subprocess :)
James Gentes
Est-ce que cela fonctionne sur Windows? (notez que select, avec les descripteurs de fichiers, ne fonctionne pas )
n611x007
Cette bibliothèque n'utilise pas l' selectappel. Je n'ai pas essayé cela sous Windows, mais vous auriez probablement des problèmes car la bibliothèque utilise le fcntlmodule. Donc en bref: non cela ne fonctionnera probablement pas sous Windows.
Vukasin Toroman
6

Les solutions existantes n'ont pas fonctionné pour moi (détails ci-dessous). Ce qui a finalement fonctionné était d'implémenter readline en utilisant read (1) (basé sur cette réponse ). Ce dernier ne bloque pas:

from subprocess import Popen, PIPE
from threading import Thread
def process_output(myprocess): #output-consuming thread
    nextline = None
    buf = ''
    while True:
        #--- extract line using read(1)
        out = myprocess.stdout.read(1)
        if out == '' and myprocess.poll() != None: break
        if out != '':
            buf += out
            if out == '\n':
                nextline = buf
                buf = ''
        if not nextline: continue
        line = nextline
        nextline = None

        #--- do whatever you want with line here
        print 'Line is:', line
    myprocess.stdout.close()

myprocess = Popen('myprogram.exe', stdout=PIPE) #output-producing process
p1 = Thread(target=process_output, args=(dcmpid,)) #output-consuming thread
p1.daemon = True
p1.start()

#--- do whatever here and then kill process and thread if needed
if myprocess.poll() == None: #kill process; will automatically stop thread
    myprocess.kill()
    myprocess.wait()
if p1 and p1.is_alive(): #wait for thread to finish
    p1.join()

Pourquoi les solutions existantes n'ont pas fonctionné:

  1. Les solutions qui nécessitent readline (y compris celles basées sur la file d'attente) bloquent toujours. Il est difficile (impossible?) De tuer le thread qui exécute readline. Il n'est tué que lorsque le processus qui l'a créé se termine, mais pas lorsque le processus de production de sortie est tué.
  2. Mélanger fcntl de bas niveau avec des appels de ligne de lecture de haut niveau peut ne pas fonctionner correctement comme l'a souligné anonnn.
  3. L'utilisation de select.poll () est soignée, mais ne fonctionne pas sur Windows selon les documents python.
  4. L'utilisation de bibliothèques tierces semble exagérée pour cette tâche et ajoute des dépendances supplémentaires.
Vikram Pudi
la source
1
1. q.get_nowait()de ma réponse ne doit pas bloquer, jamais, c'est le point de l'utiliser. 2. Le thread qui exécute readline ( enqueue_output()fonction ) se termine par exemple sur EOF, y compris le cas où le processus de production de sortie est tué. Si vous croyez que ce n'est pas le cas; veuillez fournir un exemple de code minimal complet qui indique le contraire (peut-être comme une nouvelle question ).
jfs
1
@sebastian J'ai passé une heure ou plus à essayer de trouver un exemple minimal. En fin de compte, je dois accepter que votre réponse traite tous les cas. Je suppose que cela n'a pas fonctionné plus tôt pour moi parce que lorsque j'essayais de tuer le processus de production de sortie, il a déjà été tué et a donné une erreur difficile à déboguer. L'heure a été bien dépensée, car en proposant un exemple minimal, je pourrais trouver une solution plus simple.
Vikram Pudi
Pourriez-vous également publier la solution la plus simple? :) (si c'est différent de celui de Sebastian)
n611x007
@ danger89: Je pense dcmpid = myprocess.
ViFI
En condition après l'appel de read () (juste après alors que True): out ne sera jamais une chaîne vide car vous lisez au moins une chaîne / octets d'une longueur de 1.
sergzach
6

Voici mon code, utilisé pour intercepter chaque sortie du sous-processus ASAP, y compris les lignes partielles. Il pompe en même temps et stdout et stderr dans un ordre presque correct.

Testé et correctement travaillé sur Python 2.7 Linux et Windows.

#!/usr/bin/python
#
# Runner with stdout/stderr catcher
#
from sys import argv
from subprocess import Popen, PIPE
import os, io
from threading import Thread
import Queue
def __main__():
    if (len(argv) > 1) and (argv[-1] == "-sub-"):
        import time, sys
        print "Application runned!"
        time.sleep(2)
        print "Slept 2 second"
        time.sleep(1)
        print "Slept 1 additional second",
        time.sleep(2)
        sys.stderr.write("Stderr output after 5 seconds")
        print "Eol on stdin"
        sys.stderr.write("Eol on stderr\n")
        time.sleep(1)
        print "Wow, we have end of work!",
    else:
        os.environ["PYTHONUNBUFFERED"]="1"
        try:
            p = Popen( argv + ["-sub-"],
                       bufsize=0, # line-buffered
                       stdin=PIPE, stdout=PIPE, stderr=PIPE )
        except WindowsError, W:
            if W.winerror==193:
                p = Popen( argv + ["-sub-"],
                           shell=True, # Try to run via shell
                           bufsize=0, # line-buffered
                           stdin=PIPE, stdout=PIPE, stderr=PIPE )
            else:
                raise
        inp = Queue.Queue()
        sout = io.open(p.stdout.fileno(), 'rb', closefd=False)
        serr = io.open(p.stderr.fileno(), 'rb', closefd=False)
        def Pump(stream, category):
            queue = Queue.Queue()
            def rdr():
                while True:
                    buf = stream.read1(8192)
                    if len(buf)>0:
                        queue.put( buf )
                    else:
                        queue.put( None )
                        return
            def clct():
                active = True
                while active:
                    r = queue.get()
                    try:
                        while True:
                            r1 = queue.get(timeout=0.005)
                            if r1 is None:
                                active = False
                                break
                            else:
                                r += r1
                    except Queue.Empty:
                        pass
                    inp.put( (category, r) )
            for tgt in [rdr, clct]:
                th = Thread(target=tgt)
                th.setDaemon(True)
                th.start()
        Pump(sout, 'stdout')
        Pump(serr, 'stderr')

        while p.poll() is None:
            # App still working
            try:
                chan,line = inp.get(timeout = 1.0)
                if chan=='stdout':
                    print "STDOUT>>", line, "<?<"
                elif chan=='stderr':
                    print " ERROR==", line, "=?="
            except Queue.Empty:
                pass
        print "Finish"

if __name__ == '__main__':
    __main__()
datacompboy
la source
L'une des rares réponses qui vous permettent de lire des trucs qui ne se terminent pas nécessairement par une nouvelle ligne.
totaam
5

J'ajoute ce problème pour lire certains sous-processus. Ouvrir la sortie standard. Voici ma solution de lecture non bloquante:

import fcntl

def non_block_read(output):
    fd = output.fileno()
    fl = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
    try:
        return output.read()
    except:
        return ""

# Use example
from subprocess import *
sb = Popen("echo test && sleep 1000", shell=True, stdout=PIPE)
sb.kill()

# sb.stdout.read() # <-- This will block
non_block_read(sb.stdout)
'test\n'
Sébastien Claeys
la source
5
fcntl ne fonctionne pas sur Windows, selon la documentation .
n611x007
@anatolytechtonik utiliser à la msvcrt.kbhit()place
chat
4

Cette version de lecture non bloquante ne nécessite pas de modules spéciaux et fonctionnera prête à l'emploi sur la majorité des distributions Linux.

import os
import sys
import time
import fcntl
import subprocess

def async_read(fd):
    # set non-blocking flag while preserving old flags
    fl = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
    # read char until EOF hit
    while True:
        try:
            ch = os.read(fd.fileno(), 1)
            # EOF
            if not ch: break                                                                                                                                                              
            sys.stdout.write(ch)
        except OSError:
            # waiting for data be available on fd
            pass

def shell(args, async=True):
    # merge stderr and stdout
    proc = subprocess.Popen(args, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    if async: async_read(proc.stdout)
    sout, serr = proc.communicate()
    return (sout, serr)

if __name__ == '__main__':
    cmd = 'ping 8.8.8.8'
    sout, serr = shell(cmd.split())
Tom Lime
la source
3

Voici une solution simple basée sur des threads qui:

  • fonctionne sur Linux et Windows (sans compter sur select ).
  • lit les deux stdout et de stderrmanière asynchrone.
  • ne repose pas sur une interrogation active avec un temps d'attente arbitraire (compatible CPU).
  • n'utilise pas asyncio(ce qui peut entrer en conflit avec d'autres bibliothèques).
  • s'exécute jusqu'à la fin du processus enfant.

printer.py

import time
import sys

sys.stdout.write("Hello\n")
sys.stdout.flush()
time.sleep(1)
sys.stdout.write("World!\n")
sys.stdout.flush()
time.sleep(1)
sys.stderr.write("That's an error\n")
sys.stderr.flush()
time.sleep(2)
sys.stdout.write("Actually, I'm fine\n")
sys.stdout.flush()
time.sleep(1)

reader.py

import queue
import subprocess
import sys
import threading


def enqueue_stream(stream, queue, type):
    for line in iter(stream.readline, b''):
        queue.put(str(type) + line.decode('utf-8'))
    stream.close()


def enqueue_process(process, queue):
    process.wait()
    queue.put('x')


p = subprocess.Popen('python printer.py', stdout=subprocess.PIPE, stderr=subprocess.PIPE)
q = queue.Queue()
to = threading.Thread(target=enqueue_stream, args=(p.stdout, q, 1))
te = threading.Thread(target=enqueue_stream, args=(p.stderr, q, 2))
tp = threading.Thread(target=enqueue_process, args=(p, q))
te.start()
to.start()
tp.start()

while True:
    line = q.get()
    if line[0] == 'x':
        break
    if line[0] == '2':  # stderr
        sys.stdout.write("\033[0;31m")  # ANSI red color
    sys.stdout.write(line[1:])
    if line[0] == '2':
        sys.stdout.write("\033[0m")  # reset ANSI code
    sys.stdout.flush()

tp.join()
to.join()
te.join()
Olivier Michel
la source
2

L'ajout de cette réponse ici car il offre la possibilité de définir des canaux non bloquants sur Windows et Unix.

Tous les ctypesdétails sont grâce à la réponse de @ techtonik .

Il existe une version légèrement modifiée à utiliser à la fois sur les systèmes Unix et Windows.

  • Compatible Python3 (seule une modification mineure est nécessaire) .
  • Inclut la version posix et définit l'exception à utiliser pour l'un ou l'autre.

De cette façon, vous pouvez utiliser la même fonction et exception pour le code Unix et Windows.

# pipe_non_blocking.py (module)
"""
Example use:

    p = subprocess.Popen(
            command,
            stdout=subprocess.PIPE,
            )

    pipe_non_blocking_set(p.stdout.fileno())

    try:
        data = os.read(p.stdout.fileno(), 1)
    except PortableBlockingIOError as ex:
        if not pipe_non_blocking_is_error_blocking(ex):
            raise ex
"""


__all__ = (
    "pipe_non_blocking_set",
    "pipe_non_blocking_is_error_blocking",
    "PortableBlockingIOError",
    )

import os


if os.name == "nt":
    def pipe_non_blocking_set(fd):
        # Constant could define globally but avoid polluting the name-space
        # thanks to: /programming/34504970
        import msvcrt

        from ctypes import windll, byref, wintypes, WinError, POINTER
        from ctypes.wintypes import HANDLE, DWORD, BOOL

        LPDWORD = POINTER(DWORD)

        PIPE_NOWAIT = wintypes.DWORD(0x00000001)

        def pipe_no_wait(pipefd):
            SetNamedPipeHandleState = windll.kernel32.SetNamedPipeHandleState
            SetNamedPipeHandleState.argtypes = [HANDLE, LPDWORD, LPDWORD, LPDWORD]
            SetNamedPipeHandleState.restype = BOOL

            h = msvcrt.get_osfhandle(pipefd)

            res = windll.kernel32.SetNamedPipeHandleState(h, byref(PIPE_NOWAIT), None, None)
            if res == 0:
                print(WinError())
                return False
            return True

        return pipe_no_wait(fd)

    def pipe_non_blocking_is_error_blocking(ex):
        if not isinstance(ex, PortableBlockingIOError):
            return False
        from ctypes import GetLastError
        ERROR_NO_DATA = 232

        return (GetLastError() == ERROR_NO_DATA)

    PortableBlockingIOError = OSError
else:
    def pipe_non_blocking_set(fd):
        import fcntl
        fl = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
        return True

    def pipe_non_blocking_is_error_blocking(ex):
        if not isinstance(ex, PortableBlockingIOError):
            return False
        return True

    PortableBlockingIOError = BlockingIOError

Pour éviter de lire des données incomplètes, j'ai fini par écrire mon propre générateur de ligne de lecture (qui renvoie la chaîne d'octets pour chaque ligne).

C'est un générateur pour que vous puissiez par exemple ...

def non_blocking_readlines(f, chunk=1024):
    """
    Iterate over lines, yielding b'' when nothings left
    or when new data is not yet available.

    stdout_iter = iter(non_blocking_readlines(process.stdout))

    line = next(stdout_iter)  # will be a line or b''.
    """
    import os

    from .pipe_non_blocking import (
            pipe_non_blocking_set,
            pipe_non_blocking_is_error_blocking,
            PortableBlockingIOError,
            )

    fd = f.fileno()
    pipe_non_blocking_set(fd)

    blocks = []

    while True:
        try:
            data = os.read(fd, chunk)
            if not data:
                # case were reading finishes with no trailing newline
                yield b''.join(blocks)
                blocks.clear()
        except PortableBlockingIOError as ex:
            if not pipe_non_blocking_is_error_blocking(ex):
                raise ex

            yield b''
            continue

        while True:
            n = data.find(b'\n')
            if n == -1:
                break

            yield b''.join(blocks) + data[:n + 1]
            data = data[n + 1:]
            blocks.clear()
        blocks.append(data)
ideasman42
la source
(1) ce commentaire indique que readline()cela ne fonctionne pas avec les canaux non bloquants (tels que set using fcntl) sur Python 2 - pensez-vous que ce n'est plus correct? (ma réponse contient le lien ( fcntl) qui fournit les mêmes informations mais il semble supprimé maintenant). (2) Voir comment les multiprocessing.connection.PipeutilisationsSetNamedPipeHandleState
jfs
Je n'ai testé cela que sur Python3. Mais vu cette information aussi et attendez-vous à ce qu'elle reste valide. J'ai également écrit mon propre code pour utiliser à la place de readline, j'ai mis à jour ma réponse pour l'inclure.
ideasman42
2

J'ai le problème de l'interrogateur d'origine, mais je ne souhaite pas invoquer de threads. J'ai mélangé la solution de Jesse avec une lecture directe () du tube et mon propre gestionnaire de tampons pour les lectures de ligne (cependant, mon sous-processus - ping - écrivait toujours des lignes complètes <une taille de page système). J'évite l'attente occupée en lisant uniquement dans une montre io enregistrée sur un objet. Ces jours-ci, j'exécute généralement du code dans un gobject MainLoop pour éviter les threads.

def set_up_ping(ip, w):
# run the sub-process
# watch the resultant pipe
p = subprocess.Popen(['/bin/ping', ip], stdout=subprocess.PIPE)
# make stdout a non-blocking file
fl = fcntl.fcntl(p.stdout, fcntl.F_GETFL)
fcntl.fcntl(p.stdout, fcntl.F_SETFL, fl | os.O_NONBLOCK)
stdout_gid = gobject.io_add_watch(p.stdout, gobject.IO_IN, w)
return stdout_gid # for shutting down

L'observateur est

def watch(f, *other):
print 'reading',f.read()
return True

Et le programme principal configure un ping puis appelle la boucle de messagerie gobject.

def main():
set_up_ping('192.168.1.8', watch)
# discard gid as unused here
gobject.MainLoop().run()

Tout autre travail est attaché aux rappels dans gobject.

Dave Kitchen
la source
2

Les choses vont beaucoup mieux en Python moderne.

Voici un programme enfant simple, "hello.py":

#!/usr/bin/env python3

while True:
    i = input()
    if i == "quit":
        break
    print(f"hello {i}")

Et un programme pour interagir avec lui:

import asyncio


async def main():
    proc = await asyncio.subprocess.create_subprocess_exec(
        "./hello.py", stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE
    )
    proc.stdin.write(b"bob\n")
    print(await proc.stdout.read(1024))
    proc.stdin.write(b"alice\n")
    print(await proc.stdout.read(1024))
    proc.stdin.write(b"quit\n")
    await proc.wait()


asyncio.run(main())

Cela imprime:

b'hello bob\n'
b'hello alice\n'

Notez que le modèle réel, qui se trouve également dans presque toutes les réponses précédentes, ici et dans les questions connexes, consiste à définir le descripteur de fichier stdout de l'enfant sur non bloquant, puis à l'interroger dans une sorte de boucle de sélection. De nos jours, bien sûr, cette boucle est fournie par asyncio.

user240515
la source
1

La sélection module de vous aide à déterminer où se trouve la prochaine entrée utile.

Cependant, vous êtes presque toujours plus heureux avec des threads séparés. On fait un blocage lire la stdin, un autre fait où que vous soyez, vous ne voulez pas bloqué.

S.Lott
la source
11
Je pense que cette réponse est inutile pour deux raisons: (a) Le module de sélection ne fonctionnera pas sur les tuyaux sous Windows (comme le lien fourni l'indique clairement), ce qui contredit les intentions de l'OP d'avoir une solution portable. (b) Les threads asynchrones ne permettent pas un dialogue synchrone entre le processus parent et le processus enfant. Que faire si le processus parent souhaite envoyer la prochaine action en fonction de la ligne suivante lue à l'enfant?!
ThomasH
4
select n'est également pas utile dans la mesure où les lectures de Python seront bloquées même après la sélection, car il n'a pas de sémantique C standard et ne renverra pas de données partielles.
Helmut Grohne
Une thresd séparée pour la lecture de la sortie de l'enfant a résolu mon problème qui était similaire à cela. Si vous avez besoin d'une interaction syncrone, je suppose que vous ne pouvez pas utiliser cette solution (sauf si vous savez à quoi vous attendre). J'aurais accepté cette réponse
Emiliano
1

pourquoi déranger thread & queue? contrairement à readline (), BufferedReader.read1 () ne bloquera pas l'attente de \ r \ n, il retourne ASAP s'il y a une sortie qui arrive.

#!/usr/bin/python
from subprocess import Popen, PIPE, STDOUT
import io

def __main__():
    try:
        p = Popen( ["ping", "-n", "3", "127.0.0.1"], stdin=PIPE, stdout=PIPE, stderr=STDOUT )
    except: print("Popen failed"); quit()
    sout = io.open(p.stdout.fileno(), 'rb', closefd=False)
    while True:
        buf = sout.read1(1024)
        if len(buf) == 0: break
        print buf,

if __name__ == '__main__':
    __main__()
mfmain
la source
Reviendra-t-il dès que possible s'il n'y a rien qui arrive? Si ce n'est pas le cas, il bloque.
Mathieu Pagé
@ MathieuPagé a raison. read1bloquera si le premier bloc de lecture sous-jacent se produit lorsque le canal est toujours ouvert mais qu'aucune entrée n'est disponible.
Jack O'Connor
1

Dans mon cas, j'avais besoin d'un module de journalisation qui capture la sortie des applications d'arrière-plan et l'augmente (en ajoutant des horodatages, des couleurs, etc.).

Je me suis retrouvé avec un thread d'arrière-plan qui fait les E / S réelles. Le code suivant est uniquement pour les plates-formes POSIX. J'ai enlevé les pièces non essentielles.

Si quelqu'un va utiliser cette bête pendant de longues périodes, envisagez de gérer des descripteurs ouverts. Dans mon cas, ce n'était pas un gros problème.

# -*- python -*-
import fcntl
import threading
import sys, os, errno
import subprocess

class Logger(threading.Thread):
    def __init__(self, *modules):
        threading.Thread.__init__(self)
        try:
            from select import epoll, EPOLLIN
            self.__poll = epoll()
            self.__evt = EPOLLIN
            self.__to = -1
        except:
            from select import poll, POLLIN
            print 'epoll is not available'
            self.__poll = poll()
            self.__evt = POLLIN
            self.__to = 100
        self.__fds = {}
        self.daemon = True
        self.start()

    def run(self):
        while True:
            events = self.__poll.poll(self.__to)
            for fd, ev in events:
                if (ev&self.__evt) != self.__evt:
                    continue
                try:
                    self.__fds[fd].run()
                except Exception, e:
                    print e

    def add(self, fd, log):
        assert not self.__fds.has_key(fd)
        self.__fds[fd] = log
        self.__poll.register(fd, self.__evt)

class log:
    logger = Logger()

    def __init__(self, name):
        self.__name = name
        self.__piped = False

    def fileno(self):
        if self.__piped:
            return self.write
        self.read, self.write = os.pipe()
        fl = fcntl.fcntl(self.read, fcntl.F_GETFL)
        fcntl.fcntl(self.read, fcntl.F_SETFL, fl | os.O_NONBLOCK)
        self.fdRead = os.fdopen(self.read)
        self.logger.add(self.read, self)
        self.__piped = True
        return self.write

    def __run(self, line):
        self.chat(line, nl=False)

    def run(self):
        while True:
            try: line = self.fdRead.readline()
            except IOError, exc:
                if exc.errno == errno.EAGAIN:
                    return
                raise
            self.__run(line)

    def chat(self, line, nl=True):
        if nl: nl = '\n'
        else: nl = ''
        sys.stdout.write('[%s] %s%s' % (self.__name, line, nl))

def system(command, param=[], cwd=None, env=None, input=None, output=None):
    args = [command] + param
    p = subprocess.Popen(args, cwd=cwd, stdout=output, stderr=output, stdin=input, env=env, bufsize=0)
    p.wait()

ls = log('ls')
ls.chat('go')
system("ls", ['-l', '/'], output=ls)

date = log('date')
date.chat('go')
system("date", output=date)
Dmytro
la source
1

Mon problème est un peu différent car je voulais collecter à la fois stdout et stderr à partir d'un processus en cours d'exécution, mais finalement le même puisque je voulais rendre la sortie dans un widget comme son généré.

Je ne voulais pas recourir à la plupart des solutions de contournement proposées en utilisant des files d'attente ou des threads supplémentaires car ils ne devraient pas être nécessaires pour effectuer une tâche aussi courante que l'exécution d'un autre script et la collecte de sa sortie.

Après avoir lu les solutions proposées et les documents python, j'ai résolu mon problème avec l'implémentation ci-dessous. Oui, cela ne fonctionne que pour POSIX car j'utilise l' selectappel de fonction.

Je suis d'accord que les documents sont déroutants et l'implémentation est maladroite pour une telle tâche de script courante. Je crois que les anciennes versions de python ont des valeurs par défaut Popenet des explications différentes, ce qui a créé beaucoup de confusion. Cela semble bien fonctionner pour Python 2.7.12 et 3.5.2.

La clé était de définir la bufsize=1mise en mémoire tampon des lignes, puis universal_newlines=Truede traiter comme un fichier texte au lieu d'un binaire qui semble devenir la valeur par défaut lors de la configuration bufsize=1.

class workerThread(QThread):
   def __init__(self, cmd):
      QThread.__init__(self)
      self.cmd = cmd
      self.result = None           ## return code
      self.error = None            ## flag indicates an error
      self.errorstr = ""           ## info message about the error

   def __del__(self):
      self.wait()
      DEBUG("Thread removed")

   def run(self):
      cmd_list = self.cmd.split(" ")   
      try:
         cmd = subprocess.Popen(cmd_list, bufsize=1, stdin=None
                                        , universal_newlines=True
                                        , stderr=subprocess.PIPE
                                        , stdout=subprocess.PIPE)
      except OSError:
         self.error = 1
         self.errorstr = "Failed to execute " + self.cmd
         ERROR(self.errorstr)
      finally:
         VERBOSE("task started...")
      import select
      while True:
         try:
            r,w,x = select.select([cmd.stdout, cmd.stderr],[],[])
            if cmd.stderr in r:
               line = cmd.stderr.readline()
               if line != "":
                  line = line.strip()
                  self.emit(SIGNAL("update_error(QString)"), line)
            if cmd.stdout in r:
               line = cmd.stdout.readline()
               if line == "":
                  break
               line = line.strip()
               self.emit(SIGNAL("update_output(QString)"), line)
         except IOError:
            pass
      cmd.wait()
      self.result = cmd.returncode
      if self.result < 0:
         self.error = 1
         self.errorstr = "Task terminated by signal " + str(self.result)
         ERROR(self.errorstr)
         return
      if self.result:
         self.error = 1
         self.errorstr = "exit code " + str(self.result)
         ERROR(self.errorstr)
         return
      return

ERROR, DEBUG et VERBOSE sont simplement des macros qui impriment la sortie sur le terminal.

Cette solution est IMHO efficace à 99,99% car elle utilise toujours la readlinefonction de blocage , nous supposons donc que le sous-processus est agréable et génère des lignes complètes.

Je me réjouis des commentaires pour améliorer la solution car je suis encore nouveau sur Python.

Brooke Wallace
la source
Dans ce cas particulier, vous pouvez définir stderr = subprocess.STDOUT dans le constructeur Popen et obtenir toutes les sorties de cmd.stdout.readline ().
Aaron
Bel exemple clair. A eu des problèmes avec select.select () mais cela l'a résolu pour moi.
maharvey67
0

À partir de la réponse de JF Sebastian et de plusieurs autres sources, j'ai mis en place un simple gestionnaire de sous-processus. Il fournit la lecture non bloquante de la requête, ainsi que l'exécution de plusieurs processus en parallèle. Il n'utilise aucun appel spécifique au système d'exploitation (que je sache) et devrait donc fonctionner n'importe où.

Il est disponible sur pypi, donc juste pip install shelljob. Reportez-vous à la page du projet pour des exemples et des documents complets.

edA-qa mort-ora-y
la source
0

EDIT: Cette implémentation bloque toujours. Utilisez plutôt la réponse de JFSebastian .

J'ai essayé la meilleure réponse , mais le risque supplémentaire et la maintenance du code de thread étaient inquiétants.

En parcourant le module io (et étant limité à 2.6), j'ai trouvé BufferedReader. Ceci est ma solution sans fil et non bloquante.

import io
from subprocess import PIPE, Popen

p = Popen(['myprogram.exe'], stdout=PIPE)

SLEEP_DELAY = 0.001

# Create an io.BufferedReader on the file descriptor for stdout
with io.open(p.stdout.fileno(), 'rb', closefd=False) as buffer:
  while p.poll() == None:
      time.sleep(SLEEP_DELAY)
      while '\n' in bufferedStdout.peek(bufferedStdout.buffer_size):
          line = buffer.readline()
          # do stuff with the line

  # Handle any remaining output after the process has ended
  while buffer.peek():
    line = buffer.readline()
    # do stuff with the line
romc
la source
as-tu essayé for line in iter(p.stdout.readline, ""): # do stuff with the line? Il est sans thread (thread unique) et bloque lorsque votre code bloque.
jfs
@ jf-sebastian Ouais, je suis finalement revenu à ta réponse. Ma mise en œuvre est encore parfois bloquée. Je vais modifier ma réponse pour avertir les autres de ne pas emprunter cette voie.
romc
0

Je suis récemment tombé sur le même problème que je dois lire une ligne à la fois à partir du flux (exécution en queue dans le sous-processus) en mode non bloquant, je voulais éviter les problèmes suivants: ne pas graver cpu, ne pas lire le flux d'un octet ( comme readline), etc.

Voici mon implémentation https://gist.github.com/grubberr/5501e1a9760c3eab5e0a il ne prend pas en charge les fenêtres (sondage), ne gère pas EOF, mais cela fonctionne bien pour moi

grubberr
la source
la réponse à base de fil ne pas brûler cpu (vous pouvez spécifier arbitraire timeoutcomme dans votre solution) et .readline()lit plus d'un octet à la fois ( bufsize=1moyen ligne -buffered (uniquement pour l' écriture)). Quels autres problèmes avez-vous trouvés? Les réponses en lien uniquement ne sont pas très utiles.
jfs
0

Ceci est un exemple pour exécuter une commande interactive dans un sous-processus, et la sortie standard est interactive à l'aide d'un pseudo-terminal. Vous pouvez vous référer à: https://stackoverflow.com/a/43012138/3555925

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import os
import sys
import select
import termios
import tty
import pty
from subprocess import Popen

command = 'bash'
# command = 'docker run -it --rm centos /bin/bash'.split()

# save original tty setting then set it to raw mode
old_tty = termios.tcgetattr(sys.stdin)
tty.setraw(sys.stdin.fileno())

# open pseudo-terminal to interact with subprocess
master_fd, slave_fd = pty.openpty()

# use os.setsid() make it run in a new process group, or bash job control will not be enabled
p = Popen(command,
          preexec_fn=os.setsid,
          stdin=slave_fd,
          stdout=slave_fd,
          stderr=slave_fd,
          universal_newlines=True)

while p.poll() is None:
    r, w, e = select.select([sys.stdin, master_fd], [], [])
    if sys.stdin in r:
        d = os.read(sys.stdin.fileno(), 10240)
        os.write(master_fd, d)
    elif master_fd in r:
        o = os.read(master_fd, 10240)
        if o:
            os.write(sys.stdout.fileno(), o)

# restore tty settings back
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)
Liao
la source
0

Cette solution utilise le selectmodule pour «lire toutes les données disponibles» à partir d'un flux d'E / S. Cette fonction se bloque initialement jusqu'à ce que les données soient disponibles, mais ne lit ensuite que les données disponibles et ne bloque plus.

Étant donné qu'il utilise le selectmodule, cela ne fonctionne que sur Unix.

Le code est entièrement conforme à PEP8.

import select


def read_available(input_stream, max_bytes=None):
    """
    Blocks until any data is available, then all available data is then read and returned.
    This function returns an empty string when end of stream is reached.

    Args:
        input_stream: The stream to read from.
        max_bytes (int|None): The maximum number of bytes to read. This function may return fewer bytes than this.

    Returns:
        str
    """
    # Prepare local variables
    input_streams = [input_stream]
    empty_list = []
    read_buffer = ""

    # Initially block for input using 'select'
    if len(select.select(input_streams, empty_list, empty_list)[0]) > 0:

        # Poll read-readiness using 'select'
        def select_func():
            return len(select.select(input_streams, empty_list, empty_list, 0)[0]) > 0

        # Create while function based on parameters
        if max_bytes is not None:
            def while_func():
                return (len(read_buffer) < max_bytes) and select_func()
        else:
            while_func = select_func

        while True:
            # Read single byte at a time
            read_data = input_stream.read(1)
            if len(read_data) == 0:
                # End of stream
                break
            # Append byte to string buffer
            read_buffer += read_data
            # Check if more data is available
            if not while_func():
                break

    # Return read buffer
    return read_buffer
Bradley Odell
la source
0

J'ai également fait face au problème décrit par Jesse et l' ai résolu en utilisant "select" comme Bradley , Andy et d'autres l'ont fait, mais en mode de blocage pour éviter une boucle occupée. Il utilise un tuyau factice comme un faux stdin. Sélectionnez les blocs et attendez que stdin ou le tuyau soit prêt. Lorsqu'une touche est enfoncée, stdin débloque la sélection et la valeur de la clé peut être récupérée avec read (1). Lorsqu'un thread différent écrit sur le tuyau, le tuyau débloque le sélecteur et peut être considéré comme une indication que le besoin de stdin est terminé. Voici un code de référence:

import sys
import os
from select import select

# -------------------------------------------------------------------------    
# Set the pipe (fake stdin) to simulate a final key stroke
# which will unblock the select statement
readEnd, writeEnd = os.pipe()
readFile = os.fdopen(readEnd)
writeFile = os.fdopen(writeEnd, "w")

# -------------------------------------------------------------------------
def getKey():

    # Wait for stdin or pipe (fake stdin) to be ready
    dr,dw,de = select([sys.__stdin__, readFile], [], [])

    # If stdin is the one ready then read it and return value
    if sys.__stdin__ in dr:
        return sys.__stdin__.read(1)   # For Windows use ----> getch() from module msvcrt

    # Must finish
    else:
        return None

# -------------------------------------------------------------------------
def breakStdinRead():
    writeFile.write(' ')
    writeFile.flush()

# -------------------------------------------------------------------------
# MAIN CODE

# Get key stroke
key = getKey()

# Keyboard input
if key:
    # ... do your stuff with the key value

# Faked keystroke
else:
    # ... use of stdin finished

# -------------------------------------------------------------------------
# OTHER THREAD CODE

breakStdinRead()
gonzaedu61
la source
REMARQUE: pour que cela fonctionne sous Windows, le tuyau doit être remplacé par une douille. Je ne l'ai pas encore essayé mais cela devrait fonctionner selon la documentation.
gonzaedu61
0

Essayez wexpect , qui est l'alternative Windows de pexpect .

import wexpect

p = wexpect.spawn('myprogram.exe')
p.stdout.readline('.')               // regex pattern of any character
output_str = p.after()
betontalpfa
la source
0

Sur les systèmes de type Unix et Python 3.5+, il y a os.set_blockingce qui fait exactement ce qu'il dit.

import os
import time
import subprocess

cmd = 'python3', '-c', 'import time; [(print(i), time.sleep(1)) for i in range(5)]'
p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
os.set_blocking(p.stdout.fileno(), False)
start = time.time()
while True:
    # first iteration always produces empty byte string in non-blocking mode
    for i in range(2):    
        line = p.stdout.readline()
        print(i, line)
        time.sleep(0.5)
    if time.time() > start + 5:
        break
p.terminate()

Cela produit:

1 b''
2 b'0\n'
1 b''
2 b'1\n'
1 b''
2 b'2\n'
1 b''
2 b'3\n'
1 b''
2 b'4\n'

Avec os.set_blockingcommenté c'est:

0 b'0\n'
1 b'1\n'
0 b'2\n'
1 b'3\n'
0 b'4\n'
1 b''
saaj
la source
-2

Voici un module qui prend en charge les lectures non bloquantes et les écritures d'arrière-plan en python:

https://pypi.python.org/pypi/python-nonblock

Fournit une fonction,

nonblock_read qui lira les données du flux, si disponible, sinon retourne une chaîne vide (ou None si le flux est fermé de l'autre côté et que toutes les données possibles ont été lues)

Vous pouvez également envisager le module python-subprocess2,

https://pypi.python.org/pypi/python-subprocess2

ce qui ajoute au module de sous-processus. Ainsi, sur l'objet renvoyé par "subprocess.Popen" est ajoutée une méthode supplémentaire, runInBackground. Cela démarre un thread et retourne un objet qui sera automatiquement rempli au fur et à mesure que les trucs sont écrits dans stdout / stderr, sans bloquer votre thread principal.

Prendre plaisir!

Tim Savannah
la source
J'aimerais essayer ce module non bloquant , mais je suis relativement nouveau dans certaines des procédures Linux. Exactement comment installer ces routines? J'utilise Raspbian Jessie, une version de Debian Linux pour Raspberry Pi. J'ai essayé 'sudo apt-get install nonblock' et python-nonblock et les deux ont lancé une erreur - introuvable. J'ai téléchargé le fichier zip sur ce site pypi.python.org/pypi/python-nonblock , mais je ne sais pas quoi en faire. Merci .... RDK
RDK