Partager une grande matrice Numpy en lecture seule entre des processus de multitraitement

88

J'ai un SciPy Array (Matrix) de 60 Go que je dois partager entre plus de 5 multiprocessing Processobjets. J'ai vu numpy-sharedmem et lu cette discussion sur la liste SciPy. Il semble y avoir deux approches - numpy-sharedmemet utiliser a multiprocessing.RawArray()et mapper NumPy dtypes à ctypes. Maintenant, numpy-sharedmemsemble être la voie à suivre, mais je n'ai pas encore vu un bon exemple de référence. Je n'ai besoin d'aucun type de verrou, car le tableau (en fait une matrice) sera en lecture seule. Maintenant, en raison de sa taille, j'aimerais éviter une copie. Il semble que la méthode correcte consiste à créer la seule copie du tableau sous forme de sharedmemtableau, puis à la transmettre aux Processobjets? Quelques questions spécifiques:

  1. Quelle est la meilleure façon de transmettre réellement les handles de sharedmem aux sous-groupes Process()? Ai-je besoin d'une file d'attente juste pour faire passer un tableau? Une pipe serait-elle meilleure? Puis-je simplement le passer comme argument à l' Process()init de la sous-classe (où je suppose qu'il est décapé)?

  2. Dans la discussion que j'ai liée ci-dessus, il est question de numpy-sharedmemne pas être sûr en 64 bits? J'utilise certainement des structures qui ne sont pas adressables 32 bits.

  3. Y a-t-il des compromis dans l' RawArray()approche? Plus lent, plus poussiéreux?

  4. Ai-je besoin d'un mappage ctype-to-dtype pour la méthode numpy-sharedmem?

  5. Quelqu'un a-t-il un exemple de code OpenSource faisant cela? Je suis un expert très pratique et il est difficile de faire fonctionner cela sans aucun bon exemple à regarder.

S'il y a des informations supplémentaires que je peux fournir pour aider à clarifier cela pour les autres, veuillez commenter et j'ajouterai. Merci!

Cela doit fonctionner sur Ubuntu Linux et peut - être Mac OS, mais la portabilité n'est pas un problème majeur.

Volonté
la source
1
Si les différents processus vont écrire dans ce tableau, attendez-vous multiprocessingà faire une copie de l'ensemble pour chaque processus.
tiago
3
@tiago: "Je n'ai besoin d'aucun type de verrou, car le tableau (en fait une matrice) sera en lecture seule"
Dr. Jan-Philip Gehrcke
1
@tiago: aussi, le multitraitement ne fait pas de copie tant qu'il n'est pas explicitement dit à (via des arguments à la target_function). Le système d'exploitation va copier des parties de la mémoire du parent dans l'espace mémoire de l'enfant uniquement après modification.
Dr.Jan-Philip Gehrcke
J'ai déjà posé quelques questions à ce sujet. Ma solution peut être trouvée ici: github.com/david-hoffman/peaks/blob / ... (désolé le code est un désastre).
David Hoffman le

Réponses:

30

@Velimir Mlaker a donné une excellente réponse. J'ai pensé que je pourrais ajouter quelques commentaires et un petit exemple.

(Je n'ai pas trouvé beaucoup de documentation sur sharedmem - ce sont les résultats de mes propres expériences.)

  1. Avez-vous besoin de passer les poignées lorsque le sous-processus démarre ou après son démarrage? S'il ne s'agit que du premier, vous pouvez simplement utiliser les arguments targetet argspour Process. C'est potentiellement mieux que d'utiliser une variable globale.
  2. Dans la page de discussion que vous avez liée, il semble que la prise en charge de Linux 64 bits a été ajoutée à sharedmem il y a quelque temps, donc cela pourrait ne pas poser de problème.
  3. Je ne sais pas pour celui-ci.
  4. Non. Reportez-vous à l'exemple ci-dessous.

Exemple

#!/usr/bin/env python
from multiprocessing import Process
import sharedmem
import numpy

def do_work(data, start):
    data[start] = 0;

def split_work(num):
    n = 20
    width  = n/num
    shared = sharedmem.empty(n)
    shared[:] = numpy.random.rand(1, n)[0]
    print "values are %s" % shared

    processes = [Process(target=do_work, args=(shared, i*width)) for i in xrange(num)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()

    print "values are %s" % shared
    print "type is %s" % type(shared[0])

if __name__ == '__main__':
    split_work(4)

Production

values are [ 0.81397784  0.59667692  0.10761908  0.6736734   0.46349645  0.98340718
  0.44056863  0.10701816  0.67167752  0.29158274  0.22242552  0.14273156
  0.34912309  0.43812636  0.58484507  0.81697513  0.57758441  0.4284959
  0.7292129   0.06063283]
values are [ 0.          0.59667692  0.10761908  0.6736734   0.46349645  0.
  0.44056863  0.10701816  0.67167752  0.29158274  0.          0.14273156
  0.34912309  0.43812636  0.58484507  0.          0.57758441  0.4284959
  0.7292129   0.06063283]
type is <type 'numpy.float64'>

Cette question connexe pourrait être utile.

James Lim
la source
37

Si vous êtes sous Linux (ou sur tout système compatible POSIX), vous pouvez définir ce tableau comme une variable globale. multiprocessingutilise fork()sous Linux lorsqu'il démarre un nouveau processus enfant. Un processus enfant nouvellement généré partage automatiquement la mémoire avec son parent tant qu'il ne la modifie pas ( mécanisme de copie sur écriture ).

Puisque vous dites "je n'ai besoin d'aucun type de verrous, puisque le tableau (en fait une matrice) sera en lecture seule" profiter de ce comportement serait une approche très simple et pourtant extrêmement efficace: tous les processus enfants y accéderont les mêmes données dans la mémoire physique lors de la lecture de ce grand tableau numpy.

Ne remettez pas votre tableau au Process()constructeur, cela donnera des instructions multiprocessingaux pickledonnées à l'enfant, ce qui serait extrêmement inefficace ou impossible dans votre cas. Sous Linux, juste après fork()l'enfant se trouve une copie exacte du parent utilisant la même mémoire physique, donc tout ce que vous avez à faire est de vous assurer que la variable Python `` contenant '' la matrice est accessible depuis la targetfonction à laquelle vous passez Process(). Vous pouvez généralement y parvenir avec une variable «globale».

Exemple de code:

from multiprocessing import Process
from numpy import random


global_array = random.random(10**4)


def child():
    print sum(global_array)


def main():
    processes = [Process(target=child) for _ in xrange(10)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()


if __name__ == "__main__":
    main()

Sous Windows - qui ne prend pas en charge fork()- multiprocessingutilise l'appel d'API win32 CreateProcess. Il crée un processus entièrement nouveau à partir de n'importe quel exécutable donné. C'est pourquoi, sous Windows, il est nécessaire de sélectionner des données pour l'enfant si l'on a besoin de données qui ont été créées lors de l'exécution du parent.

Dr Jan-Philip Gehrcke
la source
3
Copy-on-write copiera la page contenant le compteur de référence (ainsi chaque python forké aura son propre compteur de référence) mais il ne copiera pas le tableau de données entier.
robince
1
J'ajouterais que j'ai eu plus de succès avec les variables de niveau module qu'avec les variables globales ... c'est à dire ajouter la variable à un module dans la portée globale avant le fork
robince
5
Un mot d'avertissement aux personnes qui trébuchent sur cette question / réponse: si vous utilisez Numpy lié à OpenBLAS pour son opération multithread, assurez-vous de désactiver son multithreading (export OPENBLAS_NUM_THREADS = 1) lors de l'utilisation multiprocessingou des processus enfants pourraient finir par se bloquer ( utilisant généralement 1 / n d' un processeur plutôt que n processeurs) lors de l'exécution d'opérations d'algèbre linéaire sur un tableau / matrice global partagé. Le conflit multithread connu avec OpenBLAS semble s'étendre à Pythonmultiprocessing
Dologan
1
Quelqu'un peut-il expliquer pourquoi python n'utiliserait pas simplement le système forkd' exploitation pour transmettre les paramètres donnés Process, au lieu de les sérialiser? Autrement dit, ne pourrait-il pas forkêtre appliqué au processus parent juste avant l' child appel, de sorte que la valeur du paramètre soit toujours disponible à partir du système d'exploitation? Cela semble-t-il plus efficace que de le sérialiser?
max
2
Nous sommes tous conscients que ce fork()n'est pas disponible sur Windows, cela a été indiqué dans ma réponse et à plusieurs reprises dans les commentaires. Je sais que c'était votre question initiale, et j'y ai répondu à quatre commentaires ci - dessus : "le compromis est d'utiliser la même méthode de transfert de paramètres sur les deux plates-formes par défaut, pour une meilleure maintenabilité et pour assurer un comportement égal.". Les deux méthodes ont leurs avantages et leurs inconvénients, c'est pourquoi dans Python 3, l'utilisateur dispose d'une plus grande flexibilité pour choisir la méthode. Cette discussion n'est pas productive sans parler de détails, ce que nous ne devrions pas faire ici.
Dr.Jan-Philip Gehrcke
24

Vous pourriez être intéressé par un petit morceau de code que j'ai écrit: github.com/vmlaker/benchmark-sharedmem

Le seul dossier intéressant est main.py. C'est une référence de numpy-sharedmem - le code passe simplement des tableaux (soit numpyou sharedmem) aux processus générés, via Pipe. Les travailleurs font simplement appel sum()aux données. Je n'étais intéressé que par la comparaison des temps de communication des données entre les deux implémentations.

J'ai également écrit un autre code plus complexe: github.com/vmlaker/sherlock .

Ici, j'utilise le module numpy-sharedmem pour le traitement d'images en temps réel avec OpenCV - les images sont des tableaux NumPy, selon la nouvelle cv2API d' OpenCV . Les images, en fait leurs références, sont partagées entre les processus via l'objet dictionnaire créé à partir de multiprocessing.Manager(par opposition à l'utilisation de la file d'attente ou du tuyau.) J'obtiens de grandes améliorations de performances par rapport à l'utilisation de tableaux NumPy simples.

Pipe vs file d'attente :

D'après mon expérience, IPC avec Pipe est plus rapide que Queue. Et cela a du sens, car Queue ajoute un verrouillage pour le rendre sûr pour plusieurs producteurs / consommateurs. Pipe ne le fait pas. Mais si vous n'avez que deux processus qui se parlent dans les deux sens, vous pouvez utiliser Pipe en toute sécurité ou, comme le lit la documentation:

... il n'y a aucun risque de corruption de processus utilisant différentes extrémités du tuyau en même temps.

sharedmemsécurité :

Le principal problème avec le sharedmemmodule est la possibilité d'une fuite de mémoire lors d'une sortie de programme peu gracieuse. Ceci est décrit dans une longue discussion ici . Bien que le 10 avril 2011, Sturla mentionne un correctif pour une fuite de mémoire, j'ai toujours eu des fuites depuis lors, en utilisant les deux dépôts, ceux de Sturla Molden sur GitHub ( github.com/sturlamolden/sharedmem-numpy ) et Chris Lee-Messer sur Bitbucket ( bitbucket.org/cleemesser/numpy-sharedmem ).

Velimir Mlaker
la source
Merci, très très instructif. La fuite de mémoire sharedmemsemble cependant être un gros problème. Des pistes pour résoudre cela?
Will
1
Au-delà du simple fait de remarquer les fuites, je ne l'ai pas cherché dans le code. J'ai ajouté à ma réponse, sous "sharedmem safety" ci-dessus, les gardiens des deux dépôts open source du sharedmemmodule, pour référence.
Velimir Mlaker
14

Si votre tableau est aussi grand, vous pouvez utiliser numpy.memmap. Par exemple, si vous avez un tableau stocké sur disque, disons 'test.array', vous pouvez utiliser des processus simultanés pour accéder aux données qu'il contient même en mode "écriture", mais votre cas est plus simple puisque vous n'avez besoin que du mode "lecture".

Création du tableau:

a = np.memmap('test.array', dtype='float32', mode='w+', shape=(100000,1000))

Vous pouvez ensuite remplir ce tableau de la même manière que vous le faites avec un tableau ordinaire. Par exemple:

a[:10,:100]=1.
a[10:,100:]=2.

Les données sont stockées sur le disque lorsque vous supprimez la variable a.

Plus tard, vous pouvez utiliser plusieurs processus qui accéderont aux données dans test.array:

# read-only mode
b = np.memmap('test.array', dtype='float32', mode='r', shape=(100000,1000))

# read and writing mode
c = np.memmap('test.array', dtype='float32', mode='r+', shape=(100000,1000))

Réponses connexes:

Saullo GP Castro
la source
3

Vous pouvez également trouver utile de consulter la documentation de pyro, car si vous pouviez partitionner votre tâche de manière appropriée, vous pourriez l'utiliser pour exécuter différentes sections sur différentes machines ainsi que sur différents cœurs de la même machine.

Steve Barnes
la source
0

Pourquoi ne pas utiliser le multithreading? Les ressources du processus principal peuvent être partagées nativement par ses threads, donc le multithreading est évidemment un meilleur moyen de partager des objets appartenant au processus principal.

Si vous vous inquiétez du mécanisme GIL de python, vous pouvez peut-être recourir au nogilof numba.

Nico
la source