J'ai un script qui effectue avec succès un ensemble de tâches de pool multitraitement avec un imap_unordered()
appel:
p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
p.join() # Wait for completion
Cependant, mon num_tasks
est d'environ 250000, et donc le join()
verrouillage du thread principal pendant environ 10 secondes, et j'aimerais pouvoir faire écho à la ligne de commande de manière incrémentielle pour montrer que le processus principal n'est pas verrouillé. Quelque chose comme:
p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
remaining = rs.tasks_remaining() # How many of the map call haven't been done yet?
if (remaining == 0): break # Jump out of while loop
print "Waiting for", remaining, "tasks to complete..."
time.sleep(2)
Existe-t-il une méthode pour l'objet de résultat ou le pool lui-même qui indique le nombre de tâches restantes? J'ai essayé d'utiliser un multiprocessing.Value
objet comme compteur ( do_work
appelle une counter.value += 1
action après avoir effectué sa tâche), mais le compteur n'atteint que ~ 85% de la valeur totale avant d'arrêter l'incrémentation.
la source
def do_word(*a): time.sleep(.1)
comme exemple. Si cela ne fonctionne pas pour vous, créez un exemple de code minimal complet qui illustre votre problème: décrivez à l'aide de mots ce que vous vous attendez à ce qu'il se passe et ce qui se passe à la place, mentionnez comment exécutez votre script Python, quel est votre système d'exploitation, la version Python et postez-le comme une nouvelle question .Pool.map()
. Je ne m'en suis pas rendu compte seulementimap()
et jeimap_unordered()
travaille de cette façon - la documentation dit simplement "Une version plus paresseuse de map ()" mais signifie en réalité "l'itérateur sous-jacent renvoie les résultats au fur et à mesure qu'ils arrivent".imap_unordered()
. Le problème de Hanan est probablement dû àsys.stderr.write('\r..')
(écraser la même ligne pour montrer la progression).Mon préféré - vous donne une jolie petite barre de progression et une ETA d'achèvement pendant que les choses se déroulent et s'engagent en parallèle.
la source
pip install tqdm
J'ai constaté que le travail était déjà fait au moment où j'ai essayé de vérifier sa progression. C'est ce qui a fonctionné pour moi en utilisant tqdm .
pip install tqdm
Cela devrait fonctionner avec tous les types de multitraitement, qu'ils bloquent ou non.
la source
Trouvé une réponse moi - même avec un peu plus creuser: Jeter un oeil à la
__dict__
de l'imap_unordered
objet résultat, je l' ai trouvé a un_index
attribut qui , chaque pas l' exécution des tâches. Donc, cela fonctionne pour la journalisation, enveloppé dans lawhile
boucle:Cependant, j'ai trouvé que le fait d'échanger le
imap_unordered
pour unmap_async
aboutissait à une exécution beaucoup plus rapide, bien que l'objet de résultat soit un peu différent. Au lieu de cela, l'objet de résultat demap_async
a un_number_left
attribut et uneready()
méthode:la source
rs
n'est pas connu et qu'il est un peu tard ou pas?rs
a déjà lancé les autres threads.rs
dans aucune boucle, je suis un débutant multiprocesseur et cela aiderait. Merci beaucoup.python 3.5
, la solution utilisant_number_left
ne fonctionne pas._number_left
représente les morceaux qui restent à traiter. Par exemple, si je veux que 50 éléments soient passés à ma fonction en parallèle, alors pour un pool de threads avec 3 processus_map_async()
crée 10 morceaux avec 5 éléments chacun._number_left
représente alors combien de ces morceaux ont été achevés.Je sais que c'est une question assez ancienne, mais voici ce que je fais quand je veux suivre la progression d'un pool de tâches en python.
Fondamentalement, vous utilisez apply_async avec un callbak (dans ce cas, il s'agit d'ajouter la valeur renvoyée à une liste), vous n'avez donc pas à attendre pour faire autre chose. Ensuite, dans une boucle while, vous vérifiez la progression du travail. Dans ce cas, j'ai ajouté un widget pour le rendre plus joli.
Le résultat:
J'espère que ça aide.
la source
[pool.apply_async(my_function, (x,), callback=results.append) for x in dummy_args]
pour(pool.apply_async(my_function, (x,), callback=results.append) for x in dummy_args)
Comme suggéré par Tim, vous pouvez utiliser
tqdm
etimap
pour résoudre ce problème. Je viens de tomber sur ce problème et de peaufiner laimap_unordered
solution, afin que je puisse accéder aux résultats du mappage. Voici comment ça fonctionne:Si vous ne vous souciez pas des valeurs renvoyées par vos travaux, vous n'avez pas besoin d'affecter la liste à une variable.
la source
pour tous ceux qui recherchent une solution simple fonctionnant avec
Pool.apply_async()
:la source
J'ai créé une classe personnalisée pour créer une impression de progression. Maby cela aide:
la source
Essayez cette approche simple basée sur les files d'attente, qui peut également être utilisée avec la mise en commun. N'oubliez pas que l'impression de quoi que ce soit après le lancement de la barre de progression entraînera son déplacement, au moins pour cette barre de progression particulière. (Progrès de PyPI 1.5)
la source