Développement de logiciels : un flux de travail consommateur-producteur basé sur Coroutine | heise en ligne

Développement de logiciels : un flux de travail consommateur-producteur basé sur Coroutine |  heise en ligne

2023-12-18 12:27:00

Les coroutines offrent un moyen intuitif et structuré d’écrire du code asynchrone. Ils permettent d’écrire des opérations asynchrones dans un style procédural. Il s’agit d’une fonctionnalité introduite dans C++20 pour simplifier la programmation asynchrone.

Publicité


Rainer Grimm travaille depuis de nombreuses années en tant qu’architecte logiciel, responsable d’équipe et de formation. Il aime écrire des articles sur les langages de programmation C++, Python et Haskell, mais aime également intervenir fréquemment lors de conférences spécialisées. Sur son blog Modern C++, il parle intensément de sa passion C++.

Bien que ce flux de travail monoproducteur-consommateur unique ne soit pas facile à comprendre, il constitue un bon point de départ pour des expériences de coroutine.



Les mécanismes existants tels que std::async, std::packaged_task ou des événements (std::condition_variable & std::mutex) synchronisez deux ou plusieurs threads sur le résultat de la tâche en établissant un canal de communication. Ce canal de communication a deux extrémités :

  • std::promisequi écrit soit le résultat, soit l’exception à l’état partagé, et
  • std::future (std::shared_future) – une extrémité réceptrice qui attend le résultat de la tâche (ou l’exception).

Contrairement à ce mécanisme préexistant, les coroutines ne sont pas directement connectées aux threads ou autres primitives de synchronisation du système d’exploitation. Il s’agit plutôt d’une pure abstraction logicielle basée sur l’objet de contrôle coroutine et la logique de la machine à états construite autour de lui.

Les coroutines sont sans pile – cela signifie que l’objet de contrôle doit être créé sur le tas. Par coïncidence, il s’agit d’une enveloppe de bibliothèque autour du promise_type (std::coroutine_handle), ce qui n’a en fait rien à voir std::promise a en commun.

Le promise_type est une interface (un point d’adaptation) qui décrit les états de transition prédéfinis dans la machine à états d’une coroutine.

Les coroutines sont très polyvalentes et peuvent être utilisées dans divers scénarios où il faut gérer un flux de messages asynchrone. Un exemple courant est la communication basée sur les sockets.

Aujourd’hui, je vais essayer d’expliquer les Coroutines avec un autre exemple : le workflow d’un seul producteur-unique-consommateur.

Nous devons d’abord définir le type de résultat pour la coroutine :

class[[nodiscard]] AudioDataResult final
{
    public:
        class promise_type;
        using handle_type = std::coroutine_handle;
            
        class promise_type
        {
            ...
        };
};

Il s’agit d’un emballage autour de l’intérieur : promise_type Taper. Nous décorons la classe englobante avec l’attribut [[nodiscard]]puisque le type de résultat est l’objet de contrôle de la coroutine : que l’on renvoie au code client pour gérer sa suspension/reprise.

@Note Le destructeur de la classe nettoie les ressources (mémoire dynamique) à la manière RAII, de sorte que le type de retour peut être strictement ignoré s’il n’est pas nécessaire de gérer l’état de la coroutine.

~AudioDataResult() { if(handle_) { handle_.destroy(); } }

Le type de résultat est de type déplacement uniquement : les opérations de copie sont interdites – pour empêcher la duplication de l’objet de contrôle.

// Make the result type move-only, 
//due to exclusive ownership over the handle

AudioDataResult(const AudioDataResult& ) = delete;
AudioDataResult& operator= (constAudioDataResult& ) = delete;

AudioDataResult(AudioDataResult&& other) noexcept:
handle_(std::exchange(other.handle_, nullptr))
{}

AudioDataResult& operator = (AudioDataResult&& other) noexcept
{
    using namespace std;
    AudioDataResult tmp =std::move(other);
    swap(*this, tmp);
    return *this;
}

Définissons maintenant l’interface promise_type elle-même :

// Predefined interface that has to be specify 
//in order to implement
// coroutine's state-machine transitions
class promise_type
{
    
    public:
        using value_type = std::vector;
        AudioDataResult get_return_object()
        {
            return AudioDataResult{ handle_type::from_promise(*this) };
        }
        std::suspend_never initial_suspend() noexcept { return{}; }
        std::suspend_always final_suspend() noexcept { return{}; }

        void return_void() {}
        void unhandled_exception()
        {
            std::rethrow_exception(std::current_exception());
        }

        // Generates the value and suspend the "producer"
        template 
        requires std::convertible_to, value_type>
        std::suspend_always yield_value(Data&& value)
        {
            data_ = std::forward(value);
            data_ready_.store(true, std::memory_order::relaxed);
            return {};
        }

    private:
        value_type data_;
        std::atomic data_ready_;
};//promise_type interface

Le promise_type définit l’infrastructure nécessaire de la coroutine. Il faut en outre promise_type pour toutes les coroutines qui veulent agir comme un générateur – “producteur” – pour sortir les valeurs avec lesquelles yield_valuela méthode peut être étendue (co_yield ≡ co_await promise_.yield_value). Lorsque les données sont consommées, nous avons besoin de la méthode wrapper appropriée resume() prévoir de reprendre le producteur.

void resume() { if( not handle_.done()) { handle_.resume();} }

Nous devons maintenant étendre la coroutine pour répondre aux besoins du consommateur : elle doit être synchronisée avec la disponibilité des données. En d’autres termes, le consommateur est mis en pause jusqu’à ce que les données soient signalées comme disponibles par le producteur. Pour ce faire, nous avons besoin de l’interface Awaiter implémenter:

class promise_type
{
    // Awaiter interface: for consumer waiting on data being ready
    struct AudioDataAwaiter
    {

        explicit AudioDataAwaiter(promise_type& promise) noexcept: promise_(promise) {}

        bool await_ready() const
        {
            return promise_.data_ready_.load(std::memory_order::relaxed);
        }

        void await_suspend(handle_type) const
        {
            while( not promise_.data_ready_.exchange(false))
            {
                std::this_thread::yield();
            }
        }

        // move assignment at client invocation side:
        //        const auto data = co_await audioDataResult;
        // This requires that coroutine's result type provides
        // the co_await unary operator

        value_type&& await_resume() const
        {
            return std::move(promise_.data_);
        }

    private:
            promise_type& promise_;

    };//Awaiter interface

};//promise_type

Dans la machine à états se trouve await_ready() le premier état de transition : l’état de préparation des données est vérifié. Si les données ne sont pas prêtes, procédez ensuite await_suspend() appelé. Ici, nous attendons en fait que le drapeau correspondant soit défini. Finalement, sera await_resume() appelé : Nous “déplaçons” la valeur du promise_typeen le convertissant sans condition en référence rvalue. Du côté de l’appel client, cela se traduit par l’opérateur d’affectation pour la valeur renvoyée étant – data – est appelé.

const auto data = co_await audioDataResult;

Pour que cela fonctionne, le type de résultat doit avoir l’opérateur unaire co_await fournir le nôtre Awaiterl’interface revient.

class AudioDataResult
{
    auto operator co_await() noexcept
    {
        return promise_type::AudioDataAwaiter{handle_.promise()};
    }
};

:

Le programme suivant producerConsumer.cpp montre une version simplifiée de l’exemple 1 :

// producerConsumer.cpp

#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 


void funcName(const std::source_location location = std::source_location::current()) {
    std::cout << location.function_name() << 'n';
}


template 
void printContainer(const Container& container)
{
    typedef typename Container::value_type value_type;
    auto first = std::cbegin(container);
    auto last = std::cend(container);

    std::cout << " [";
    std::copy(first, std::prev(last), std::ostream_iterator(std::cout, ", "));
    std::cout << *std::prev(last) << "]n";
}




class [[nodiscard]] AudioDataResult final
{
    public:
        class promise_type;
        using handle_type = std::coroutine_handle;
        
        // Predefined interface that has to be specify in order to implement
        // coroutine's state-machine transitions
        class promise_type 
        {
            
            public:
                
                using value_type = std::vector;

                AudioDataResult get_return_object() 
                {
                    return AudioDataResult{handle_type::from_promise(*this)};
                }
                std::suspend_never initial_suspend() noexcept { return {}; }
                std::suspend_always final_suspend() noexcept { return {}; }
                void return_void() {}
                void unhandled_exception() 
                {
                    std::rethrow_exception(std::current_exception());
                }

                // Generates the value and suspend the "producer"
                template 
                requires std::convertible_to, value_type>
                std::suspend_always yield_value(Data&& value) 
                {
                    data_ = std::forward(value);
                    data_ready_.store(true);
                    return {};
                }

                // Awaiter interface: for consumer waiting on data being ready
                struct AudioDataAwaiter 
                {
                    explicit AudioDataAwaiter(promise_type& promise) noexcept: promise_(promise) {}

                    bool await_ready() const { return promise_.data_ready_.load();}
                    
                    void await_suspend(handle_type) const
                    {
                        while(not promise_.data_ready_.exchange(false)) {
                             std::this_thread::yield(); 
                        }
                    }
                    // move assignment at client invocation side: const auto data = co_await audioDataResult;
                    // This requires that coroutine's result type provides the co_await unary operator
                    value_type&& await_resume() const 
                    {
                        return std::move(promise_.data_);
                    }

                    private: 
                        promise_type& promise_;
                };//Awaiter interface

        
            private:
                value_type data_;
                std::atomic data_ready_;
        }; //promise_type interface

        
        auto operator co_await() noexcept   
        {
            return promise_type::AudioDataAwaiter{handle_.promise()};
        }

        // Make the result type move-only, due to ownership over the handle
        AudioDataResult(const AudioDataResult&) = delete;
        AudioDataResult& operator=(const AudioDataResult&) = delete;

        AudioDataResult(AudioDataResult&& other) noexcept: handle_(std::exchange(other.handle_, nullptr)){}
        AudioDataResult& operator=(AudioDataResult&& other) noexcept 
        {
            using namespace std;
            AudioDataResult tmp = std::move(other);
            swap(*this, tmp);
            return *this;
        }

        // d-tor: RAII
        ~AudioDataResult() { if (handle_) {funcName(); handle_.destroy();}}

        // For resuming the producer - at the point when the data are consumed
        void resume() {if (not handle_.done()) { funcName(); handle_.resume();}}
    
    private:
        AudioDataResult(handle_type handle) noexcept : handle_(handle) {}

    private:
    handle_type handle_;
};


using data_type = std::vector;
AudioDataResult producer(const data_type& data) 
{
    for (std::size_t i = 0; i < 5; ++i) {
        funcName();
        co_yield data;
    }
    co_yield data_type{}; // exit criteria
}

AudioDataResult consumer(AudioDataResult& audioDataResult) 
{
    while(true)
    {
        funcName();
        const auto data = co_await audioDataResult;
        if (data.empty()) {std::cout << "No data - exit!n"; break;}
        std::cout << "Data received:";
        printContainer(data);
        audioDataResult.resume(); // resume producer
    }
}

int main() 
{
    {
        const data_type data = {1, 2, 3, 4};
        auto audioDataProducer = producer(data);
        std::thread t ([&]{auto audioRecorded = consumer(audioDataProducer);});
        t.join();
    }

    std::cout << "bye-bye!n";
}

Enfin, voici le résultat du programme :



L'autre option est d'utiliser promise_type::await_transform()attendre la valeur spécifiée dans le promise_typel'instance est stockée et utilisée par le producteur.

class promise_type
{
    auto await_transform(handle_type other)
    {
        // Awaiter interface: remained the same
        struct AudioDataAwaiter
        {
            explicit AudioDataAwaiter(promise_type& promise)noexcept: promise_(promise) {}
            ...
        };

        return AudioDataAwaiter{other.promise()};
    }
};

De cette façon nous n'avons plus besoin de l'opérateur unaire co_await du type résultat, mais plutôt un opérateur de conversion (explicite),

explicit operator handle_type() const {return handle_;}

afin que nous puissions le remettre au point où le consommateur co_await appelle ce qui est interne à l'appel await_transform() est traduit.

const auto data = co_await static_cast(audioDataResult);

Nous pouvons illustrer cela comme suit : me.handle_.promise().await_transform(other.handle_)

:

Dans cet exemple simple, le producteur est mis en pause sans encourir de pénalité car lorsqu'il est repris, il renvoie exactement la même séquence de données - connue à l'avance. Dans un scénario réel, ce n'est probablement pas le cas : le producteur lui-même sera probablement une sorte d'intermédiaire - un destinataire de données envoyées de manière asynchrone et renvoyées au consommateur. Par conséquent, une logique de file d'attente doit être mise en œuvre du côté du producteur pour éviter la perte de données lorsque les données sont mises en pause et en attente que le consommateur les reprenne - pour compenser les différences entre le taux d'arrivée des données du producteur et le taux de consommation du consommateur.

En C++20, les trois voies peuvent être définies ou par default demande. Cela signifie que les six opérateurs de comparaison sont disponibles : ==, !=, <, <=, > et >=. Vous pouvez également utiliser l'opérateur d'égalité (==) définir ou avec default demande.

Mon blog prend une petite pause de Noël. Je publierai le prochain article le 8 janvier. Je souhaite à tous les lecteurs un bon moment.


(moi)

Vers la page d'accueil



#Développement #logiciels #flux #travail #consommateurproducteur #basé #sur #Coroutine #heise #ligne
1702944193

Facebook
Twitter
LinkedIn
Pinterest

Leave a Comment

This site uses Akismet to reduce spam. Learn how your comment data is processed.