Objets de mémoire partagée en multitraitement

124

Supposons que j'ai un grand tableau numpy en mémoire, j'ai une fonction funcqui prend ce tableau géant en entrée (avec quelques autres paramètres). funcavec différents paramètres peuvent être exécutés en parallèle. Par exemple:

def func(arr, param):
    # do stuff to arr, param

# build array arr

pool = Pool(processes = 6)
results = [pool.apply_async(func, [arr, param]) for param in all_params]
output = [res.get() for res in results]

Si j'utilise une bibliothèque multitraitement, ce tableau géant sera copié plusieurs fois dans différents processus.

Existe-t-il un moyen de laisser différents processus partager le même tableau? Cet objet tableau est en lecture seule et ne sera jamais modifié.

Ce qui est plus compliqué, si arr n'est pas un tableau, mais un objet Python arbitraire, y a-t-il un moyen de le partager?

[ÉDITÉ]

J'ai lu la réponse mais je suis encore un peu confus. Puisque fork () est une copie sur écriture, nous ne devrions pas invoquer de coût supplémentaire lors de la création de nouveaux processus dans la bibliothèque multiprocesseur python. Mais le code suivant suggère qu'il y a une surcharge énorme:

from multiprocessing import Pool, Manager
import numpy as np; 
import time

def f(arr):
    return len(arr)

t = time.time()
arr = np.arange(10000000)
print "construct array = ", time.time() - t;


pool = Pool(processes = 6)

t = time.time()
res = pool.apply_async(f, [arr,])
res.get()
print "multiprocessing overhead = ", time.time() - t;

sortie (et au fait, le coût augmente à mesure que la taille du tableau augmente, donc je soupçonne qu'il y a encore des frais généraux liés à la copie de mémoire):

construct array =  0.0178790092468
multiprocessing overhead =  0.252444982529

Pourquoi y a-t-il une telle surcharge si nous ne copions pas le tableau? Et quelle part la mémoire partagée me sauve-t-elle?

Vengeance
la source
Vous avez regardé la documentation , non?
Lev Levitsky
@FrancisAvila existe-t-il un moyen de partager non seulement des tableaux, mais des objets Python arbitraires?
Vendetta
1
@LevLevitsky Je dois demander, y a-t-il un moyen de partager non seulement des tableaux, mais des objets Python arbitraires?
Vendetta
2
Cette réponse explique bien pourquoi les objets Python arbitraires ne peuvent pas être partagés.
Janne Karila

Réponses:

121

Si vous utilisez un système d'exploitation qui utilise la fork()sémantique de copie sur écriture (comme n'importe quel Unix commun), tant que vous ne modifiez jamais votre structure de données, il sera disponible pour tous les processus enfants sans prendre de mémoire supplémentaire. Vous n'aurez rien à faire de spécial (sauf assurez-vous de ne pas modifier l'objet).

La chose la plus efficace que vous puissiez faire pour résoudre votre problème serait de regrouper votre tableau dans une structure de tableau efficace (en utilisant numpyou array), de le placer dans la mémoire partagée, de l'envelopper multiprocessing.Arrayet de le transmettre à vos fonctions. Cette réponse montre comment faire cela .

Si vous voulez un objet partagé inscriptible , vous devrez l'envelopper avec une sorte de synchronisation ou de verrouillage. multiprocessingfournit deux méthodes pour ce faire : l'une utilisant la mémoire partagée (adaptée pour des valeurs simples, des tableaux ou des ctypes) ou un Managerproxy, où un processus détient la mémoire et un gestionnaire arbitre l'accès à celle-ci à partir d'autres processus (même sur un réseau).

L' Managerapproche peut être utilisée avec des objets Python arbitraires, mais sera plus lente que l'équivalent utilisant la mémoire partagée car les objets doivent être sérialisés / désérialisés et envoyés entre les processus.

Il existe une multitude de bibliothèques et d'approches de traitement parallèle disponibles en Python . multiprocessingest une bibliothèque excellente et bien équilibrée, mais si vous avez des besoins spéciaux, l'une des autres approches peut être meilleure.

Francis Avila
la source
25
Juste pour noter, sur Python, fork () signifie en fait copie lors de l'accès (car le simple accès à l'objet changera son nombre de références).
Fabio Zadrozny
3
@FabioZadrozny Copierait-il réellement l'objet entier, ou juste la page mémoire contenant son refcount?
zigg
5
AFAIK, seule la page mémoire contenant le refcount (donc, 4 ko sur chaque accès objet).
Fabio Zadrozny
1
@max Utilisez une fermeture. La fonction donnée à apply_asyncdoit référencer l'objet partagé dans la portée directement plutôt que via ses arguments.
Francis Avila
3
@FrancisAvila comment utilisez-vous une fermeture? La fonction que vous donnez à apply_async ne devrait-elle pas être sélectionnable? Ou ce n'est qu'une restriction map_async?
GermanK
17

J'ai rencontré le même problème et j'ai écrit une petite classe d'utilitaire de mémoire partagée pour le contourner.

J'utilise multiprocessing.RawArray(lockfree), et l'accès aux tableaux n'est pas du tout synchronisé (lockfree), faites attention de ne pas tirer vos propres pieds.

Avec la solution, j'obtiens des accélérations d'un facteur d'environ 3 sur un i7 quad-core.

Voici le code: n'hésitez pas à l'utiliser et à l'améliorer, et veuillez signaler tout bogue.

'''
Created on 14.05.2013

@author: martin
'''

import multiprocessing
import ctypes
import numpy as np

class SharedNumpyMemManagerError(Exception):
    pass

'''
Singleton Pattern
'''
class SharedNumpyMemManager:    

    _initSize = 1024

    _instance = None

    def __new__(cls, *args, **kwargs):
        if not cls._instance:
            cls._instance = super(SharedNumpyMemManager, cls).__new__(
                                cls, *args, **kwargs)
        return cls._instance        

    def __init__(self):
        self.lock = multiprocessing.Lock()
        self.cur = 0
        self.cnt = 0
        self.shared_arrays = [None] * SharedNumpyMemManager._initSize

    def __createArray(self, dimensions, ctype=ctypes.c_double):

        self.lock.acquire()

        # double size if necessary
        if (self.cnt >= len(self.shared_arrays)):
            self.shared_arrays = self.shared_arrays + [None] * len(self.shared_arrays)

        # next handle
        self.__getNextFreeHdl()        

        # create array in shared memory segment
        shared_array_base = multiprocessing.RawArray(ctype, np.prod(dimensions))

        # convert to numpy array vie ctypeslib
        self.shared_arrays[self.cur] = np.ctypeslib.as_array(shared_array_base)

        # do a reshape for correct dimensions            
        # Returns a masked array containing the same data, but with a new shape.
        # The result is a view on the original array
        self.shared_arrays[self.cur] = self.shared_arrays[self.cnt].reshape(dimensions)

        # update cnt
        self.cnt += 1

        self.lock.release()

        # return handle to the shared memory numpy array
        return self.cur

    def __getNextFreeHdl(self):
        orgCur = self.cur
        while self.shared_arrays[self.cur] is not None:
            self.cur = (self.cur + 1) % len(self.shared_arrays)
            if orgCur == self.cur:
                raise SharedNumpyMemManagerError('Max Number of Shared Numpy Arrays Exceeded!')

    def __freeArray(self, hdl):
        self.lock.acquire()
        # set reference to None
        if self.shared_arrays[hdl] is not None: # consider multiple calls to free
            self.shared_arrays[hdl] = None
            self.cnt -= 1
        self.lock.release()

    def __getArray(self, i):
        return self.shared_arrays[i]

    @staticmethod
    def getInstance():
        if not SharedNumpyMemManager._instance:
            SharedNumpyMemManager._instance = SharedNumpyMemManager()
        return SharedNumpyMemManager._instance

    @staticmethod
    def createArray(*args, **kwargs):
        return SharedNumpyMemManager.getInstance().__createArray(*args, **kwargs)

    @staticmethod
    def getArray(*args, **kwargs):
        return SharedNumpyMemManager.getInstance().__getArray(*args, **kwargs)

    @staticmethod    
    def freeArray(*args, **kwargs):
        return SharedNumpyMemManager.getInstance().__freeArray(*args, **kwargs)

# Init Singleton on module load
SharedNumpyMemManager.getInstance()

if __name__ == '__main__':

    import timeit

    N_PROC = 8
    INNER_LOOP = 10000
    N = 1000

    def propagate(t):
        i, shm_hdl, evidence = t
        a = SharedNumpyMemManager.getArray(shm_hdl)
        for j in range(INNER_LOOP):
            a[i] = i

    class Parallel_Dummy_PF:

        def __init__(self, N):
            self.N = N
            self.arrayHdl = SharedNumpyMemManager.createArray(self.N, ctype=ctypes.c_double)            
            self.pool = multiprocessing.Pool(processes=N_PROC)

        def update_par(self, evidence):
            self.pool.map(propagate, zip(range(self.N), [self.arrayHdl] * self.N, [evidence] * self.N))

        def update_seq(self, evidence):
            for i in range(self.N):
                propagate((i, self.arrayHdl, evidence))

        def getArray(self):
            return SharedNumpyMemManager.getArray(self.arrayHdl)

    def parallelExec():
        pf = Parallel_Dummy_PF(N)
        print(pf.getArray())
        pf.update_par(5)
        print(pf.getArray())

    def sequentialExec():
        pf = Parallel_Dummy_PF(N)
        print(pf.getArray())
        pf.update_seq(5)
        print(pf.getArray())

    t1 = timeit.Timer("sequentialExec()", "from __main__ import sequentialExec")
    t2 = timeit.Timer("parallelExec()", "from __main__ import parallelExec")

    print("Sequential: ", t1.timeit(number=1))    
    print("Parallel: ", t2.timeit(number=1))
martin.preinfalk
la source
Je viens de réaliser que vous devez configurer vos tableaux de mémoire partagée avant de créer le pool de multitraitement, je ne sais pas encore pourquoi mais cela ne fonctionnera certainement pas dans l'autre sens.
martin.preinfalk
la raison en est que le pool de multitraitement appelle fork () lorsque le pool est instancié, donc tout ce qui suit n'aura pas accès au pointeur vers un mem partagé créé par la suite.
Xiv
Quand j'ai essayé ce code sous py35, j'ai eu une exception dans multiprocessing.sharedctypes.py, donc je suppose que ce code est pour py2 uniquement.
Dr.Hillier Dániel
11

C'est le cas d'utilisation prévu pour Ray , qui est une bibliothèque pour Python parallèle et distribué. Sous le capot, il sérialise les objets à l'aide de la disposition des données Apache Arrow (qui est un format sans copie) et les stocke dans un magasin d'objets à mémoire partagée afin qu'ils soient accessibles par plusieurs processus sans créer de copies.

Le code ressemblerait à ce qui suit.

import numpy as np
import ray

ray.init()

@ray.remote
def func(array, param):
    # Do stuff.
    return 1

array = np.ones(10**6)
# Store the array in the shared memory object store once
# so it is not copied multiple times.
array_id = ray.put(array)

result_ids = [func.remote(array_id, i) for i in range(4)]
output = ray.get(result_ids)

Si vous n'appelez pas, ray.putle tableau sera toujours stocké dans la mémoire partagée, mais cela sera fait une fois par appel de func, ce qui n'est pas ce que vous voulez.

Notez que cela fonctionnera non seulement pour les tableaux mais aussi pour les objets qui contiennent des tableaux , par exemple, les dictionnaires mappant les entrées aux tableaux comme ci-dessous.

Vous pouvez comparer les performances de la sérialisation dans Ray par rapport à pickle en exécutant ce qui suit dans IPython.

import numpy as np
import pickle
import ray

ray.init()

x = {i: np.ones(10**7) for i in range(20)}

# Time Ray.
%time x_id = ray.put(x)  # 2.4s
%time new_x = ray.get(x_id)  # 0.00073s

# Time pickle.
%time serialized = pickle.dumps(x)  # 2.6s
%time deserialized = pickle.loads(serialized)  # 1.9s

La sérialisation avec Ray n'est que légèrement plus rapide que pickle, mais la désérialisation est 1000x plus rapide en raison de l'utilisation de la mémoire partagée (ce nombre dépendra bien sûr de l'objet).

Consultez la documentation Ray . Vous pouvez en savoir plus sur la sérialisation rapide à l'aide de Ray et Arrow . Notez que je suis l'un des développeurs Ray.

Robert Nishihara
la source
1
Ray sonne bien! Mais, j'ai déjà essayé d'utiliser cette bibliothèque, mais malheureusement, je viens de réaliser que Ray ne prend pas en charge Windows. J'espère que vous pourrez prendre en charge Windows dès que possible. Merci aux développeurs!
Hzzkygcs
6

Comme Robert Nishihara l'a mentionné, Apache Arrow rend cela facile, en particulier avec le magasin d'objets en mémoire Plasma, sur lequel Ray est construit.

J'ai créé du plasma cérébral spécifiquement pour cette raison: chargement et rechargement rapides de gros objets dans une application Flask. Il s'agit d'un espace de noms d'objets à mémoire partagée pour les objets sérialisables Apache Arrow, y compris pickle'd bytestrings générés par pickle.dumps(...).

La principale différence avec Apache Ray et Plasma est qu'il assure le suivi des ID d'objet pour vous. Tous les processus, threads ou programmes qui s'exécutent localement peuvent partager les valeurs des variables en appelant le nom depuis n'importe quel Brainobjet.

$ pip install brain-plasma
$ plasma_store -m 10000000 -s /tmp/plasma

from brain_plasma import Brain
brain = Brain(path='/tmp/plasma/)

brain['a'] = [1]*10000

brain['a']
# >>> [1,1,1,1,...]
russellthehippo
la source