2023-07-03 17:29:00
Dans mon dernier article”Modèles d’architecture logicielle : Monitor Object” J’ai implémenté une file d’attente thread-safe. J’ai fait deux erreurs graves. Désolé. Aujourd’hui, je vais corriger ces erreurs.
Publicité
Rainer Grimm travaille depuis de nombreuses années en tant qu’architecte logiciel, chef d’équipe et responsable de la formation. Il aime écrire des articles sur les langages de programmation C++, Python et Haskell, mais aime aussi intervenir fréquemment lors de conférences spécialisées. Sur son blog Modernes C++, il traite intensément de sa passion pour le C++.
Afin de comprendre le contexte, je voudrais d’abord présenter à nouveau l’implémentation défectueuse de mon dernier message.
// monitorObject.cpp
#include
#include
#include
#include
#include
#include
#include
class Monitor {
public:
void lock() const {
monitMutex.lock();
}
void unlock() const {
monitMutex.unlock();
}
void notify_one() const noexcept {
monitCond.notify_one();
}
template
void wait(Predicate pred) const { // (10)
std::unique_lock monitLock(monitMutex);
monitCond.wait(monitLock, pred);
}
private:
mutable std::mutex monitMutex;
mutable std::condition_variable monitCond;
};
template // (1)
class ThreadSafeQueue: public Monitor {
public:
void add(T val){
lock();
myQueue.push(val); // (6)
unlock();
notify_one();
}
T get(){
wait( [this] { return ! myQueue.empty(); } ); // (2)
lock();
auto val = myQueue.front(); // (4)
myQueue.pop(); // (5)
unlock();
return val;
}
private:
std::queue myQueue; // (3)
};
class Dice {
public:
int operator()(){ return rand(); }
private:
std::function rand = std::bind(std::uniform_int_distribution<>(1, 6),
std::default_random_engine());
};
int main(){
std::cout << 'n';
constexpr auto NumberThreads = 100;
ThreadSafeQueue safeQueue; // (7)
auto addLambda = [&safeQueue](int val){ safeQueue.add(val); // (8)
std::cout << val << " "
<< std::this_thread::get_id() << "; ";
};
auto getLambda = [&safeQueue]{ safeQueue.get(); }; // (9)
std::vector addThreads(NumberThreads);
Dice dice;
for (auto& thr: addThreads) thr = std::thread(addLambda, dice());
std::vector getThreads(NumberThreads);
for (auto& thr: getThreads) thr = std::thread(getLambda);
for (auto& thr: addThreads) thr.join();
for (auto& thr: getThreads) thr.join();
std::cout << "nn";
}
L'idée centrale de l'exemple est que l'objet moniteur est encapsulé dans une classe et peut donc être réutilisé. La classe Monitor
utilise un std::mutex
comme un verrou de moniteur et un std::condition_variable
comme condition de surveillance. La classe Monitor
fournit l'interface minimale qu'un objet moniteur doit prendre en charge.
ThreadSafeQueue
étendu en (1). std::queue
pour avoir une interface thread-safe. ThreadSafeQueue
dérive de la classe Monitor
et utilise leurs fonctions membres pour obtenir les fonctions membres synchronisées add
et get
soutenir. Les fonctions de membre add
et get
utilisez le verrou du moniteur pour protéger l'objet du moniteur. Cela est particulièrement vrai pour le non-thread safe myQueue
. add
avertit le fil d'attente lorsqu'un nouvel élément arrive myQueue
était ajouté. Cette notification est thread-safe. La fonction membre get
(3) mérite plus d'attention. Premièrement la wait
- Fonction membre appelée de la variable de condition sous-jacente. Ce wait
-Call nécessite un prédicat supplémentaire pour se protéger contre les réveils perdus et intempestifs (C++ Core Guidelines : Soyez conscient des dangers des variables conditionnelles) protéger. Les opérations pour changer le myQueue
(4) et (5) doivent également être protégés car ils traitent de l'appel myQueue.push(val)
(6) peuvent se chevaucher. L'objet moniteur safeQueue
(7) utilise les fonctions lambda dans (8) et (9) pour obtenir un nombre à partir du safeQueue
ajouter ou retirer. ThreadSafeQueue
lui-même est un modèle de classe et peut prendre des valeurs de n'importe quel type. Une centaine de clients ajoutent le safeQueue
100 numéros aléatoires entre 1 et 6 (ligne 7) tandis que cent clients sélectionnent ces 100 numéros en même temps dans la safeQueue
supprimé. La sortie du programme affiche les nombres et les ID de thread.
Ce programme a deux problèmes sérieux. Dietmar Kühl et Franck Birbacher ont décrit les problèmes dans un e-mail. Voici ses mots. Mes commentaires sont en italique et en gras.
Publicité
- Dans
ThreadSafeQueue::get()
le fera au moyen deMonitor::wait()
testé simyQueue
contient un élément ou a attendu qu'un élément soit contenu. Cependant, le verrou ne sera que danswait()
tenue, c'est-à-dire dansget()
on ne peut pas être sûr que l'article est toujours dansmyQueue
est : un autre thread peut obtenir le verrou et supprimer l'élément, ce qui entraîne un comportement indéfini lors de l'appel de myQueue.front(). - Si le constructeur de copie/déplacement de
T
lève une exception est leThreadSafeQueue
dans un état incohérent : aucune fonction membre n'est active, mais le mutex est verrouillé.
La correction est que Monitor::wait()
ne peut être appelé que si un unique_lock
est retenu. Cela peut être réalisé, par exemple, en fournissant à Monitor une fonction correspondante (protégée ?) qui renvoie un objet approprié et une référence à celui-ci dans wait()
a besoin:
struct Monitor {
using Lock = std::unique_lock; // could be wrapper if you prefer
[[nodiscard]] Lock receiveGuard() { return Lock(monitMutex); }
template
void wait(Lock& kerberos, Predicate pred) { monitCond.wait(kerberos, pred); }
// …
};
template
T ThreadSafeQueue::get() {
auto kerberos = receiveGuard();
wait(kerberos, [this]{ return not myQueue.empty(); });
T rc = std::move(myQueue.front());
myqueue.pop();
return rc;
}
Cette version corrige le problème d'exception pour get().
Pour add()
vous pouvez simplement utiliser l'objet moniteur avec un lock_guard
utiliser:
template
void add(T val) {
{
std::lock_guard kerberos(*this);
myqueue.push(std::move(val));
}
notify_one();
}
Je mettrais probablement la notification dans un "SendGuard
” enveloppe celui-là lock_guard
et une référence au condition_variable
contient et envoie la notification lors de la destruction :
class SendGuard {
friend class Monitor;
using deleter = decltype([](auto& cond){ cond->notify_one(); });
std::unique_ptr notifier;
std::lock_guard kerberos;
SendGuard(auto& mutex, auto& cond): notifier(&cond), kerberos(mutex) {}
};
Le constructeur et le destructeur de déplacement doivent toujours public
être et représenter toute l'interface ! Ce serait l'utilisation dans add()
aussi beaucoup plus facile:
template
void add(T val) {
auto kerberos = sendGuard();
myqueue.push(val);
}
Enfin, voici la mise en œuvre complète de Dietmar. Les chiffres correspondent aux chiffres du mien monitorObjec.cpp
Exemple.
// monitorObject.cpp
#include
#include
#include
#include
#include
#include
#include
class Monitor {
public:
using Lock = std::unique_lock;
[[nodiscard]] Lock receiveGuard() {
return Lock(monitMutex);
}
template
void wait(Lock& kerberos, Predicate pred) {
monitCond.wait(kerberos, pred);
}
class SendGuard {
friend class Monitor;
using deleter = decltype([](auto* cond){ cond->notify_one(); });
std::unique_ptr notifier;
std::lock_guard kerberos;
SendGuard(auto& mutex, auto& cond): notifier(&cond), kerberos(mutex) {}
};
SendGuard sendGuard() { return {monitMutex, monitCond}; }
private:
mutable std::mutex monitMutex;
mutable std::condition_variable monitCond;
};
template // (1)
class ThreadSafeQueue: public Monitor {
public:
void add(T val){
auto kerberos = sendGuard();
myQueue.push(val); // (6)
}
T get(){
auto kerberos = receiveGuard();
wait(kerberos, [this] { return ! myQueue.empty(); } ); // (2)
auto val = myQueue.front(); // (4)
myQueue.pop(); // (5)
return val;
}
private:
std::queue myQueue; // (3)
};
class Dice {
public:
int operator()(){ return rand(); }
private:
std::function rand = std::bind(std::uniform_int_distribution<>(1, 6),
std::default_random_engine());
};
int main(){
std::cout << 'n';
constexpr auto NumberThreads = 100;
ThreadSafeQueue safeQueue; // (7)
auto addLambda = [&safeQueue](int val){ safeQueue.add(val); // (8)
std::cout << val << " "
<< std::this_thread::get_id() << "; ";
};
auto getLambda = [&safeQueue]{ safeQueue.get(); }; // (9)
std::vector addThreads(NumberThreads);
Dice dice;
for (auto& thr: addThreads) thr = std::thread(addLambda, dice());
std::vector getThreads(NumberThreads);
for (auto& thr: getThreads) thr = std::thread(getLambda);
for (auto& thr: addThreads) thr.join();
for (auto& thr: getThreads) thr.join();
std::cout << "nn";
}
À la suite de la discussion ci-dessus, Frank a proposé la version suivante ci-dessous, qui a une interface cohérente et facile à utiliser pour Monitor.
// threadSafeQueue.cpp
#ifndef INCLUDED_PATTERNS_MONITOR2_MONITOR_HPP
#define INCLUDED_PATTERNS_MONITOR2_MONITOR_HPP
#include
#include
#include
#include
#include
#include
#include
#include
#include
class Monitor {
public:
struct UnlockAndNotify {
std::mutex d_mutex;
std::condition_variable d_condition;
void lock() { d_mutex.lock(); }
void unlock() { d_mutex.unlock(); d_condition.notify_one(); }
};
private:
UnlockAndNotify d_combined;
public:
std::unique_lock makeLockWithNotify() {
return std::unique_lock{d_combined};
}
template
std::unique_lock makeLockWithWait(PRED waitForCondition) {
std::unique_lock lock{d_combined.d_mutex};
d_combined.d_condition.wait(lock, waitForCondition);
return lock;
}
};
class ThreadQueue {
Monitor d_monitor;
std::deque d_numberQueue;
auto makeLockWhenNotEmpty() {
return d_monitor.makeLockWithWait([this] { return !d_numberQueue.empty(); });
}
public:
void addNumber(int number) {
const auto lock = d_monitor.makeLockWithNotify();
d_numberQueue.push_back(number);
}
int removeNumber() {
const auto lock = makeLockWhenNotEmpty();
const auto number = d_numberQueue.front();
d_numberQueue.pop_front();
return number;
}
};
#endif
int main() {
ThreadQueue queue;
std::atomic sharedSum{};
std::atomic sharedCounter{};
std::vector threads;
threads.reserve(200);
std::generate_n(std::back_inserter(threads), 100, [&] {
return std::jthread{[&] { sharedSum += queue.removeNumber(); }};
});
std::generate_n(std::back_inserter(threads), 100, [&] {
return std::jthread{[&] { queue.addNumber(++sharedCounter); }};
});
threads.clear(); // wait for all threads to finish
if (sharedSum.load() != 5050) {
throw std::logic_error("Wrong result for sum of 1..100");
}
}
L'implémentation de l'objet Monitor repose sur la flexibilité de std::unique_lock
via son paramètre de modèle. Tous les verrous standard C++ peuvent être utilisés avec n'importe quelle classe qui utilise lock()
- et unlock()
-Méthodes. La classe UnlockAndNotify
implémente cette interface et définit sa variable de condition dans le unlock()
-Méthode gratuite. De plus, la classe offre Monitor
fournit une interface publique réduite qui peut être utilisée pour créer deux types de verrous différents, un avec et un sans notification, en utilisant un std::unique_lock
soit dans l'ensemble UnlockAndNotify
-instance ou uniquement sur le inclus std::mutex
est créé.
Lors du choix entre std::unique_lock
et std::lock_guard
Je préfère (Franc) mourir unique_lock
dans l'interface. Ce choix offre plus de flexibilité à l'utilisateur de la classe moniteur. J'apprécie cette flexibilité plus qu'une éventuelle différence de performance lock_guard
, qui doit être mesuré de toute façon. J'admettrai que les exemples donnés ne profitent pas de cette flexibilité supplémentaire.
Après cela, Dietmar a développé l'idée de Frank : Ici, les données protégées sont stockées dans le moniteur, ce qui rend plus difficile leur accès sans protection.
// threadsafequeue2.cpp
#ifndef INCLUDED_PATTERNS_MONITOR3_MONITOR_HPP
#define INCLUDED_PATTERNS_MONITOR3_MONITOR_HPP
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
namespace patterns::monitor3 {
template
class Monitor {
public:
struct UnlockAndNotify {
std::mutex d_mutex;
std::condition_variable d_condition;
void lock() { d_mutex.lock(); }
void unlock() { d_mutex.unlock(); d_condition.notify_one(); }
};
private:
mutable UnlockAndNotify d_combined;
mutable T d_data;
public:
std::tuple> makeProducerLock() const {
return { d_data, std::unique_lock{d_combined} };
}
template
std::tuple> makeConsumerLockWhen(PRED predicate) const {
std::unique_lock lock{d_combined.d_mutex};
d_combined.d_condition.wait(lock, [this, predicate]{ return predicate(d_data); });
return { d_data, std::move(lock) };
}
};
template
class ThreadQueue {
Monitor> d_monitor;
public:
void add(T number) {
auto[numberQueue, lock] = d_monitor.makeProducerLock();
numberQueue.push_back(number);
}
T remove() {
auto[numberQueue, lock] = d_monitor.makeConsumerLockWhen([](auto& numberQueue) { return !numberQueue.empty(); });
const auto number = numberQueue.front();
numberQueue.pop_front();
return number;
}
};
}
#endif
class Dice {
public:
int operator()(){ return rand(); }
private:
std::function rand = std::bind(std::uniform_int_distribution<>(1, 6),
std::default_random_engine());
};
int main(){
std::cout << 'n';
constexpr auto NumberThreads = 100;
patterns::monitor3::ThreadQueue safeQueue;
auto addLambda = [&safeQueue](int val){ safeQueue.add(val);
std::cout << val << " "
<< std::this_thread::get_id() << "; ";
};
auto getLambda = [&safeQueue]{ safeQueue.remove(); };
std::vector addThreads(NumberThreads);
Dice dice;
for (auto& thr: addThreads) thr = std::thread(addLambda, dice());
std::vector getThreads(NumberThreads);
for (auto& thr: getThreads) thr = std::thread(getLambda);
for (auto& thr: addThreads) thr.join();
for (auto& thr: getThreads) thr.join();
std::cout << "nn";
}
Merci encore à Franc et Dietmar. Je ne voulais pas prouver que la concurrence est très exigeante avec mon implémentation incorrecte d'une file d'attente thread-safe dans mon dernier article. Ce qui est particulièrement ennuyeux, c'est que je n'ai pas mis le mutex dans un verrou (Erreur 2). J'enseigne cela dans mes cours de C++ : NNM (No Naked Mutex).
Et après?
Dans mon prochain article, je plongerai profondément dans l'avenir de C++20 : C++23.
(rme)
#Correction #derreur #dans #message #sur #lobjet #moniteur #dans #file #dattente #threadsafe
1688447095