C ++ 0x n'a pas de sémaphores? Comment synchroniser les threads?

135

Est-il vrai que C ++ 0x viendra sans sémaphores? Il y a déjà quelques questions sur Stack Overflow concernant l'utilisation des sémaphores. Je les utilise (sémaphores posix) tout le temps pour laisser un thread attendre un événement dans un autre thread:

void thread0(...)
{
  doSomething0();

  event1.wait();

  ...
}

void thread1(...)
{
  doSomething1();

  event1.post();

  ...
}

Si je faisais ça avec un mutex:

void thread0(...)
{
  doSomething0();

  event1.lock(); event1.unlock();

  ...
}

void thread1(...)
{
  event1.lock();

  doSomethingth1();

  event1.unlock();

  ...
}

Problème: C'est moche et il n'est pas garanti que thread1 verrouille le mutex en premier (étant donné que le même thread doit verrouiller et déverrouiller un mutex, vous ne pouvez pas non plus verrouiller event1 avant que thread0 et thread1 ne démarrent).

Donc, puisque boost n'a pas non plus de sémaphores, quel est le moyen le plus simple d'y parvenir?

tauran
la source
Peut-être utiliser la condition mutex et std :: promise et std :: future?
Yves

Réponses:

179

Vous pouvez facilement en créer un à partir d'un mutex et d'une variable de condition:

#include <mutex>
#include <condition_variable>

class semaphore
{
private:
    std::mutex mutex_;
    std::condition_variable condition_;
    unsigned long count_ = 0; // Initialized as locked.

public:
    void notify() {
        std::lock_guard<decltype(mutex_)> lock(mutex_);
        ++count_;
        condition_.notify_one();
    }

    void wait() {
        std::unique_lock<decltype(mutex_)> lock(mutex_);
        while(!count_) // Handle spurious wake-ups.
            condition_.wait(lock);
        --count_;
    }

    bool try_wait() {
        std::lock_guard<decltype(mutex_)> lock(mutex_);
        if(count_) {
            --count_;
            return true;
        }
        return false;
    }
};
Maxim Egorushkin
la source
96
quelqu'un devrait soumettre une proposition au comité des normes
7
un commentaire ici qui m'a d'abord intrigué est le verrou en attente, on pourrait se demander comment un thread peut-il passer notifier si le verrou est maintenu en attente? la réponse quelque peu mal documentée est que condition_variable.wait impulsions le verrou, permettant à un autre thread de passer notifier de manière atomique, du moins c'est ce que je comprends
31
Il a été délibérément exclu de Boost au motif qu'un sémaphore est trop de corde pour que les programmeurs puissent s'y accrocher. Les variables de condition sont censées être plus gérables. Je vois leur point mais je me sens un peu patronné. Je suppose que la même logique s'applique à C ++ 11 - on attend des programmeurs qu'ils écrivent leurs programmes d'une manière qui utilise «naturellement» des condvars ou d'autres techniques de synchronisation approuvées. Fournir un sémaphore fonctionnerait contre cela, qu'il soit implémenté au-dessus de condvar ou de manière native.
Steve Jessop
5
Remarque - Voir en.wikipedia.org/wiki/Spurious_wakeup pour la justification de la while(!count_)boucle.
Dan Nissenbaum
3
@Maxim Je suis désolé, je ne pense pas que vous ayez raison. sem_wait et sem_post uniquement syscall en cas de contention (vérifiez sourceware.org/git/?p=glibc.git;a=blob;f=nptl/sem_wait.c ) donc le code ici finit par dupliquer l'implémentation de la libc, avec potentiellement des bogues. Si vous envisagez la portabilité sur n'importe quel système, cela peut être une solution, mais si vous n'avez besoin que de la compatibilité Posix, utilisez le sémaphore Posix.
xryl669
107

Sur la base de la réponse de Maxim Yegorushkin , j'ai essayé de créer l'exemple en style C ++ 11.

#include <mutex>
#include <condition_variable>

class Semaphore {
public:
    Semaphore (int count_ = 0)
        : count(count_) {}

    inline void notify()
    {
        std::unique_lock<std::mutex> lock(mtx);
        count++;
        cv.notify_one();
    }

    inline void wait()
    {
        std::unique_lock<std::mutex> lock(mtx);

        while(count == 0){
            cv.wait(lock);
        }
        count--;
    }

private:
    std::mutex mtx;
    std::condition_variable cv;
    int count;
};
Tsuneo Yoshioka
la source
34
Vous pouvez faire attendre () aussi un trois lignes:cv.wait(lck, [this]() { return count > 0; });
Domi
2
L'ajout d'une autre classe dans l'esprit de lock_guard est également utile. À la manière RAII, le constructeur, qui prend le sémaphore comme référence, appelle l'appel wait () du sémaphore, et le destructeur appelle son appel notify (). Cela empêche les exceptions de ne pas libérer le sémaphore.
Jim Hunziker
n'y a-t-il pas de dead-lock, si disons N threads appelés wait () et count == 0, alors cv.notify_one (); n'est jamais appelé, puisque le mtx n'est pas sorti?
Marcello
1
@Marcello Les threads en attente ne tiennent pas le verrou. L'intérêt des variables de condition est de fournir une opération atomique "déverrouiller et attendre".
David Schwartz
3
Vous devez libérer le verrou avant d'appeler notify_one () pour éviter de bloquer immédiatement le réveil ... voir ici: en.cppreference.com/w/cpp/thread/condition_variable/notify_all
kylefinn
38

J'ai décidé d'écrire le sémaphore C ++ 11 le plus robuste / générique que je pouvais, dans le style de la norme autant que je le pouvais (notez que using semaphore = ...vous utiliseriez normalement le nom semaphoresimilaire à l'utilisation normale de stringnot basic_string):

template <typename Mutex, typename CondVar>
class basic_semaphore {
public:
    using native_handle_type = typename CondVar::native_handle_type;

    explicit basic_semaphore(size_t count = 0);
    basic_semaphore(const basic_semaphore&) = delete;
    basic_semaphore(basic_semaphore&&) = delete;
    basic_semaphore& operator=(const basic_semaphore&) = delete;
    basic_semaphore& operator=(basic_semaphore&&) = delete;

    void notify();
    void wait();
    bool try_wait();
    template<class Rep, class Period>
    bool wait_for(const std::chrono::duration<Rep, Period>& d);
    template<class Clock, class Duration>
    bool wait_until(const std::chrono::time_point<Clock, Duration>& t);

    native_handle_type native_handle();

private:
    Mutex   mMutex;
    CondVar mCv;
    size_t  mCount;
};

using semaphore = basic_semaphore<std::mutex, std::condition_variable>;

template <typename Mutex, typename CondVar>
basic_semaphore<Mutex, CondVar>::basic_semaphore(size_t count)
    : mCount{count}
{}

template <typename Mutex, typename CondVar>
void basic_semaphore<Mutex, CondVar>::notify() {
    std::lock_guard<Mutex> lock{mMutex};
    ++mCount;
    mCv.notify_one();
}

template <typename Mutex, typename CondVar>
void basic_semaphore<Mutex, CondVar>::wait() {
    std::unique_lock<Mutex> lock{mMutex};
    mCv.wait(lock, [&]{ return mCount > 0; });
    --mCount;
}

template <typename Mutex, typename CondVar>
bool basic_semaphore<Mutex, CondVar>::try_wait() {
    std::lock_guard<Mutex> lock{mMutex};

    if (mCount > 0) {
        --mCount;
        return true;
    }

    return false;
}

template <typename Mutex, typename CondVar>
template<class Rep, class Period>
bool basic_semaphore<Mutex, CondVar>::wait_for(const std::chrono::duration<Rep, Period>& d) {
    std::unique_lock<Mutex> lock{mMutex};
    auto finished = mCv.wait_for(lock, d, [&]{ return mCount > 0; });

    if (finished)
        --mCount;

    return finished;
}

template <typename Mutex, typename CondVar>
template<class Clock, class Duration>
bool basic_semaphore<Mutex, CondVar>::wait_until(const std::chrono::time_point<Clock, Duration>& t) {
    std::unique_lock<Mutex> lock{mMutex};
    auto finished = mCv.wait_until(lock, t, [&]{ return mCount > 0; });

    if (finished)
        --mCount;

    return finished;
}

template <typename Mutex, typename CondVar>
typename basic_semaphore<Mutex, CondVar>::native_handle_type basic_semaphore<Mutex, CondVar>::native_handle() {
    return mCv.native_handle();
}
David
la source
Cela fonctionne, avec une modification mineure. Les appels de méthode wait_forand wait_untilavec le prédicat renvoient une valeur booléenne (pas un `std :: cv_status).
jdknight
désolé de niquer si tard dans le match. std::size_tn'est pas signé donc le décrémenter en dessous de zéro est UB, et il le sera toujours >= 0. IMHO countdevrait être un int.
Richard Hodges
3
@RichardHodges il n'y a aucun moyen de décrémenter en dessous de zéro donc il n'y a pas de problème, et que signifierait un décompte négatif sur un sémaphore? Cela n'a même pas de sens à l'OMI.
David
1
@David Et si un fil de discussion devait attendre que les autres initialisent les choses? par exemple, 1 thread de lecture pour attendre 4 threads, j'appellerais le constructeur de sémaphore avec -3 pour faire attendre le thread de lecture jusqu'à ce que tous les autres threads fassent un message. Je suppose qu'il y a d'autres façons de faire cela, mais n'est-ce pas raisonnable? Je pense que c'est en fait la question que l'OP se pose, mais avec plus de "thread1".
jmmut
2
@RichardHodges étant très pédant, décrémenter un type entier non signé en dessous de 0 n'est pas UB.
jcai
15

en accord avec les sémaphores posix, j'ajouterais

class semaphore
{
    ...
    bool trywait()
    {
        boost::mutex::scoped_lock lock(mutex_);
        if(count_)
        {
            --count_;
            return true;
        }
        else
        {
            return false;
        }
    }
};

Et je préfère de loin utiliser un mécanisme de synchronisation à un niveau d'abstraction pratique, plutôt que de toujours copier-coller une version assemblée en utilisant des opérateurs plus basiques.

Michael Zillich
la source
9

Vous pouvez également consulter cpp11-on-multicore - il a une implémentation de sémaphore portable et optimale.

Le référentiel contient également d'autres goodies de threading qui complètent le threading c ++ 11.

onqtam
la source
8

Vous pouvez travailler avec des variables mutex et conditionnelles. Vous obtenez un accès exclusif avec le mutex, vérifiez si vous voulez continuer ou si vous devez attendre l'autre extrémité. Si vous avez besoin d'attendre, vous attendez dans une condition. Lorsque l'autre thread détermine que vous pouvez continuer, il signale la condition.

Il y a un court exemple dans la bibliothèque boost :: thread que vous pouvez très probablement simplement copier (les bibliothèques de threads C ++ 0x et boost sont très similaires).

David Rodríguez - Dribeas
la source
La condition ne signale que les threads en attente, ou pas? Donc, si thread0 n'est pas là en attente lorsque thread1 signale qu'il sera bloqué plus tard? De plus: je n'ai pas besoin du verrou supplémentaire fourni avec la condition - c'est au-dessus.
tauran
Oui, la condition signale uniquement les threads en attente. Le modèle commun est d'avoir une variable avec l'état et une condition au cas où vous auriez besoin d'attendre. Pensez à un producteur / consommateur, il y aura un décompte des éléments dans le tampon, le producteur verrouille, ajoute l'élément, incrémente le décompte et signale. Le consommateur se verrouille, vérifie le compteur et s'il consomme non nul, tandis que si zéro attend dans la condition.
David Rodríguez - dribeas
2
Vous pouvez simuler un sémaphore de cette façon: Initialisez une variable avec la valeur que vous donneriez au sémaphore, puis wait()est traduite en "verrouiller, vérifier le nombre si le décompte est différent de zéro et continuer; si zéro attente à la condition" alors que postserait "verrouiller, compteur d'incrément, signal s'il était de 0 "
David Rodríguez - dribeas
Oui, ça semble bon. Je me demande si les sémaphores posix sont implémentés de la même manière.
tauran
@tauran: Je ne sais pas avec certitude (et cela peut dépendre du système d'exploitation Posix), mais je pense que peu probable. Les sémaphores sont traditionnellement une primitive de synchronisation de "niveau inférieur" que les mutex et les variables de condition, et peuvent en principe être rendus plus efficaces qu'ils ne le seraient s'ils étaient implémentés au-dessus d'une condvar. Ainsi, il est plus probable dans un système d'exploitation donné que toutes les primitives de synchronisation au niveau utilisateur sont construites au-dessus de certains outils courants qui interagissent avec le planificateur.
Steve Jessop
3

Peut également être utile wrapper de sémaphore RAII dans les threads:

class ScopedSemaphore
{
public:
    explicit ScopedSemaphore(Semaphore& sem) : m_Semaphore(sem) { m_Semaphore.Wait(); }
    ScopedSemaphore(const ScopedSemaphore&) = delete;
    ~ScopedSemaphore() { m_Semaphore.Notify(); }

   ScopedSemaphore& operator=(const ScopedSemaphore&) = delete;

private:
    Semaphore& m_Semaphore;
};

Exemple d'utilisation dans une application multithread:

boost::ptr_vector<std::thread> threads;
Semaphore semaphore;

for (...)
{
    ...
    auto t = new std::thread([..., &semaphore]
    {
        ScopedSemaphore scopedSemaphore(semaphore);
        ...
    }
    );
    threads.push_back(t);
}

for (auto& t : threads)
    t.join();
Slasla
la source
3

C ++ 20 aura enfin des sémaphores - std::counting_semaphore<max_count> .

Ceux-ci auront (au moins) les méthodes suivantes:

  • acquire() (blocage)
  • try_acquire() (non bloquant, retourne immédiatement)
  • try_acquire_for() (non bloquant, prend une durée)
  • try_acquire_until() (non bloquant, prend un certain temps pour arrêter d'essayer)
  • release()

Cela n'est pas encore répertorié sur cppreference, mais vous pouvez lire ces diapositives de présentation CppCon 2019 ou regarder la vidéo . Il y a aussi la proposition officielle P0514R4 , mais je ne suis pas sûr que ce soit la version la plus à jour.

einpoklum
la source
2

J'ai trouvé que shared_ptr et low_ptr, un long avec une liste, ont fait le travail dont j'avais besoin. Mon problème était que plusieurs clients souhaitaient interagir avec les données internes d'un hôte. En règle générale, l'hôte met à jour les données de lui-même, cependant, si un client le demande, l'hôte doit arrêter la mise à jour jusqu'à ce qu'aucun client n'accède aux données de l'hôte. En même temps, un client peut demander un accès exclusif, de sorte qu'aucun autre client, ni l'hôte, ne puisse modifier ces données d'hôte.

Comment j'ai fait cela, j'ai créé une structure:

struct UpdateLock
{
    typedef std::shared_ptr< UpdateLock > ptr;
};

Chaque client aurait un membre de tel:

UpdateLock::ptr m_myLock;

Ensuite, l'hôte aurait un membre faible_ptr pour l'exclusivité, et une liste de points faibles pour les verrous non exclusifs:

std::weak_ptr< UpdateLock > m_exclusiveLock;
std::list< std::weak_ptr< UpdateLock > > m_locks;

Il existe une fonction pour activer le verrouillage et une autre fonction pour vérifier si l'hôte est verrouillé:

UpdateLock::ptr LockUpdate( bool exclusive );       
bool IsUpdateLocked( bool exclusive ) const;

Je teste les verrous dans LockUpdate, IsUpdateLocked et périodiquement dans la routine de mise à jour de l'hôte. Tester un verrou est aussi simple que de vérifier si le low_ptr a expiré et de supprimer tout expiré de la liste m_locks (je ne le fais que pendant la mise à jour de l'hôte), je peux vérifier si la liste est vide; en même temps, j'obtiens un déverrouillage automatique lorsqu'un client réinitialise le shared_ptr auquel il s'accroche, ce qui se produit également lorsqu'un client est détruit automatiquement.

L'effet global est que, puisque les clients ont rarement besoin d'exclusivité (généralement réservée aux ajouts et aux suppressions uniquement), la plupart du temps une demande à LockUpdate (false), c'est-à-dire non exclusive, réussit tant que (! M_exclusiveLock). Et un LockUpdate (true), une demande d'exclusivité, ne réussit que lorsque (! M_exclusiveLock) et (m_locks.empty ()).

Une file d'attente pourrait être ajoutée pour atténuer les verrous exclusifs et non exclusifs, cependant, je n'ai eu aucune collision jusqu'à présent, donc j'ai l'intention d'attendre que cela se produise pour ajouter la solution (surtout, j'ai donc une condition de test dans le monde réel).

Jusqu'à présent, cela fonctionne bien pour mes besoins; Je peux imaginer la nécessité d'étendre cela, et certains problèmes qui pourraient survenir lors d'une utilisation étendue, cependant, cela a été rapide à implémenter et n'a nécessité que très peu de code personnalisé.

Kit10
la source
-4

Au cas où quelqu'un serait intéressé par la version atomique, voici l'implémentation. Les performances sont attendues meilleures que la version mutex & condition variable.

class semaphore_atomic
{
public:
    void notify() {
        count_.fetch_add(1, std::memory_order_release);
    }

    void wait() {
        while (true) {
            int count = count_.load(std::memory_order_relaxed);
            if (count > 0) {
                if (count_.compare_exchange_weak(count, count-1, std::memory_order_acq_rel, std::memory_order_relaxed)) {
                    break;
                }
            }
        }
    }

    bool try_wait() {
        int count = count_.load(std::memory_order_relaxed);
        if (count > 0) {
            if (count_.compare_exchange_strong(count, count-1, std::memory_order_acq_rel, std::memory_order_relaxed)) {
                return true;
            }
        }
        return false;
    }
private:
    std::atomic_int count_{0};
};
Jeffery
la source
4
Je m'attendrais à ce que les performances soient bien pires. Ce code fait presque littéralement toutes les erreurs possibles. Comme exemple le plus évident, supposons que le waitcode doive effectuer plusieurs boucles. Quand il se débloquera enfin, il prendra la mère de toutes les branches mal prédites car la prédiction de boucle du processeur prédira certainement qu'il bouclera à nouveau. Je pourrais énumérer beaucoup plus de problèmes avec ce code.
David Schwartz
1
Voici un autre tueur de performances évident: la waitboucle consommera des ressources de micro-exécution du processeur lorsqu'elle tourne. Supposons qu'il se trouve dans le même noyau physique que le thread qui est censé le faire notify- cela ralentira terriblement ce thread.
David Schwartz
1
Et voici juste une dernière: Sur les processeurs x86 (les processeurs les plus populaires aujourd'hui), une opération compare_exchange_weak est toujours une opération d'écriture, même si elle échoue (elle réécrit la même valeur qu'elle lue si la comparaison échoue). Supposons donc que deux cœurs soient tous deux dans une waitboucle pour le même sémaphore. Ils écrivent tous les deux à pleine vitesse sur la même ligne de cache, ce qui peut ralentir les autres cœurs jusqu'à l'exploration en saturant les bus inter-cœurs.
David Schwartz
@DavidSchwartz Heureux de voir vos commentaires. Je ne suis pas sûr de comprendre la partie '... Prédiction de la boucle du processeur ...'. Accepté le 2ème. Apparemment, votre troisième cas peut se produire, mais comparé au mutex qui provoque le changement du mode utilisateur en mode noyau et l'appel système, la synchronisation entre les cœurs n'est pas pire.
Jeffery
1
Il n'existe pas de sémaphore sans verrouillage. L'idée de ne pas être verrouillé n'est pas d'écrire du code sans utiliser de mutex, mais d'écrire du code là où un thread ne se bloque jamais du tout. Dans ce cas, l'essence même du sémaphore est de bloquer les threads qui appellent la fonction wait ()!
Carlo Wood