2023-12-18 12:27:00
Les coroutines offrent un moyen intuitif et structuré d’écrire du code asynchrone. Ils permettent d’écrire des opérations asynchrones dans un style procédural. Il s’agit d’une fonctionnalité introduite dans C++20 pour simplifier la programmation asynchrone.
Publicité
Rainer Grimm travaille depuis de nombreuses années en tant qu’architecte logiciel, responsable d’équipe et de formation. Il aime écrire des articles sur les langages de programmation C++, Python et Haskell, mais aime également intervenir fréquemment lors de conférences spécialisées. Sur son blog Modern C++, il parle intensément de sa passion C++.
Bien que ce flux de travail monoproducteur-consommateur unique ne soit pas facile à comprendre, il constitue un bon point de départ pour des expériences de coroutine.
Les mécanismes existants tels que std::async,
std::packaged_task
ou des événements (std::condition_variable & std::mutex
) synchronisez deux ou plusieurs threads sur le résultat de la tâche en établissant un canal de communication. Ce canal de communication a deux extrémités :
std::promise
qui écrit soit le résultat, soit l’exception à l’état partagé, etstd::future
(std::shared_future
) – une extrémité réceptrice qui attend le résultat de la tâche (ou l’exception).
Contrairement à ce mécanisme préexistant, les coroutines ne sont pas directement connectées aux threads ou autres primitives de synchronisation du système d’exploitation. Il s’agit plutôt d’une pure abstraction logicielle basée sur l’objet de contrôle coroutine et la logique de la machine à états construite autour de lui.
Les coroutines sont sans pile – cela signifie que l’objet de contrôle doit être créé sur le tas. Par coïncidence, il s’agit d’une enveloppe de bibliothèque autour du promise_type
(std::coroutine_handle
), ce qui n’a en fait rien à voir std::promise
a en commun.
Le promise_type
est une interface (un point d’adaptation) qui décrit les états de transition prédéfinis dans la machine à états d’une coroutine.
Les coroutines sont très polyvalentes et peuvent être utilisées dans divers scénarios où il faut gérer un flux de messages asynchrone. Un exemple courant est la communication basée sur les sockets.
Aujourd’hui, je vais essayer d’expliquer les Coroutines avec un autre exemple : le workflow d’un seul producteur-unique-consommateur.
mise en œuvre
Nous devons d’abord définir le type de résultat pour la coroutine :
class[[nodiscard]] AudioDataResult final
{
public:
class promise_type;
using handle_type = std::coroutine_handle;
class promise_type
{
...
};
};
Il s’agit d’un emballage autour de l’intérieur : promise_type
Taper. Nous décorons la classe englobante avec l’attribut [[nodiscard]]
puisque le type de résultat est l’objet de contrôle de la coroutine : que l’on renvoie au code client pour gérer sa suspension/reprise.
@Note Le destructeur de la classe nettoie les ressources (mémoire dynamique) à la manière RAII, de sorte que le type de retour peut être strictement ignoré s’il n’est pas nécessaire de gérer l’état de la coroutine.
~AudioDataResult() { if(handle_) { handle_.destroy(); } }
Le type de résultat est de type déplacement uniquement : les opérations de copie sont interdites – pour empêcher la duplication de l’objet de contrôle.
// Make the result type move-only,
//due to exclusive ownership over the handle
AudioDataResult(const AudioDataResult& ) = delete;
AudioDataResult& operator= (constAudioDataResult& ) = delete;
AudioDataResult(AudioDataResult&& other) noexcept:
handle_(std::exchange(other.handle_, nullptr))
{}
AudioDataResult& operator = (AudioDataResult&& other) noexcept
{
using namespace std;
AudioDataResult tmp =std::move(other);
swap(*this, tmp);
return *this;
}
Définissons maintenant l’interface promise_type elle-même :
// Predefined interface that has to be specify
//in order to implement
// coroutine's state-machine transitions
class promise_type
{
public:
using value_type = std::vector;
AudioDataResult get_return_object()
{
return AudioDataResult{ handle_type::from_promise(*this) };
}
std::suspend_never initial_suspend() noexcept { return{}; }
std::suspend_always final_suspend() noexcept { return{}; }
void return_void() {}
void unhandled_exception()
{
std::rethrow_exception(std::current_exception());
}
// Generates the value and suspend the "producer"
template
requires std::convertible_to, value_type>
std::suspend_always yield_value(Data&& value)
{
data_ = std::forward(value);
data_ready_.store(true, std::memory_order::relaxed);
return {};
}
private:
value_type data_;
std::atomic data_ready_;
};//promise_type interface
Le promise_type
définit l’infrastructure nécessaire de la coroutine. Il faut en outre promise_type
pour toutes les coroutines qui veulent agir comme un générateur – “producteur” – pour sortir les valeurs avec lesquelles yield_value
la méthode peut être étendue (co_yield ≡ co_await promise_.yield_value
). Lorsque les données sont consommées, nous avons besoin de la méthode wrapper appropriée resume()
prévoir de reprendre le producteur.
void resume() { if( not handle_.done()) { handle_.resume();} }
Nous devons maintenant étendre la coroutine pour répondre aux besoins du consommateur : elle doit être synchronisée avec la disponibilité des données. En d’autres termes, le consommateur est mis en pause jusqu’à ce que les données soient signalées comme disponibles par le producteur. Pour ce faire, nous avons besoin de l’interface Awaiter
implémenter:
class promise_type
{
// Awaiter interface: for consumer waiting on data being ready
struct AudioDataAwaiter
{
explicit AudioDataAwaiter(promise_type& promise) noexcept: promise_(promise) {}
bool await_ready() const
{
return promise_.data_ready_.load(std::memory_order::relaxed);
}
void await_suspend(handle_type) const
{
while( not promise_.data_ready_.exchange(false))
{
std::this_thread::yield();
}
}
// move assignment at client invocation side:
// const auto data = co_await audioDataResult;
// This requires that coroutine's result type provides
// the co_await unary operator
value_type&& await_resume() const
{
return std::move(promise_.data_);
}
private:
promise_type& promise_;
};//Awaiter interface
};//promise_type
Dans la machine à états se trouve await_ready()
le premier état de transition : l’état de préparation des données est vérifié. Si les données ne sont pas prêtes, procédez ensuite await_suspend()
appelé. Ici, nous attendons en fait que le drapeau correspondant soit défini. Finalement, sera await_resume()
appelé : Nous “déplaçons” la valeur du promise_type
en le convertissant sans condition en référence rvalue. Du côté de l’appel client, cela se traduit par l’opérateur d’affectation pour la valeur renvoyée étant – data
– est appelé.
const auto data = co_await audioDataResult;
Pour que cela fonctionne, le type de résultat doit avoir l’opérateur unaire co_await
fournir le nôtre Awaiter
l’interface revient.
class AudioDataResult
{
auto operator co_await() noexcept
{
return promise_type::AudioDataAwaiter{handle_.promise()};
}
};
Le programme suivant producerConsumer.cpp
montre une version simplifiée de l’exemple 1 :
// producerConsumer.cpp
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
void funcName(const std::source_location location = std::source_location::current()) {
std::cout << location.function_name() << 'n';
}
template
void printContainer(const Container& container)
{
typedef typename Container::value_type value_type;
auto first = std::cbegin(container);
auto last = std::cend(container);
std::cout << " [";
std::copy(first, std::prev(last), std::ostream_iterator(std::cout, ", "));
std::cout << *std::prev(last) << "]n";
}
class [[nodiscard]] AudioDataResult final
{
public:
class promise_type;
using handle_type = std::coroutine_handle;
// Predefined interface that has to be specify in order to implement
// coroutine's state-machine transitions
class promise_type
{
public:
using value_type = std::vector;
AudioDataResult get_return_object()
{
return AudioDataResult{handle_type::from_promise(*this)};
}
std::suspend_never initial_suspend() noexcept { return {}; }
std::suspend_always final_suspend() noexcept { return {}; }
void return_void() {}
void unhandled_exception()
{
std::rethrow_exception(std::current_exception());
}
// Generates the value and suspend the "producer"
template
requires std::convertible_to, value_type>
std::suspend_always yield_value(Data&& value)
{
data_ = std::forward(value);
data_ready_.store(true);
return {};
}
// Awaiter interface: for consumer waiting on data being ready
struct AudioDataAwaiter
{
explicit AudioDataAwaiter(promise_type& promise) noexcept: promise_(promise) {}
bool await_ready() const { return promise_.data_ready_.load();}
void await_suspend(handle_type) const
{
while(not promise_.data_ready_.exchange(false)) {
std::this_thread::yield();
}
}
// move assignment at client invocation side: const auto data = co_await audioDataResult;
// This requires that coroutine's result type provides the co_await unary operator
value_type&& await_resume() const
{
return std::move(promise_.data_);
}
private:
promise_type& promise_;
};//Awaiter interface
private:
value_type data_;
std::atomic data_ready_;
}; //promise_type interface
auto operator co_await() noexcept
{
return promise_type::AudioDataAwaiter{handle_.promise()};
}
// Make the result type move-only, due to ownership over the handle
AudioDataResult(const AudioDataResult&) = delete;
AudioDataResult& operator=(const AudioDataResult&) = delete;
AudioDataResult(AudioDataResult&& other) noexcept: handle_(std::exchange(other.handle_, nullptr)){}
AudioDataResult& operator=(AudioDataResult&& other) noexcept
{
using namespace std;
AudioDataResult tmp = std::move(other);
swap(*this, tmp);
return *this;
}
// d-tor: RAII
~AudioDataResult() { if (handle_) {funcName(); handle_.destroy();}}
// For resuming the producer - at the point when the data are consumed
void resume() {if (not handle_.done()) { funcName(); handle_.resume();}}
private:
AudioDataResult(handle_type handle) noexcept : handle_(handle) {}
private:
handle_type handle_;
};
using data_type = std::vector;
AudioDataResult producer(const data_type& data)
{
for (std::size_t i = 0; i < 5; ++i) {
funcName();
co_yield data;
}
co_yield data_type{}; // exit criteria
}
AudioDataResult consumer(AudioDataResult& audioDataResult)
{
while(true)
{
funcName();
const auto data = co_await audioDataResult;
if (data.empty()) {std::cout << "No data - exit!n"; break;}
std::cout << "Data received:";
printContainer(data);
audioDataResult.resume(); // resume producer
}
}
int main()
{
{
const data_type data = {1, 2, 3, 4};
auto audioDataProducer = producer(data);
std::thread t ([&]{auto audioRecorded = consumer(audioDataProducer);});
t.join();
}
std::cout << "bye-bye!n";
}
Enfin, voici le résultat du programme :
L'autre option est d'utiliser promise_type::await_transform()
attendre la valeur spécifiée dans le promise_type
l'instance est stockée et utilisée par le producteur.
class promise_type
{
auto await_transform(handle_type other)
{
// Awaiter interface: remained the same
struct AudioDataAwaiter
{
explicit AudioDataAwaiter(promise_type& promise)noexcept: promise_(promise) {}
...
};
return AudioDataAwaiter{other.promise()};
}
};
De cette façon nous n'avons plus besoin de l'opérateur unaire co_await
du type résultat, mais plutôt un opérateur de conversion (explicite),
explicit operator handle_type() const {return handle_;}
afin que nous puissions le remettre au point où le consommateur co_await
appelle ce qui est interne à l'appel await_transform()
est traduit.
const auto data = co_await static_cast(audioDataResult);
Nous pouvons illustrer cela comme suit : me.handle_.promise().await_transform(other.handle_)
conclusion
Dans cet exemple simple, le producteur est mis en pause sans encourir de pénalité car lorsqu'il est repris, il renvoie exactement la même séquence de données - connue à l'avance. Dans un scénario réel, ce n'est probablement pas le cas : le producteur lui-même sera probablement une sorte d'intermédiaire - un destinataire de données envoyées de manière asynchrone et renvoyées au consommateur. Par conséquent, une logique de file d'attente doit être mise en œuvre du côté du producteur pour éviter la perte de données lorsque les données sont mises en pause et en attente que le consommateur les reprenne - pour compenser les différences entre le taux d'arrivée des données du producteur et le taux de consommation du consommateur.
Et après?
En C++20, les trois voies peuvent être définies ou par default
demande. Cela signifie que les six opérateurs de comparaison sont disponibles : ==, !=, <, <=, >
et >=
. Vous pouvez également utiliser l'opérateur d'égalité (==
) définir ou avec default
demande.
Mes courtes vacances de Noël
Mon blog prend une petite pause de Noël. Je publierai le prochain article le 8 janvier. Je souhaite à tous les lecteurs un bon moment.
(moi)
#Développement #logiciels #flux #travail #consommateurproducteur #basé #sur #Coroutine #heise #ligne
1702944193