multitraitement: comment partager un dict entre plusieurs processus?

113

Un programme qui crée plusieurs processus qui fonctionnent sur une file d'attente pouvant être jointes Qet qui peut éventuellement manipuler un dictionnaire global Dpour stocker les résultats. (ainsi chaque processus enfant peut utiliser Dpour stocker son résultat et voir également quels résultats les autres processus enfants produisent)

Si j'imprime le dictionnaire D dans un processus fils, je vois les modifications qui y ont été apportées (ie sur D). Mais après que le processus principal rejoint Q, si j'imprime D, c'est un dict vide!

Je comprends que c'est un problème de synchronisation / verrouillage. Quelqu'un peut-il me dire ce qui se passe ici et comment je peux synchroniser l'accès à D?

dop
la source
1
Cela ne fonctionne pas comme prévu au moins sur python 3.7.2 utilisant osx 10.14.4 Dict n'est pas synchronisé et son contenu est réécrit par d'autres processus. Cependant, <code> multiprocessing.Manager (). List () </code> fonctionne comme prévu.
Andrew Druchenko

Réponses:

162

Une réponse générale implique l'utilisation d'un Managerobjet. Adapté de la documentation:

from multiprocessing import Process, Manager

def f(d):
    d[1] += '1'
    d['2'] += 2

if __name__ == '__main__':
    manager = Manager()

    d = manager.dict()
    d[1] = '1'
    d['2'] = 2

    p1 = Process(target=f, args=(d,))
    p2 = Process(target=f, args=(d,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()

    print d

Production:

$ python mul.py 
{1: '111', '2': 6}
expéditeur
la source
4
Merci senderle. En effet, D = multiprocessing.Manager (). Dict () résout mon problème. J'utilisais D = dict ().
dop le
3
@LorenzoBelli, si vous demandez si l'accès au gestionnaire est synchronisé, je pense que la réponse est oui. multiprocessing.Manager()renvoie une instance deSyncManager , dont le nom le suggère!
senderle
@senderle Je souhaite partager l'état aléatoire numpy d'un processus parent avec un processus enfant. J'ai essayé d'utiliser Managermais toujours pas de chance. Pourriez-vous s'il vous plaît jeter un oeil à ma question ici et voir si vous pouvez proposer une solution? Je peux toujours obtenir différents nombres aléatoires si je le fais à np.random.seed(None)chaque fois que je génère un nombre aléatoire, mais cela ne me permet pas d'utiliser l'état aléatoire du processus parent, ce qui n'est pas ce que je veux. Toute aide est grandement appréciée.
Amir
1
@RadioControlled est heureux d'écrire une mise à jour, mais brièvement, bien que je ne pense pas que vous puissiez faire cela directement, vous pouvez facilement créer un nouveau dict géré avec les mêmes clés et valeurs, et l'utiliser à la place de l'original. Cela convient-il à votre cas?
expéditeur le
1
@senderle, c'est ce que j'ai fini par faire. La réponse serait donc que vous deviez faire exactement cela.
Radiocommandé le
25

le multitraitement n'est pas comme le threading. Chaque processus enfant recevra une copie de la mémoire du processus principal. Généralement, l'état est partagé via la communication (canaux / sockets), les signaux ou la mémoire partagée.

Le multitraitement rend certaines abstractions disponibles pour votre cas d'utilisation - état partagé qui est traité comme local par l'utilisation de proxies ou de mémoire partagée: http://docs.python.org/library/multiprocessing.html#sharing-state-between-processes

Sections pertinentes:

Jeremy Brown
la source
1
Merci beaucoup. Vous m'avez conduit à la solution / a: multiprocessing.Manager (). Dict ().
dop le
Quelqu'un peut-il expliquer ce que signifie l'énoncé «Chaque processus enfant recevra une copie de la mémoire du processus principal».
Itsme2003
@ Itsme2003 par défaut, un processus généré n'a pas accès à la mémoire du processus parent (c'est l'une des principales différences avec les threads). Ainsi, lorsqu'un processus a besoin d'un objet du processus parent, il doit en créer une copie (au lieu d'obtenir une référence à l'objet réel). La réponse ci-dessus explique comment partager des objets entre les processus.
Niklas Mertsch
Parce que c'est souvent une erreur: tant que vous ne modifiez pas l'objet, au moins dans la configuration Linux habituelle, l'objet ne sera en fait stocké qu'une seule fois dans la mémoire. Il sera copié dès qu'il sera modifié. Cela peut être très important si vous devez économiser de la mémoire et ne pas modifier l'objet.
Radiocommandé le
16

J'aimerais partager mon propre travail qui est plus rapide que le dict de Manager et qui est plus simple et plus stable que la bibliothèque pyshmht qui utilise des tonnes de mémoire et ne fonctionne pas pour Mac OS. Bien que mon dict ne fonctionne que pour les chaînes simples et est actuellement immuable. J'utilise l'implémentation du sondage linéaire et stocke les paires de clés et de valeurs dans un bloc de mémoire séparé après la table.

from mmap import mmap
import struct
from timeit import default_timer
from multiprocessing import Manager
from pyshmht import HashTable


class shared_immutable_dict:
    def __init__(self, a):
        self.hs = 1 << (len(a) * 3).bit_length()
        kvp = self.hs * 4
        ht = [0xffffffff] * self.hs
        kvl = []
        for k, v in a.iteritems():
            h = self.hash(k)
            while ht[h] != 0xffffffff:
                h = (h + 1) & (self.hs - 1)
            ht[h] = kvp
            kvp += self.kvlen(k) + self.kvlen(v)
            kvl.append(k)
            kvl.append(v)

        self.m = mmap(-1, kvp)
        for p in ht:
            self.m.write(uint_format.pack(p))
        for x in kvl:
            if len(x) <= 0x7f:
                self.m.write_byte(chr(len(x)))
            else:
                self.m.write(uint_format.pack(0x80000000 + len(x)))
            self.m.write(x)

    def hash(self, k):
        h = hash(k)
        h = (h + (h >> 3) + (h >> 13) + (h >> 23)) * 1749375391 & (self.hs - 1)
        return h

    def get(self, k, d=None):
        h = self.hash(k)
        while True:
            x = uint_format.unpack(self.m[h * 4:h * 4 + 4])[0]
            if x == 0xffffffff:
                return d
            self.m.seek(x)
            if k == self.read_kv():
                return self.read_kv()
            h = (h + 1) & (self.hs - 1)

    def read_kv(self):
        sz = ord(self.m.read_byte())
        if sz & 0x80:
            sz = uint_format.unpack(chr(sz) + self.m.read(3))[0] - 0x80000000
        return self.m.read(sz)

    def kvlen(self, k):
        return len(k) + (1 if len(k) <= 0x7f else 4)

    def __contains__(self, k):
        return self.get(k, None) is not None

    def close(self):
        self.m.close()

uint_format = struct.Struct('>I')


def uget(a, k, d=None):
    return to_unicode(a.get(to_str(k), d))


def uin(a, k):
    return to_str(k) in a


def to_unicode(s):
    return s.decode('utf-8') if isinstance(s, str) else s


def to_str(s):
    return s.encode('utf-8') if isinstance(s, unicode) else s


def mmap_test():
    n = 1000000
    d = shared_immutable_dict({str(i * 2): '1' for i in xrange(n)})
    start_time = default_timer()
    for i in xrange(n):
        if bool(d.get(str(i))) != (i % 2 == 0):
            raise Exception(i)
    print 'mmap speed: %d gets per sec' % (n / (default_timer() - start_time))


def manager_test():
    n = 100000
    d = Manager().dict({str(i * 2): '1' for i in xrange(n)})
    start_time = default_timer()
    for i in xrange(n):
        if bool(d.get(str(i))) != (i % 2 == 0):
            raise Exception(i)
    print 'manager speed: %d gets per sec' % (n / (default_timer() - start_time))


def shm_test():
    n = 1000000
    d = HashTable('tmp', n)
    d.update({str(i * 2): '1' for i in xrange(n)})
    start_time = default_timer()
    for i in xrange(n):
        if bool(d.get(str(i))) != (i % 2 == 0):
            raise Exception(i)
    print 'shm speed: %d gets per sec' % (n / (default_timer() - start_time))


if __name__ == '__main__':
    mmap_test()
    manager_test()
    shm_test()

Sur mon ordinateur portable, les performances sont les suivantes:

mmap speed: 247288 gets per sec
manager speed: 33792 gets per sec
shm speed: 691332 gets per sec

exemple d'utilisation simple:

ht = shared_immutable_dict({'a': '1', 'b': '2'})
print ht.get('a')
alyaxey
la source
14
Github? Documentation? comment utiliser cet outil?
Pavlos Panteliadis
10

En plus de @ senderle ici, certains pourraient également se demander comment utiliser la fonctionnalité de multiprocessing.Pool.

La bonne chose est qu'il existe une .Pool()méthode pour l' managerinstance qui imite toute l'API familière du niveau supérieur multiprocessing.

from itertools import repeat
import multiprocessing as mp
import os
import pprint

def f(d: dict) -> None:
    pid = os.getpid()
    d[pid] = "Hi, I was written by process %d" % pid

if __name__ == '__main__':
    with mp.Manager() as manager:
        d = manager.dict()
        with manager.Pool() as pool:
            pool.map(f, repeat(d, 10))
        # `d` is a DictProxy object that can be converted to dict
        pprint.pprint(dict(d))

Production:

$ python3 mul.py 
{22562: 'Hi, I was written by process 22562',
 22563: 'Hi, I was written by process 22563',
 22564: 'Hi, I was written by process 22564',
 22565: 'Hi, I was written by process 22565',
 22566: 'Hi, I was written by process 22566',
 22567: 'Hi, I was written by process 22567',
 22568: 'Hi, I was written by process 22568',
 22569: 'Hi, I was written by process 22569',
 22570: 'Hi, I was written by process 22570',
 22571: 'Hi, I was written by process 22571'}

Il s'agit d'un exemple légèrement différent où chaque processus enregistre simplement son ID de processus dans l' DictProxyobjet global d.

Brad Solomon
la source
3

Peut-être que tu peux essayer pyshmht , l'extension de table de hachage basée sur la mémoire pour Python.

Remarquer

  1. Ce n'est pas entièrement testé, juste pour votre référence.

  2. Il manque actuellement de mécanismes de verrouillage / sem pour le multitraitement.

felix021
la source