comment écouter N canaux? (instruction de sélection dynamique)

116

pour démarrer une boucle sans fin d'exécution de deux goroutines, je peux utiliser le code ci-dessous:

après avoir reçu le msg, il démarrera une nouvelle goroutine et continuera pour toujours.

c1 := make(chan string)
c2 := make(chan string)

go DoStuff(c1, 5)
go DoStuff(c2, 2)

for ; true;  {
    select {
    case msg1 := <-c1:
        fmt.Println("received ", msg1)
        go DoStuff(c1, 1)
    case msg2 := <-c2:
        fmt.Println("received ", msg2)
        go DoStuff(c2, 9)
    }
}

Je voudrais maintenant avoir le même comportement pour N goroutines, mais à quoi ressemblera l'instruction select dans ce cas?

C'est le bit de code avec lequel j'ai commencé, mais je ne sais pas comment coder l'instruction select

numChans := 2

//I keep the channels in this slice, and want to "loop" over them in the select statemnt
var chans = [] chan string{}

for i:=0;i<numChans;i++{
    tmp := make(chan string);
    chans = append(chans, tmp);
    go DoStuff(tmp, i + 1)

//How shall the select statment be coded for this case?  
for ; true;  {
    select {
    case msg1 := <-c1:
        fmt.Println("received ", msg1)
        go DoStuff(c1, 1)
    case msg2 := <-c2:
        fmt.Println("received ", msg2)
        go DoStuff(c2, 9)
    }
}
John Smith
la source
4
Je pense que ce que vous voulez, c'est le multiplexage des canaux. golang.org/doc/effective_go.html#chan_of_chan Fondamentalement, vous avez un seul canal que vous écoutez, puis plusieurs canaux enfants qui se dirigent vers le canal principal. Question SO connexe: stackoverflow.com/questions/10979608/…
Brenden

Réponses:

152

Vous pouvez le faire en utilisant la Selectfonction du reflet package:

func Select(cases []SelectCase) (chosen int, recv Value, recvOK bool)

Select exécute une opération de sélection décrite par la liste des cas. Comme l'instruction Go select, elle se bloque jusqu'à ce qu'au moins un des cas puisse se poursuivre, effectue un choix pseudo-aléatoire uniforme, puis exécute ce cas. Il renvoie l'index du cas choisi et, si ce cas était une opération de réception, la valeur reçue et un booléen indiquant si la valeur correspond à un envoi sur le canal (par opposition à une valeur nulle reçue car le canal est fermé).

Vous transmettez un tableau de SelectCasestructures qui identifient le canal à sélectionner, le sens de l'opération et une valeur à envoyer dans le cas d'une opération d'envoi.

Vous pouvez donc faire quelque chose comme ceci:

cases := make([]reflect.SelectCase, len(chans))
for i, ch := range chans {
    cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)}
}
chosen, value, ok := reflect.Select(cases)
// ok will be true if the channel has not been closed.
ch := chans[chosen]
msg := value.String()

Vous pouvez expérimenter un exemple plus détaillé ici: http://play.golang.org/p/8zwvSk4kjx

James Henstridge
la source
4
Y a-t-il une limite pratique au nombre de cas dans une telle sélection? Celui qui si vous allez au-delà, les performances sont gravement affectées?
Maxim Vladimirsky
4
C'est peut-être mon incompétence, mais j'ai trouvé ce modèle vraiment difficile à utiliser lorsque vous envoyez et recevez des structures complexes via le canal. Passer un canal «agrégé» partagé, comme l'a dit Tim Allclair, était beaucoup plus facile dans mon cas.
Bora M. Alper
90

Vous pouvez accomplir cela en enveloppant chaque canal dans un goroutine qui "transfère" les messages vers un canal "agrégé" partagé. Par exemple:

agg := make(chan string)
for _, ch := range chans {
  go func(c chan string) {
    for msg := range c {
      agg <- msg
    }
  }(ch)
}

select {
case msg <- agg:
    fmt.Println("received ", msg)
}

Si vous avez besoin de savoir de quel canal provient le message, vous pouvez l'envelopper dans une structure avec des informations supplémentaires avant de le transmettre au canal agrégé.

Dans mes tests (limités), cette méthode surpasse largement les performances en utilisant le package Reflect:

$ go test dynamic_select_test.go -test.bench=.
...
BenchmarkReflectSelect         1    5265109013 ns/op
BenchmarkGoSelect             20      81911344 ns/op
ok      command-line-arguments  9.463s

Code de référence ici

Tim Allclair
la source
2
Votre code de référence est incorrect, vous devez effectuer une boucleb.N dans un benchmark. Sinon, les résultats (qui sont divisés par b.N, 1 et 2000000000 dans votre sortie) n'auront aucun sens.
Dave C le
2
@DaveC Merci! La conclusion ne change pas, mais les résultats sont beaucoup plus sains.
Tim Allclair
1
En effet, j'ai fait un hack rapide sur votre code de référence pour obtenir des chiffres réels . Il peut très bien y avoir quelque chose qui manque ou qui ne va pas dans ce benchmark, mais la seule chose que le code de réflexion plus compliqué a pour cela est que la configuration est plus rapide (avec GOMAXPROCS = 1) car elle n'a pas besoin de beaucoup de goroutines. Dans tous les autres cas, un simple canal de fusion de goroutine emporte la solution de réflexion (d'environ 2 ordres de grandeur).
Dave C
2
Un inconvénient important (par rapport à l' reflect.Selectapproche) est que les goroutines effectuent le tampon de fusion au minimum une valeur unique sur chaque canal fusionné. Habituellement, ce ne sera pas un problème, mais dans certaines applications spécifiques, cela peut être un facteur décisif :(.
Dave C
1
un canal de fusion tamponné aggrave le problème. Le problème est que seule la solution de réflexion peut avoir une sémantique entièrement non tamponnée. Je suis allé de l'avant et j'ai publié le code de test que j'expérimentais comme réponse distincte pour clarifier (espérons-le) ce que j'essayais de dire.
Dave C
22

Pour développer certains commentaires sur les réponses précédentes et fournir une comparaison plus claire, voici un exemple des deux approches présentées jusqu'à présent avec la même entrée, une tranche de canaux à lire et une fonction à appeler pour chaque valeur qui doit également savoir laquelle canal d'où provient la valeur.

Il existe trois différences principales entre les approches:

  • Complexité. Bien que cela puisse être en partie une préférence du lecteur, je trouve l'approche du canal plus idiomatique, simple et lisible.

  • Performance. Sur mon système Xeon amd64, les canaux goroutines + exécutent la solution de réflexion d'environ deux ordres de grandeur (en général, la réflexion dans Go est souvent plus lente et ne doit être utilisée que lorsque cela est absolument nécessaire). Bien entendu, s'il y a un retard significatif dans la fonction traitant les résultats ou dans l'écriture des valeurs sur les canaux d'entrée, cette différence de performance peut facilement devenir insignifiante.

  • Sémantique de blocage / mise en mémoire tampon. L'importance de cela dépend du cas d'utilisation. Le plus souvent, cela n'a pas d'importance ou la légère mise en mémoire tampon supplémentaire dans la solution de fusion de goroutine peut être utile pour le débit. Cependant, s'il est souhaitable d'avoir la sémantique qu'un seul écrivain est débloqué et que sa valeur est entièrement gérée avant qu'un autre écrivain ne soit débloqué, cela ne peut être réalisé qu'avec la solution de réflexion.

Notez que les deux approches peuvent être simplifiées si "l'identifiant" du canal d'envoi n'est pas requis ou si les canaux source ne seront jamais fermés.

Canal de fusion Goroutine:

// Process1 calls `fn` for each value received from any of the `chans`
// channels. The arguments to `fn` are the index of the channel the
// value came from and the string value. Process1 returns once all the
// channels are closed.
func Process1(chans []<-chan string, fn func(int, string)) {
    // Setup
    type item struct {
        int    // index of which channel this came from
        string // the actual string item
    }
    merged := make(chan item)
    var wg sync.WaitGroup
    wg.Add(len(chans))
    for i, c := range chans {
        go func(i int, c <-chan string) {
            // Reads and buffers a single item from `c` before
            // we even know if we can write to `merged`.
            //
            // Go doesn't provide a way to do something like:
            //     merged <- (<-c)
            // atomically, where we delay the read from `c`
            // until we can write to `merged`. The read from
            // `c` will always happen first (blocking as
            // required) and then we block on `merged` (with
            // either the above or the below syntax making
            // no difference).
            for s := range c {
                merged <- item{i, s}
            }
            // If/when this input channel is closed we just stop
            // writing to the merged channel and via the WaitGroup
            // let it be known there is one fewer channel active.
            wg.Done()
        }(i, c)
    }
    // One extra goroutine to watch for all the merging goroutines to
    // be finished and then close the merged channel.
    go func() {
        wg.Wait()
        close(merged)
    }()

    // "select-like" loop
    for i := range merged {
        // Process each value
        fn(i.int, i.string)
    }
}

Réflexion sélectionnez:

// Process2 is identical to Process1 except that it uses the reflect
// package to select and read from the input channels which guarantees
// there is only one value "in-flight" (i.e. when `fn` is called only
// a single send on a single channel will have succeeded, the rest will
// be blocked). It is approximately two orders of magnitude slower than
// Process1 (which is still insignificant if their is a significant
// delay between incoming values or if `fn` runs for a significant
// time).
func Process2(chans []<-chan string, fn func(int, string)) {
    // Setup
    cases := make([]reflect.SelectCase, len(chans))
    // `ids` maps the index within cases to the original `chans` index.
    ids := make([]int, len(chans))
    for i, c := range chans {
        cases[i] = reflect.SelectCase{
            Dir:  reflect.SelectRecv,
            Chan: reflect.ValueOf(c),
        }
        ids[i] = i
    }

    // Select loop
    for len(cases) > 0 {
        // A difference here from the merging goroutines is
        // that `v` is the only value "in-flight" that any of
        // the workers have sent. All other workers are blocked
        // trying to send the single value they have calculated
        // where-as the goroutine version reads/buffers a single
        // extra value from each worker.
        i, v, ok := reflect.Select(cases)
        if !ok {
            // Channel cases[i] has been closed, remove it
            // from our slice of cases and update our ids
            // mapping as well.
            cases = append(cases[:i], cases[i+1:]...)
            ids = append(ids[:i], ids[i+1:]...)
            continue
        }

        // Process each value
        fn(ids[i], v.String())
    }
}

[Code complet sur le terrain de jeu Go .]

Dave C
la source
1
Il convient également de noter que la solution goroutines + canaux ne peut pas tout faire selectou le reflect.Selectfait. Les goroutines continueront à tourner jusqu'à ce qu'elles consomment tout ce qui se trouve dans les canaux, il n'y a donc pas de moyen clair de Process1sortir tôt. Il y a aussi un risque de problèmes si vous avez plusieurs lecteurs, puisque les goroutines mettent en mémoire tampon un élément de chacun des canaux, ce qui ne se produira pas avec select.
James Henstridge
@JamesHenstridge, votre première remarque sur l'arrêt n'est pas vraie. Vous vous arrangeriez pour arrêter Process1 exactement de la même manière que vous vous arrangeriez pour arrêter Process2; par exemple en ajoutant un canal "stop" qui est fermé lorsque les goroutines doivent s'arrêter. Process1 aurait besoin de deux cas selectdans une forboucle au lieu de la for rangeboucle plus simple actuellement utilisée. Process2 aurait besoin de coller un autre cas caseset de gérer spécialement cette valeur de i.
Dave C
Cela ne résout toujours pas le problème de lecture des valeurs des canaux qui ne seront pas utilisées dans le cas d'arrêt anticipé.
James Henstridge
0

Pourquoi cette approche ne fonctionnerait-elle pas en supposant que quelqu'un envoie des événements?

func main() {
    numChans := 2
    var chans = []chan string{}

    for i := 0; i < numChans; i++ {
        tmp := make(chan string)
        chans = append(chans, tmp)
    }

    for true {
        for i, c := range chans {
            select {
            case x = <-c:
                fmt.Printf("received %d \n", i)
                go DoShit(x, i)
            default: continue
            }
        }
    }
}
noonex
la source
8
Ceci est une boucle de rotation. En attendant qu'un canal d'entrée ait une valeur, cela consomme toute la CPU disponible. L'intérêt de selectsur plusieurs canaux (sans defaultclause) est qu'il attend efficacement qu'au moins un soit prêt sans tourner.
Dave C
0

Option éventuellement plus simple:

Au lieu d'avoir un tableau de canaux, pourquoi ne pas passer un seul canal en tant que paramètre aux fonctions exécutées sur des goroutines séparées, puis écouter le canal dans un goroutine consommateur?

Cela vous permet de sélectionner un seul canal dans votre auditeur, ce qui permet une sélection simple et d'éviter la création de nouvelles goroutines pour regrouper les messages de plusieurs canaux?

Fernando Sanchez
la source