Confus lorsque la méthode d'exécution boost :: asio :: io_service bloque / débloque

88

Étant un débutant total de Boost.Asio, je suis confus avec io_service::run(). J'apprécierais que quelqu'un puisse m'expliquer quand cette méthode bloque / débloque. Les documentations indiquent:

La run()fonction se bloque jusqu'à ce que tout le travail soit terminé et qu'il n'y ait plus de gestionnaires à distribuer, ou jusqu'à ce que le io_servicesoit arrêté.

Plusieurs threads peuvent appeler la run()fonction pour configurer un pool de threads à partir duquel les io_servicegestionnaires peuvent exécuter. Tous les threads en attente dans le pool sont équivalents et io_servicepeuvent choisir n'importe lequel d'entre eux pour appeler un gestionnaire.

Une sortie normale de la run()fonction implique que l' io_serviceobjet est arrêté (la stopped()fonction renvoie true). Les appels suivants à run(), run_one(), poll()ou poll_one()retourneront immédiatement à moins d' un appel avant reset().

Que signifie la déclaration suivante?

[...] plus de gestionnaires à envoyer [...]


En essayant de comprendre le comportement de io_service::run(), je suis tombé sur cet exemple (exemple 3a). À l'intérieur, j'observe que io_service->run()bloque et attend les bons de travail.

// WorkerThread invines io_service->run()
void WorkerThread(boost::shared_ptr<boost::asio::io_service> io_service);
void CalculateFib(size_t);

boost::shared_ptr<boost::asio::io_service> io_service(
    new boost::asio::io_service);
boost::shared_ptr<boost::asio::io_service::work> work(
   new boost::asio::io_service::work(*io_service));

// ...

boost::thread_group worker_threads;
for(int x = 0; x < 2; ++x)
{
  worker_threads.create_thread(boost::bind(&WorkerThread, io_service));
}

io_service->post( boost::bind(CalculateFib, 3));
io_service->post( boost::bind(CalculateFib, 4));
io_service->post( boost::bind(CalculateFib, 5));

work.reset();
worker_threads.join_all();

Cependant, dans le code suivant sur lequel je travaillais, le client se connecte à l'aide de TCP / IP et la méthode d'exécution se bloque jusqu'à ce que les données soient reçues de manière asynchrone.

typedef boost::asio::ip::tcp tcp;
boost::shared_ptr<boost::asio::io_service> io_service(
    new boost::asio::io_service);
boost::shared_ptr<tcp::socket> socket(new tcp::socket(*io_service));

// Connect to 127.0.0.1:9100.
tcp::resolver resolver(*io_service);
tcp::resolver::query query("127.0.0.1", 
                           boost::lexical_cast< std::string >(9100));
tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);
socket->connect(endpoint_iterator->endpoint());

// Just blocks here until a message is received.
socket->async_receive(boost::asio::buffer(buf_client, 3000), 0,
                      ClientReceiveEvent);
io_service->run();

// Write response.
boost::system::error_code ignored_error;
std::cout << "Sending message \n";
boost::asio::write(*socket, boost::asio::buffer("some data"), ignored_error);

Toute explication de run()cela décrit son comportement dans les deux exemples ci-dessous serait appréciée.

MistyD
la source

Réponses:

234

Fondation

Commençons par un exemple simplifié et examinons les pièces Boost.Asio pertinentes:

void handle_async_receive(...) { ... }
void print() { ... }

...  

boost::asio::io_service io_service;
boost::asio::ip::tcp::socket socket(io_service);

...

io_service.post(&print);                             // 1
socket.connect(endpoint);                            // 2
socket.async_receive(buffer, &handle_async_receive); // 3
io_service.post(&print);                             // 4
io_service.run();                                    // 5

Qu'est-ce qu'un gestionnaire ?

Un gestionnaire n'est rien de plus qu'un rappel. Dans l'exemple de code, il y a 3 gestionnaires:

  • Le printgestionnaire (1).
  • Le handle_async_receivegestionnaire (3).
  • Le printgestionnaire (4).

Même si la même print()fonction est utilisée deux fois, chaque utilisation est considérée comme créant son propre gestionnaire identifiable de manière unique. Les gestionnaires peuvent prendre de nombreuses formes et tailles, allant des fonctions de base comme celles ci-dessus à des constructions plus complexes telles que les foncteurs générés à partir de boost::bind()et lambdas. Indépendamment de la complexité, le gestionnaire ne reste rien de plus qu'un rappel.

Qu'est-ce que le travail ?

Le travail est un traitement que Boost.Asio a été invité à effectuer au nom du code d'application. Parfois, Boost.Asio peut commencer une partie du travail dès qu'il en a été informé, et d'autres fois, il peut attendre pour faire le travail plus tard. Une fois le travail terminé, Boost.Asio informera l'application en invoquant le gestionnaire fourni .

Boost.Asio garantit que les gestionnaires ne fonctionnera que dans un thread qui appelle actuellement run(), run_one(), poll()ou poll_one(). Ce sont les threads qui feront le travail et les gestionnaires d' appels . Par conséquent, dans l'exemple ci-dessus, print()n'est pas appelé lorsqu'il est publié dans le io_service(1). Au lieu de cela, il est ajouté au io_serviceet sera appelé ultérieurement. Dans ce cas, il entre io_service.run()(5).

Que sont les opérations asynchrones?

Une opération asynchrone crée du travail et Boost.Asio appellera un gestionnaire pour informer l'application lorsque le travail est terminé. Les opérations asynchrones sont créées en appelant une fonction qui a un nom avec le préfixe async_. Ces fonctions sont également appelées fonctions de lancement .

Les opérations asynchrones peuvent être décomposées en trois étapes uniques:

  • Initier, ou informer, l'associé io_servicequi fonctionne doit être fait. L' async_receiveopération (3) informe le io_servicequ'elle devra lire de manière asynchrone les données du socket, puis async_receiveretourne immédiatement.
  • Faire le vrai travail. Dans ce cas, lors de la socketréception des données, les octets seront lus et copiés dans buffer. Le travail réel sera effectué soit:
    • La fonction de lancement (3), si Boost.Asio peut déterminer qu'elle ne bloquera pas.
    • Lorsque l'application exécute explicitement le io_service(5).
  • handle_async_receive Appel du ReadHandler . Une fois de plus, les gestionnaires ne sont appelés que dans les threads exécutant le io_service. Ainsi, quel que soit le moment où le travail est effectué (3 ou 5), il est garanti qu'il handle_async_receive()ne sera invoqué que dans io_service.run()(5).

La séparation dans le temps et dans l'espace entre ces trois étapes est appelée inversion de flux de contrôle. C'est l'une des complexités qui rend la programmation asynchrone difficile. Cependant, il existe des techniques qui peuvent aider à atténuer ce problème, comme l'utilisation de coroutines .

Que fait io_service.run()-on?

Lorsqu'un thread appelle io_service.run(), le travail et les gestionnaires seront appelés à partir de ce thread. Dans l'exemple ci-dessus, io_service.run()(5) bloquera jusqu'à ce que:

  • Il a été appelé et renvoyé par les deux printgestionnaires, l'opération de réception se termine avec succès ou échec, et son handle_async_receivegestionnaire a été appelé et renvoyé.
  • Le io_serviceest explicitement arrêté via io_service::stop().
  • Une exception est levée depuis un gestionnaire.

Un flux de psuedo-ish potentiel pourrait être décrit comme suit:

créer io_service
créer une socket
ajouter un gestionnaire d'impression à io_service (1)
attendez que la prise se connecte (2)
ajouter une demande de travail de lecture asynchrone à io_service (3)
ajouter un gestionnaire d'impression à io_service (4)
exécuter le io_service (5)
  y a-t-il du travail ou des manutentionnaires?
    oui, il y a 1 travail et 2 manutentionnaires
      la socket a-t-elle des données? non, ne fais rien
      exécuter le gestionnaire d'impression (1)
  y a-t-il du travail ou des manutentionnaires?
    oui, il y a 1 travail et 1 manutentionnaire
      la socket a-t-elle des données? non, ne fais rien
      exécuter le gestionnaire d'impression (4)
  y a-t-il du travail ou des manutentionnaires?
    oui, il y a 1 oeuvre
      la socket a-t-elle des données? non, continue d'attendre
  - la socket reçoit des données -
      socket a des données, lisez-les dans le tampon
      ajouter le gestionnaire handle_async_receive à io_service
  y a-t-il du travail ou des manutentionnaires?
    oui, il y a 1 gestionnaire
      exécuter le gestionnaire handle_async_receive (3)
  y a-t-il du travail ou des manutentionnaires?
    non, définissez io_service comme arrêté et retournez

Remarquez que lorsque la lecture s'est terminée, il a ajouté un autre gestionnaire au io_service. Ce détail subtil est une caractéristique importante de la programmation asynchrone. Il permet aux gestionnaires d'être enchaînés ensemble. Par exemple, si elle handle_async_receiven'obtient pas toutes les données attendues, alors son implémentation pourrait publier une autre opération de lecture asynchrone, ce qui entraînerait io_serviceplus de travail et ne reviendrait donc pas io_service.run().

Notez que lorsque le io_servicen'a plus de travail, l'application doit reset()le io_serviceavant de l'exécuter à nouveau.


Exemple de question et code d'exemple 3a

Maintenant, examinons les deux morceaux de code référencés dans la question.

Code de la question

socket->async_receiveajoute du travail au io_service. Ainsi, io_service->run()bloquera jusqu'à ce que l'opération de lecture se termine avec succès ou erreur, et ClientReceiveEventsoit terminée, soit lève une exception.

Exemple de code 3a

Dans l'espoir de le rendre plus facile à comprendre, voici un petit exemple annoté 3a:

void CalculateFib(std::size_t n);

int main()
{
  boost::asio::io_service io_service;
  boost::optional<boost::asio::io_service::work> work =       // '. 1
      boost::in_place(boost::ref(io_service));                // .'

  boost::thread_group worker_threads;                         // -.
  for(int x = 0; x < 2; ++x)                                  //   :
  {                                                           //   '.
    worker_threads.create_thread(                             //     :- 2
      boost::bind(&boost::asio::io_service::run, &io_service) //   .'
    );                                                        //   :
  }                                                           // -'

  io_service.post(boost::bind(CalculateFib, 3));              // '.
  io_service.post(boost::bind(CalculateFib, 4));              //   :- 3
  io_service.post(boost::bind(CalculateFib, 5));              // .'

  work = boost::none;                                         // 4
  worker_threads.join_all();                                  // 5
}

À un niveau élevé, le programme créera 2 threads qui traiteront la io_serviceboucle d'événements de (2). Il en résulte un pool de threads simple qui calculera les nombres de Fibonacci (3).

La seule différence majeure entre le code de question et ce code est que ce code appelle io_service::run()(2) avant que le travail réel et les gestionnaires ne soient ajoutés au io_service(3). Pour empêcher le io_service::run()retour immédiat, un io_service::workobjet est créé (1). Cet objet empêche le io_servicede manquer de travail; par conséquent, io_service::run()ne reviendra pas en raison de l'absence de travail.

Le flux global est le suivant:

  1. Créez et ajoutez l' io_service::workobjet ajouté au io_service.
  2. Un pool de threads créé qui appelle io_service::run(). Ces threads de travail ne reviendront pas à io_servicecause de l' io_service::workobjet.
  3. Ajoutez 3 gestionnaires qui calculent les nombres de Fibonacci au io_service, et revenez immédiatement. Les threads de travail, et non le thread principal, peuvent commencer à exécuter ces gestionnaires immédiatement.
  4. Supprimez l' io_service::workobjet.
  5. Attendez que les threads de travail se terminent. Cela ne se produira qu'une fois que les 3 gestionnaires auront terminé leur exécution, car io_serviceni les gestionnaires ni ne travaillent.

Le code peut être écrit différemment, de la même manière que le code d'origine, où les gestionnaires sont ajoutés au io_service, puis la io_serviceboucle d'événements est traitée. Cela supprime le besoin d'utiliser io_service::worket génère le code suivant:

int main()
{
  boost::asio::io_service io_service;

  io_service.post(boost::bind(CalculateFib, 3));              // '.
  io_service.post(boost::bind(CalculateFib, 4));              //   :- 3
  io_service.post(boost::bind(CalculateFib, 5));              // .'

  boost::thread_group worker_threads;                         // -.
  for(int x = 0; x < 2; ++x)                                  //   :
  {                                                           //   '.
    worker_threads.create_thread(                             //     :- 2
      boost::bind(&boost::asio::io_service::run, &io_service) //   .'
    );                                                        //   :
  }                                                           // -'
  worker_threads.join_all();                                  // 5
}

Synchrone vs asynchrone

Bien que le code de la question utilise une opération asynchrone, il fonctionne efficacement de manière synchrone, car il attend la fin de l'opération asynchrone:

socket.async_receive(buffer, handler)
io_service.run();

est équivalent à:

boost::asio::error_code error;
std::size_t bytes_transferred = socket.receive(buffer, 0, error);
handler(error, bytes_transferred);

En règle générale, essayez d'éviter de mélanger des opérations synchrones et asynchrones. Souvent, cela peut transformer un système complexe en un système compliqué. Cette réponse met en évidence les avantages de la programmation asynchrone, dont certains sont également traités dans la documentation Boost.Asio .

Tanner Sansbury
la source
13
Article génial. Je voudrais ajouter juste une chose car je pense que cela ne retient pas suffisamment l'attention: après le retour de run (), vous devez appeler reset () sur votre io_service avant de pouvoir l'exécuter à nouveau. Sinon, il peut retourner instantanément, qu'il y ait ou non des opérations async_ en attente ou non.
DeVadder
D'où vient le tampon? Qu'Est-ce que c'est?
ruipacheco
Je suis toujours confus. Si le mixage est synchrone et asynchrone n'est pas recommandé, quel est le mode asynchrone pur? pouvez-vous donner un exemple montrant le code sans io_service.run () ;?
Splash du
@Splash On peut utiliser io_service.poll()pour traiter la boucle d'événements sans bloquer les opérations en cours. La principale recommandation pour éviter de mélanger les opérations synchrones et asynchrones est d'éviter d'ajouter une complexité inutile et d'éviter une mauvaise réactivité lorsque les gestionnaires prennent beaucoup de temps à se terminer. Il y a des cas où c'est sûr, par exemple quand on sait que l'opération synchrone ne se bloquera pas.
Tanner Sansbury
Qu'entendez-vous par "actuellement" dans "Boost.Asio garantit que les gestionnaires ne fonctionneront que dans un thread qui appelle actuellementrun() ...." ? S'il y a N threads (qui a appelé run()), alors lequel est le thread "courant"? Il peut y en avoir plusieurs? Ou voulez-vous dire que le thread qui a fini d'exécuter le async_*()(disons async_read), est garanti d'appeler ses gestionnaires également?
Nawaz
18

Pour simplifier comment faire run, considérez-le comme un employé qui doit traiter une pile de papier; il prend une feuille, fait ce que la feuille dit, jette la feuille et prend la suivante; quand il n'a plus de draps, il quitte le bureau. Sur chaque feuille, il peut y avoir n'importe quel type d'instructions, même en ajoutant une nouvelle feuille à la pile. Retour à asio: vous pouvez donner à une io_serviceœuvre de deux manières, essentiellement: en l'utilisant postcomme dans l'échantillon que vous avez lié, ou en utilisant d'autres objets qui appellent en interne postle io_service, comme les méthodes socketet ses async_*.

Loghorn
la source