The Wright Way – Asynchronous Operations

Introduction

I have always felt like there is a dearth of cohesive C/C++ education after school. Each software engineer seems to be on their own when it comes to learning new features, how to use a new library, or what the best practices are for common situations.

This newsletter is my attempt to share some of what I have learned; the way I do things. It is not meant to be authoritative, but, rather, a launching pad for further discussion and investigation. Often, I’ll try something new just so that I can vet it for future use or just to experiment. Not everything pans out, so take what you will and leave the rest.

Asynchronous Operations

Oftentimes there are read, writes, calculations, or other operations that are too laborious for the current thread, and it would be nice to off load them to a separate thread that could complete them at its leisure; freeing up the current thread for more important duties.

Implementing the inter-thread messaging required for such a scheme can be tricky. Thankfully there is a library built specifically for asynchronous operations, ASIO. ASIO comes standalone, and as a member of Boost. Since Boost is already widely included and used in lots of code, the focus of this volume of the newsletter will be Boost ASIO, though the two are essentially identical except for namespaces.

ASIO is largely focused on network I/O (input/output), but in order to facilitate that, a robust asynchronous core was created. That core can easily be reused.

Figure 1 – ASIO Building Blocks

Real-world Example

One operation that is important, but isn’t mission critical enough that it should slow down the current thread, is logging. Depending on how and to whom logs are disseminated, a single act of logging can result in a database call, a syslog message send, a file write, or a myriad of other expensive endeavors.

This example streams logs to standard out:

#include <cstdlib> // EXIT_SUCCESS
#include <iostream>
#include <string>

void log(std::string const& message)
{
   std::cout << message << std::endl;
}

int main(int argc, char* const argv[])
{
   // Important operations.

   log("Important operations finished.");

   // The next important operations that are waiting for the logging to finish. 

   return EXIT_SUCCESS;
}

Streaming to standard out is not the most expensive thing a log function can do, but even something simple like that can delay the next important operations more than is desired; the logging isn’t the important bit.

The following will convert this example step by step so that logging is asynchronous. In order to get on the path to asynchronosity, while adding some cohesiveness, logging should first be moved out to a class:

#include "boost/noncopyable.hpp"

class Logger : public boost::noncopyable
{
public:

   Logger() { }

   ~Logger() { }

   void log(std::string const& message)
   {
      std::cout << message << std::endl;
   }
};

Now, it is time to get into the interesting stuff. The corner stone of ASIO is the boost::asio::io_service class. Even though its name suggests it is made for I/O operations, it is essentially just a multiple-writer multiple-reader work queue that handles all the thread safety issues and corner-cases automatically. In this example, it’ll be a member of the class called m_service:

#include "boost/asio/io_service.hpp"
#include "boost/noncopyable.hpp"

class Logger : public boost::noncopyable
{
public:

   Logger() { }

   ~Logger() { }

   void log(std::string const& message)
   {
      std::cout << message << std::endl;
   }

private:

   boost::asio::io_service m_service;
};

The service does not need to be contained in a class like this example shows. The service could be a global or a singleton, or it could be created once and passed around to everyone that needs to use it. However, because the service doesn’t do anything itself, it makes sense to couple it with things that actually do work, workers, such as threads. A thread can be added to the class like so:

#include "boost/asio/io_service.hpp"
#include "boost/bind/bind.hpp"
#include "boost/noncopyable.hpp"
#include "boost/thread/thread.hpp"

class Logger : public boost::noncopyable
{
public:

   Logger()
   {
      m_p_work.reset(new boost::asio::io_service::work(m_service));
      m_p_thread.reset(
         new boost::thread(
            boost::bind(
               &boost::asio::io_service::run, 
               boost::ref(m_service))));
   }

   ~Logger()
   {
      m_p_work.reset();
      m_p_thread->join();
   }

   void log(std::string const& message)
   {
      std::cout << message << std::endl;
   }

private:

   boost::asio::io_service m_service;
   std::unique_ptr< boost::asio::io_service::work > m_p_work;
   std::unique_ptr< boost::thread > m_p_thread;
};

There’s a lot of new stuff in this example. The std::unique_ptr object is a smart pointer introduced to the standard library in C++11. For the purposes of this example, it can be considered the same as any smart pointer that is familiar, or even a raw pointer; heaven forbid. The m_p_work and m_p_thread class members are pointers so that their lifetime can better be controlled.

Before the thread is constructed and starts running, the service needs to be given some work. Otherwise, the thread will find that the service has nothing to do, and it will exit immediately without doing anything. Since there isn’t any particular work for the service to do at this point, some artificial work should be created. The boost::asio::io_service::work class was designed especially for this. It acts as work for the service to do when there is nothing else to do, though it doesn’t waste CPU resources doing so; it just keeps the thread alive.

After the artificial work is created, the thread can be created, and instead of using a member function in this class, or some other free function, as the thread’s entry point, the run() function of the boost::asio::io_service class can be used directly.

The last new part of this example is that during destruction the artificial work must be destroyed in order to allow the thread to exit gracefully after it finishes processing all of the work in the service. There are more aggressive ways to make the thread exit sooner, but they involve discarding work in the service, which can lead to additional corner cases depending on what the work was.

All that is needed now, is for the log function to create work and give it to the service, instead of doing the work itself:

#include <memory> // unique_ptr

#include "boost/asio/io_service.hpp"
#include "boost/bind/bind.hpp"
#include "boost/noncopyable.hpp"
#include "boost/thread/thread.hpp"

class Logger : public boost::noncopyable
{
public:

   Logger()
   {
      m_p_work.reset(new boost::asio::io_service::work(m_service));
      m_p_thread.reset(
         new boost::thread(
            boost::bind(
               &boost::asio::io_service::run, 
               boost::ref(m_service))));
   }

   ~Logger()
   {
      m_p_work.reset();
      m_p_thread->join();
   }

   void log(std::string const& message)
   {
      m_service.post(boost::bind(&Logger::async_log, this, message));
   }

private:

   boost::asio::io_service m_service;
   std::unique_ptr< boost::asio::io_service::work > m_p_work;
   std::unique_ptr< boost::thread > m_p_thread;

   void async_log(std::string const& message)
   {
      std::cout << message << std::endl;
   }
};

The log function now adds work to the service in the form of a functor that calls the async_log function, which does the actual work of streaming the message to standard out. The interface to the class hasn’t changed, so the code that uses it, doesn’t have to change either.

The service can handle multiple readers, so if the work is reentrant it makes sense to take advantage of hardware parallelism. To do so, simply use more than one thread. The class needs only slight modification to do so:

#include <memory> // unique_ptr

#include "boost/asio/io_service.hpp"
#include "boost/bind/bind.hpp"
#include "boost/noncopyable.hpp"
#include "boost/thread/thread.hpp"

class Logger : public boost::noncopyable
{
public:

   Logger()
   {
      m_p_work.reset(new boost::asio::io_service::work(m_service));

      for (unsigned int i = 0; i < boost::thread::hardware_concurrency(); ++i)
      {
         m_threads.create_thread(
            boost::bind(
               &boost::asio::io_service::run,
               boost::ref(m_service)));
      }
   }

   ~Logger()
   {
      m_p_work.reset();
      m_threads.join_all();
   }

   void log(std::string const& message)
   {
      m_service.post(boost::bind(&Logger::async_log, this, message));
   }

private:

   boost::asio::io_service m_service;
   std::unique_ptr< boost::asio::io_service::work > m_p_work;
   boost::thread_group m_threads;

   void async_log(std::string const& message)
   {
      std::cout << message << std::endl;
   }
};

This example shows how one thread can be created per CPU core. More or less can easily be added or removed as the situation demands.

Wrap-up

This volume of the newsletter was mainly intended to introduce the core of ASIO. I want to do so in a concrete way that makes sense and that I’ve used in several places in my code. Note that this example didn’t really have anything to do with I/O at all, however the rest of ASIO builds upon the concept, so it was a good place to start, and hopefully it’ll be something to think about.

Future newsletter may touch upon ASIO again, so stay tuned.