Correction d’erreur dans le message sur l’objet moniteur dans la file d’attente thread-safe

Correction d’erreur dans le message sur l’objet moniteur dans la file d’attente thread-safe

2023-07-03 17:29:00

Dans mon dernier article”Modèles d’architecture logicielle : Monitor Object” J’ai implémenté une file d’attente thread-safe. J’ai fait deux erreurs graves. Désolé. Aujourd’hui, je vais corriger ces erreurs.

Publicité


Rainer Grimm travaille depuis de nombreuses années en tant qu’architecte logiciel, chef d’équipe et responsable de la formation. Il aime écrire des articles sur les langages de programmation C++, Python et Haskell, mais aime aussi intervenir fréquemment lors de conférences spécialisées. Sur son blog Modernes C++, il traite intensément de sa passion pour le C++.



Afin de comprendre le contexte, je voudrais d’abord présenter à nouveau l’implémentation défectueuse de mon dernier message.

// monitorObject.cpp

#include 
#include 
#include 
#include 
#include 
#include 
#include 

class Monitor {
public:
    void lock() const {
        monitMutex.lock();
    }

    void unlock() const {
        monitMutex.unlock();
    }

    void notify_one() const noexcept {
        monitCond.notify_one();
    }

    template 
    void wait(Predicate pred) const {                 // (10)
        std::unique_lock monitLock(monitMutex);
        monitCond.wait(monitLock, pred);
    }
    
private:
    mutable std::mutex monitMutex;
    mutable std::condition_variable monitCond;
};

template                                   // (1)
class ThreadSafeQueue: public Monitor {
 public:
    void add(T val){ 
        lock();
        myQueue.push(val);                             // (6)
        unlock();
        notify_one();
    }
    
    T get(){ 
        wait( [this] { return ! myQueue.empty(); } );  // (2)
        lock();
        auto val = myQueue.front();                    // (4)
        myQueue.pop();                                 // (5)
        unlock();
        return val;
    }

private:
    std::queue myQueue;                            // (3)
};


class Dice {
public:
    int operator()(){ return rand(); }
private:
    std::function rand = std::bind(std::uniform_int_distribution<>(1, 6), 
                                          std::default_random_engine());
};


int main(){
    
    std::cout << 'n';
    
    constexpr auto NumberThreads = 100;
    
    ThreadSafeQueue safeQueue;                      // (7)

    auto addLambda = [&safeQueue](int val){ safeQueue.add(val);          // (8)
                                            std::cout << val << " "
                                            << std::this_thread::get_id() << "; "; 
                                          }; 
    auto getLambda = [&safeQueue]{ safeQueue.get(); };  // (9)

    std::vector addThreads(NumberThreads);
    Dice dice;
    for (auto& thr: addThreads) thr = std::thread(addLambda, dice());

    std::vector getThreads(NumberThreads);
    for (auto& thr: getThreads) thr = std::thread(getLambda);

    for (auto& thr: addThreads) thr.join();
    for (auto& thr: getThreads) thr.join();
    
    std::cout << "nn";
     
}

L'idée centrale de l'exemple est que l'objet moniteur est encapsulé dans une classe et peut donc être réutilisé. La classe Monitor utilise un std::mutex comme un verrou de moniteur et un std::condition_variable comme condition de surveillance. La classe Monitor fournit l'interface minimale qu'un objet moniteur doit prendre en charge.

ThreadSafeQueue étendu en (1). std::queue pour avoir une interface thread-safe. ThreadSafeQueue dérive de la classe Monitor et utilise leurs fonctions membres pour obtenir les fonctions membres synchronisées add et get soutenir. Les fonctions de membre add et get utilisez le verrou du moniteur pour protéger l'objet du moniteur. Cela est particulièrement vrai pour le non-thread safe myQueue. add avertit le fil d'attente lorsqu'un nouvel élément arrive myQueue était ajouté. Cette notification est thread-safe. La fonction membre get (3) mérite plus d'attention. Premièrement la wait- Fonction membre appelée de la variable de condition sous-jacente. Ce wait-Call nécessite un prédicat supplémentaire pour se protéger contre les réveils perdus et intempestifs (C++ Core Guidelines : Soyez conscient des dangers des variables conditionnelles) protéger. Les opérations pour changer le myQueue (4) et (5) doivent également être protégés car ils traitent de l'appel myQueue.push(val) (6) peuvent se chevaucher. L'objet moniteur safeQueue (7) utilise les fonctions lambda dans (8) et (9) pour obtenir un nombre à partir du safeQueue ajouter ou retirer. ThreadSafeQueue lui-même est un modèle de classe et peut prendre des valeurs de n'importe quel type. Une centaine de clients ajoutent le safeQueue 100 numéros aléatoires entre 1 et 6 (ligne 7) tandis que cent clients sélectionnent ces 100 numéros en même temps dans la safeQueue supprimé. La sortie du programme affiche les nombres et les ID de thread.



Ce programme a deux problèmes sérieux. Dietmar Kühl et Franck Birbacher ont décrit les problèmes dans un e-mail. Voici ses mots. Mes commentaires sont en italique et en gras.

Publicité

  1. Dans ThreadSafeQueue::get() le fera au moyen de Monitor::wait() testé si myQueue contient un élément ou a attendu qu'un élément soit contenu. Cependant, le verrou ne sera que dans wait() tenue, c'est-à-dire dans get() on ne peut pas être sûr que l'article est toujours dans myQueue est : un autre thread peut obtenir le verrou et supprimer l'élément, ce qui entraîne un comportement indéfini lors de l'appel de myQueue.front().
  2. Si le constructeur de copie/déplacement de T lève une exception est le ThreadSafeQueue dans un état incohérent : aucune fonction membre n'est active, mais le mutex est verrouillé.

La correction est que Monitor::wait() ne peut être appelé que si un unique_lock est retenu. Cela peut être réalisé, par exemple, en fournissant à Monitor une fonction correspondante (protégée ?) qui renvoie un objet approprié et une référence à celui-ci dans wait() a besoin:

struct Monitor {
   using Lock = std::unique_lock; // could be wrapper if you prefer
   [[nodiscard]] Lock receiveGuard() { return Lock(monitMutex); }
   template 
   void wait(Lock& kerberos, Predicate pred) { monitCond.wait(kerberos, pred); }
   // …
};

template 
T ThreadSafeQueue::get() {
   auto kerberos = receiveGuard();
   wait(kerberos, [this]{ return not myQueue.empty(); });
   T rc = std::move(myQueue.front());
   myqueue.pop();
   return rc;
}

Cette version corrige le problème d'exception pour get(). Pour add() vous pouvez simplement utiliser l'objet moniteur avec un lock_guard utiliser:

template 
void add(T val) {
   {
        std::lock_guard kerberos(*this);
        myqueue.push(std::move(val));
    }
    notify_one();
}

Je mettrais probablement la notification dans un "SendGuard” enveloppe celui-là lock_guard et une référence au condition_variable contient et envoie la notification lors de la destruction :

class SendGuard {
    friend class Monitor;
    using deleter = decltype([](auto& cond){ cond->notify_one(); });
    std::unique_ptr notifier;
    std::lock_guard kerberos;
    SendGuard(auto& mutex, auto& cond): notifier(&cond), kerberos(mutex) {}
};

Le constructeur et le destructeur de déplacement doivent toujours public être et représenter toute l'interface ! Ce serait l'utilisation dans add() aussi beaucoup plus facile:

template 
void add(T val) {
   auto kerberos = sendGuard();
   myqueue.push(val);
}

Enfin, voici la mise en œuvre complète de Dietmar. Les chiffres correspondent aux chiffres du mien monitorObjec.cpp Exemple.

// monitorObject.cpp

#include 
#include 
#include 
#include 
#include 
#include 
#include 

class Monitor {
public:
    using Lock = std::unique_lock;
    [[nodiscard]] Lock receiveGuard() {
        return Lock(monitMutex);
    }

    template 
    void wait(Lock& kerberos, Predicate pred) {
        monitCond.wait(kerberos, pred);
    }

    class SendGuard {
        friend class Monitor;
        using deleter = decltype([](auto* cond){ cond->notify_one(); });
        std::unique_ptr notifier;
        std::lock_guard kerberos;
        SendGuard(auto& mutex, auto& cond): notifier(&cond), kerberos(mutex) {}
    };

    SendGuard sendGuard() { return {monitMutex, monitCond}; }
    
private:
    mutable std::mutex monitMutex;
    mutable std::condition_variable monitCond;
};

template                                   // (1)
class ThreadSafeQueue: public Monitor {
 public:
    void add(T val){ 
        auto kerberos = sendGuard();
        myQueue.push(val);                             // (6)
    }
    
    T get(){ 
        auto kerberos = receiveGuard();
        wait(kerberos, [this] { return ! myQueue.empty(); } );  // (2)
        auto val = myQueue.front();                    // (4)
        myQueue.pop();                                 // (5)
        return val;
    }

private:
    std::queue myQueue;                            // (3)
};


class Dice {
public:
    int operator()(){ return rand(); }
private:
    std::function rand = std::bind(std::uniform_int_distribution<>(1, 6), 
                                          std::default_random_engine());
};


int main(){
    
    std::cout << 'n';
    
    constexpr auto NumberThreads = 100;
    
    ThreadSafeQueue safeQueue;                      // (7)

    auto addLambda = [&safeQueue](int val){ safeQueue.add(val);          // (8)
                                            std::cout << val << " "
                                            << std::this_thread::get_id() << "; "; 
                                          }; 
    auto getLambda = [&safeQueue]{ safeQueue.get(); };  // (9)

    std::vector addThreads(NumberThreads);
    Dice dice;
    for (auto& thr: addThreads) thr = std::thread(addLambda, dice());

    std::vector getThreads(NumberThreads);
    for (auto& thr: getThreads) thr = std::thread(getLambda);

    for (auto& thr: addThreads) thr.join();
    for (auto& thr: getThreads) thr.join();
    
    std::cout << "nn";
     
}

À la suite de la discussion ci-dessus, Frank a proposé la version suivante ci-dessous, qui a une interface cohérente et facile à utiliser pour Monitor.

// threadSafeQueue.cpp

#ifndef INCLUDED_PATTERNS_MONITOR2_MONITOR_HPP
#define INCLUDED_PATTERNS_MONITOR2_MONITOR_HPP

#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 


class Monitor {
public:
    struct UnlockAndNotify {
        std::mutex d_mutex;
        std::condition_variable d_condition;

        void lock() { d_mutex.lock(); }
        void unlock() { d_mutex.unlock(); d_condition.notify_one(); }
    };

private:
    UnlockAndNotify d_combined;

public:
    std::unique_lock makeLockWithNotify() {
        return std::unique_lock{d_combined};
    }

    template 
    std::unique_lock makeLockWithWait(PRED waitForCondition) {
        std::unique_lock lock{d_combined.d_mutex};
        d_combined.d_condition.wait(lock, waitForCondition);
        return lock;
    }
};

class ThreadQueue {
    Monitor d_monitor;
    std::deque d_numberQueue;

    auto makeLockWhenNotEmpty() {
        return d_monitor.makeLockWithWait([this] { return !d_numberQueue.empty(); });
    }

public:
    void addNumber(int number) {
        const auto lock = d_monitor.makeLockWithNotify();
        d_numberQueue.push_back(number);
    }

    int removeNumber() {
        const auto lock = makeLockWhenNotEmpty();
        const auto number = d_numberQueue.front();
        d_numberQueue.pop_front();
        return number;
    }
};

#endif

int main() {
   ThreadQueue queue;
   std::atomic sharedSum{};
   std::atomic sharedCounter{};

   std::vector threads;
   threads.reserve(200);
   std::generate_n(std::back_inserter(threads), 100, [&] {
       return std::jthread{[&] { sharedSum += queue.removeNumber(); }};
   });
   std::generate_n(std::back_inserter(threads), 100, [&] {
       return std::jthread{[&] { queue.addNumber(++sharedCounter); }};
   });

   threads.clear(); // wait for all threads to finish
   if (sharedSum.load() != 5050) {
       throw std::logic_error("Wrong result for sum of 1..100");
   }
}

L'implémentation de l'objet Monitor repose sur la flexibilité de std::unique_lock via son paramètre de modèle. Tous les verrous standard C++ peuvent être utilisés avec n'importe quelle classe qui utilise lock()- et unlock()-Méthodes. La classe UnlockAndNotify implémente cette interface et définit sa variable de condition dans le unlock()-Méthode gratuite. De plus, la classe offre Monitor fournit une interface publique réduite qui peut être utilisée pour créer deux types de verrous différents, un avec et un sans notification, en utilisant un std::unique_lock soit dans l'ensemble UnlockAndNotify-instance ou uniquement sur le inclus std::mutex est créé.

Lors du choix entre std::unique_lock et std::lock_guard Je préfère (Franc) mourir unique_lock dans l'interface. Ce choix offre plus de flexibilité à l'utilisateur de la classe moniteur. J'apprécie cette flexibilité plus qu'une éventuelle différence de performance lock_guard, qui doit être mesuré de toute façon. J'admettrai que les exemples donnés ne profitent pas de cette flexibilité supplémentaire.

Après cela, Dietmar a développé l'idée de Frank : Ici, les données protégées sont stockées dans le moniteur, ce qui rend plus difficile leur accès sans protection.

// threadsafequeue2.cpp

#ifndef INCLUDED_PATTERNS_MONITOR3_MONITOR_HPP
#define INCLUDED_PATTERNS_MONITOR3_MONITOR_HPP

#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 

namespace patterns::monitor3 {

template 
class Monitor {
public:
   struct UnlockAndNotify {
       std::mutex d_mutex;
       std::condition_variable d_condition;
   
       void lock() { d_mutex.lock(); }
       void unlock() { d_mutex.unlock(); d_condition.notify_one(); }
   };

private:
   mutable UnlockAndNotify d_combined;
   mutable T               d_data;

public:
   std::tuple> makeProducerLock() const {
       return { d_data, std::unique_lock{d_combined} };
   }

   template 
   std::tuple> makeConsumerLockWhen(PRED predicate) const {
       std::unique_lock lock{d_combined.d_mutex};
       d_combined.d_condition.wait(lock, [this, predicate]{ return predicate(d_data); });
       return { d_data, std::move(lock) };
   }
};

template 
class ThreadQueue {
   Monitor> d_monitor;

public:
   void add(T number) {
       auto[numberQueue, lock] = d_monitor.makeProducerLock();
       numberQueue.push_back(number);
   }

   T remove() {
       auto[numberQueue, lock] = d_monitor.makeConsumerLockWhen([](auto& numberQueue) { return !numberQueue.empty(); });
       const auto number = numberQueue.front();
       numberQueue.pop_front();
       return number;
   }
};
}

#endif

class Dice {
public:
    int operator()(){ return rand(); }
private:
    std::function rand = std::bind(std::uniform_int_distribution<>(1, 6), 
                                          std::default_random_engine());
};

int main(){
    
    std::cout << 'n';
    
    constexpr auto NumberThreads = 100;
    
    patterns::monitor3::ThreadQueue safeQueue;                     

    auto addLambda = [&safeQueue](int val){ safeQueue.add(val);         
                                            std::cout << val << " "
                                            << std::this_thread::get_id() << "; "; 
                                          }; 
    auto getLambda = [&safeQueue]{ safeQueue.remove(); };  

    std::vector addThreads(NumberThreads);
    Dice dice;
    for (auto& thr: addThreads) thr = std::thread(addLambda, dice());

    std::vector getThreads(NumberThreads);
    for (auto& thr: getThreads) thr = std::thread(getLambda);

    for (auto& thr: addThreads) thr.join();
    for (auto& thr: getThreads) thr.join();
    
    std::cout << "nn";
     
}

Merci encore à Franc et Dietmar. Je ne voulais pas prouver que la concurrence est très exigeante avec mon implémentation incorrecte d'une file d'attente thread-safe dans mon dernier article. Ce qui est particulièrement ennuyeux, c'est que je n'ai pas mis le mutex dans un verrou (Erreur 2). J'enseigne cela dans mes cours de C++ : NNM (No Naked Mutex).

Dans mon prochain article, je plongerai profondément dans l'avenir de C++20 : C++23.


(rme)

Vers la page d'accueil



#Correction #derreur #dans #message #sur #lobjet #moniteur #dans #file #dattente #threadsafe
1688447095

Facebook
Twitter
LinkedIn
Pinterest

Leave a Comment

This site uses Akismet to reduce spam. Learn how your comment data is processed.