Comment attendre que toutes les goroutines se terminent sans utiliser le temps.

109

Ce code sélectionne tous les fichiers xml dans le même dossier, comme l'exécutable appelé et applique le traitement de manière asynchrone à chaque résultat dans la méthode de rappel (dans l'exemple ci-dessous, seul le nom du fichier est imprimé).

Comment éviter d'utiliser la méthode de veille pour empêcher la fermeture de la méthode principale? J'ai du mal à me concentrer sur les canaux (je suppose que c'est ce qu'il faut, pour synchroniser les résultats), donc toute aide est appréciée!

package main

import (
    "fmt"
    "io/ioutil"
    "path"
    "path/filepath"
    "os"
    "runtime"
    "time"
)

func eachFile(extension string, callback func(file string)) {
    exeDir := filepath.Dir(os.Args[0])
    files, _ := ioutil.ReadDir(exeDir)
    for _, f := range files {
            fileName := f.Name()
            if extension == path.Ext(fileName) {
                go callback(fileName)
            }
    }
}


func main() {
    maxProcs := runtime.NumCPU()
    runtime.GOMAXPROCS(maxProcs)

    eachFile(".xml", func(fileName string) {
                // Custom logic goes in here
                fmt.Println(fileName)
            })

    // This is what i want to get rid of
    time.Sleep(100 * time.Millisecond)
}
Dante
la source

Réponses:

174

Vous pouvez utiliser sync.WaitGroup . Citant l'exemple lié:

package main

import (
        "net/http"
        "sync"
)

func main() {
        var wg sync.WaitGroup
        var urls = []string{
                "http://www.golang.org/",
                "http://www.google.com/",
                "http://www.somestupidname.com/",
        }
        for _, url := range urls {
                // Increment the WaitGroup counter.
                wg.Add(1)
                // Launch a goroutine to fetch the URL.
                go func(url string) {
                        // Decrement the counter when the goroutine completes.
                        defer wg.Done()
                        // Fetch the URL.
                        http.Get(url)
                }(url)
        }
        // Wait for all HTTP fetches to complete.
        wg.Wait()
}
zzzz
la source
11
Une raison pour laquelle vous devez faire wg.Add (1) en dehors de la routine go? Pouvons-nous le faire à l'intérieur juste avant le report wg.Done ()?
sam
18
sat, oui, il y a une raison, c'est décrit dans sync.WaitGroup.Add docs: Note that calls with positive delta must happen before the call to Wait, or else Wait may wait for too small a group. Typically this means the calls to Add should execute before the statement creating the goroutine or other event to be waited for. See the WaitGroup example.
wobmene
15
L'adaptation de ce code m'a causé une longue session de débogage car mon goroutine était une fonction nommée et le fait de passer le WaitGroup en tant que valeur le copiera et rendra wg.Done () inefficace. Bien que cela puisse être résolu en passant un pointeur & wg, une meilleure façon d'éviter de telles erreurs est de déclarer la variable WaitGroup comme un pointeur en premier lieu: wg := new(sync.WaitGroup)au lieu de var wg sync.WaitGroup.
Robert Jack Will
Je suppose qu'il est valide d'écrire wg.Add(len(urls))juste au-dessus de la ligne for _, url := range urls, je pense que c'est mieux car vous n'utilisez l'ajout qu'une seule fois.
Victor
@RobertJackWill: Bonne note! BTW, ceci est couvert dans la documentation : "Un WaitGroup ne doit pas être copié après la première utilisation. Dommage que Go ne dispose pas d'un moyen de faire respecter cela . En fait, cependant, go vetdétecte ce cas et avertit avec" func passe le verrouillage par valeur : sync.WaitGroup contient sync.noCopy ".
Brent Bradburn
56

Les WaitGroups sont certainement le moyen canonique de le faire. Par souci d'exhaustivité, cependant, voici la solution qui était couramment utilisée avant l'introduction des WaitGroups. L'idée de base est d'utiliser un canal pour dire «J'ai terminé» et de faire attendre le goroutine principal jusqu'à ce que chaque routine créée ait signalé son achèvement.

func main() {
    c := make(chan struct{}) // We don't need any data to be passed, so use an empty struct
    for i := 0; i < 100; i++ {
        go func() {
            doSomething()
            c <- struct{}{} // signal that the routine has completed
        }()
    }

    // Since we spawned 100 routines, receive 100 messages.
    for i := 0; i < 100; i++ {
        <- c
    }
}
Joshlf
la source
9
Agréable de voir une solution avec des canaux simples. Un bonus supplémentaire: si doSomething()renvoie un résultat, vous pouvez le mettre sur le canal, et vous pouvez collecter et traiter les résultats dans la deuxième boucle for (dès qu'ils sont prêts)
andras
4
Cela ne fonctionne que si vous connaissez déjà la quantité de gorutines que vous souhaitez commencer. Que faire si vous écrivez une sorte de robot d'exploration HTML et que vous démarrez des gorutines de manière récursive pour chaque lien de la page?
shinydev
Vous devrez garder une trace de cela d'une manière ou d'une autre. Avec WaitGroups, c'est un peu plus facile car chaque fois que vous créez un nouveau goroutine, vous pouvez d'abord le faire wg.Add(1)et ainsi il les suivra . Avec les chaînes, ce serait un peu plus difficile.
joshlf
c bloquera puisque toutes les routines vont essayer d'y accéder, et il est sans tampon
Edwin Ikechukwu Okonkwo
Si par "bloquer", vous voulez dire que le programme se bloquera, ce n'est pas vrai. Vous pouvez essayer de l'exécuter vous-même. La raison en est que les seuls goroutines qui écrivent csont différents de la goroutine principale, qui lit à partir de c. Ainsi, le goroutine principal est toujours disponible pour lire une valeur sur le canal, ce qui se produira lorsque l'un des goroutines est disponible pour écrire une valeur sur le canal. Vous avez raison de dire que si ce code ne généra pas de goroutines mais exécutait tout dans un seul goroutine, il serait dans une impasse.
joshlf
8

sync.WaitGroup peut vous aider ici.

package main

import (
    "fmt"
    "sync"
    "time"
)


func wait(seconds int, wg * sync.WaitGroup) {
    defer wg.Done()

    time.Sleep(time.Duration(seconds) * time.Second)
    fmt.Println("Slept ", seconds, " seconds ..")
}


func main() {
    var wg sync.WaitGroup

    for i := 0; i <= 5; i++ {
        wg.Add(1)   
        go wait(i, &wg)
    }
    wg.Wait()
}
dimmg
la source
1

Bien que sync.waitGroup(wg) soit la voie à suivre canonique, cela vous oblige à faire au moins certains de vos wg.Addappels avant vous wg.Waitpour que tout soit terminé. Cela peut ne pas être possible pour des choses simples comme un robot d'exploration Web, où vous ne connaissez pas à l'avance le nombre d'appels récursifs et il faut un certain temps pour récupérer les données qui génèrent les wg.Addappels. Après tout, vous devez charger et analyser la première page avant de connaître la taille du premier lot de pages enfants.

J'ai écrit une solution utilisant des canaux, en évitant waitGroupdans ma solution le Tour of Go - exercice de robot d'exploration Web . Chaque fois qu'une ou plusieurs routines de démarrage sont lancées, vous envoyez le numéro au childrencanal. Chaque fois qu'une routine go est sur le point de se terminer, vous envoyez un message 1au donecanal. Lorsque la somme des enfants est égale à la somme de done, nous avons terminé.

Ma seule préoccupation restante est la taille codée en dur du resultscanal, mais c'est une limitation (actuelle) de Go.


// recursionController is a data structure with three channels to control our Crawl recursion.
// Tried to use sync.waitGroup in a previous version, but I was unhappy with the mandatory sleep.
// The idea is to have three channels, counting the outstanding calls (children), completed calls 
// (done) and results (results).  Once outstanding calls == completed calls we are done (if you are
// sufficiently careful to signal any new children before closing your current one, as you may be the last one).
//
type recursionController struct {
    results  chan string
    children chan int
    done     chan int
}

// instead of instantiating one instance, as we did above, use a more idiomatic Go solution
func NewRecursionController() recursionController {
    // we buffer results to 1000, so we cannot crawl more pages than that.  
    return recursionController{make(chan string, 1000), make(chan int), make(chan int)}
}

// recursionController.Add: convenience function to add children to controller (similar to waitGroup)
func (rc recursionController) Add(children int) {
    rc.children <- children
}

// recursionController.Done: convenience function to remove a child from controller (similar to waitGroup)
func (rc recursionController) Done() {
    rc.done <- 1
}

// recursionController.Wait will wait until all children are done
func (rc recursionController) Wait() {
    fmt.Println("Controller waiting...")
    var children, done int
    for {
        select {
        case childrenDelta := <-rc.children:
            children += childrenDelta
            // fmt.Printf("children found %v total %v\n", childrenDelta, children)
        case <-rc.done:
            done += 1
            // fmt.Println("done found", done)
        default:
            if done > 0 && children == done {
                fmt.Printf("Controller exiting, done = %v, children =  %v\n", done, children)
                close(rc.results)
                return
            }
        }
    }
}

Code source complet de la solution

dirkjot
la source
1

Voici une solution qui utilise WaitGroup.

Tout d'abord, définissez 2 méthodes utilitaires:

package util

import (
    "sync"
)

var allNodesWaitGroup sync.WaitGroup

func GoNode(f func()) {
    allNodesWaitGroup.Add(1)
    go func() {
        defer allNodesWaitGroup.Done()
        f()
    }()
}

func WaitForAllNodes() {
    allNodesWaitGroup.Wait()
}

Ensuite, remplacez l'invocation de callback:

go callback(fileName)

Avec un appel à votre fonction utilitaire:

util.GoNode(func() { callback(fileName) })

Dernière étape, ajoutez cette ligne à la fin de votre main, au lieu de votre sleep. Cela garantira que le thread principal attend que toutes les routines se terminent avant que le programme puisse s'arrêter.

func main() {
  // ...
  util.WaitForAllNodes()
}
gamliela
la source