Modern C++ Features and Proven Concepts: Active Object, External Polymorphism and Coroutines
Ivan Kostruba
Posted on January 9, 2023
Summary
In this article, I'll show you how external polymorphism helps you write beautiful and clean programs, and talk about some basic and advanced implementation techniques. The good old Active Object concurrency pattern will be the running example. In the end, I'll show you how easy it is to implement this pattern using C++20 coroutines and how you can use them to make Active Object even more powerful by implementing true asynchronous functions in it.
Problem Statement
Imagine we have an established VoIP call between a user device and our business logic server. The user is in the voice menu and listens to prompts that the server plays for them. The user can press digits to select options in the menu, or hang up and end the call. To support this, we must receive and parse SIP protocol messages and run the corresponding business logic. The details of SIP communication are beyond the scope of this example; we will focus only on the business logic. The messages must be parsed as quickly as possible, but the business logic of getting the next prompt and playing it back to the user may be slower and should not interfere with the processing of the SIP communication. This is where Active Object comes in handy.
Active Object Internals
Even if the reader is not familiar with the aforementioned pattern, it’s pretty easy to grasp. Here is the simplest implementation.
class VoiceMenuHandler {
public:
// These methods will return immediately, deferring the actual work.
void receiveInput(const MenuInput& data);
void receiveHangup(const HangUp& data);
private:
std::string fetchMenuSectionPrompt(
char digit, const std::string& callId);
void playVoiceMenuPrompt(
const std::string& callId, const std::string& prompt);
// This method will do actual processing of user input by calling
// the two functions from above.
void processInput(const MenuInput& data);
void cleanupCallData(const std::string& callId);
// This method will do actual processing of hangup by calling the
// function from above.
void processHangup(const HangUp& data);
// The thread where the tasks will run.
Worker worker_;
};
The worker object here combines a thread and a mutex-protected queue. For the sake of brevity this is just a snippet; you can find the full implementation here if you are interested - GitHub link.
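To give a rough idea of what such a worker could look like, here is a minimal sketch. It is not the implementation from the repository: the name SketchWorker, the Task template parameter, the helper runCountingDemo and the choice to drain the queue on destruction are all assumptions made for illustration.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

// Hypothetical sketch: one thread draining a mutex-protected queue
// of callables of type Task.
template <typename Task>
class SketchWorker {
public:
    SketchWorker() : thread_([this] { run(); }) {}

    // Lets the thread finish the tasks that are already queued, then joins it.
    ~SketchWorker() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stopping_ = true;
        }
        cv_.notify_one();
        thread_.join();
    }

    SketchWorker(const SketchWorker&) = delete;
    SketchWorker& operator=(const SketchWorker&) = delete;

    // Called from any thread; returns as soon as the task is queued.
    void addTask(Task task) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push(std::move(task));
        }
        cv_.notify_one();
    }

private:
    void run() {
        for (;;) {
            std::unique_lock<std::mutex> lock(mutex_);
            cv_.wait(lock, [this] { return stopping_ || !queue_.empty(); });
            if (queue_.empty()) {
                return;  // only reachable once stopping_ is set
            }
            Task task = std::move(queue_.front());
            queue_.pop();
            lock.unlock();  // run the task without holding the lock
            task();
        }
    }

    std::mutex mutex_;
    std::condition_variable cv_;
    std::queue<Task> queue_;
    bool stopping_ = false;
    std::thread thread_;  // declared last so the members above exist first
};

// Queues n increments and returns the counter after the worker has stopped.
int runCountingDemo(int n) {
    std::atomic<int> counter{0};
    {
        SketchWorker<std::function<void()>> worker;
        for (int i = 0; i < n; ++i) {
            worker.addTask([&counter] { ++counter; });
        }
    }  // the destructor drains the queue and joins the thread
    return counter.load();
}
```

The task type is left as a template parameter on purpose, because choosing it is exactly the question discussed next.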
However, there is an important question about Worker: what exactly will we put in the task queue? As you can see, our controller has two "tasks": processInput and processHangup. They have different signatures, so they are two different types and cannot be placed into the same container. In the classical approach, we would create a virtual base class and derive two classes from it, one for each task. That would be quite a lot of boilerplate, and in modern C++ we can surely do better.
Starting from C++11 we have a type-erased wrapper for any kind of callable object: std::function. By the way, this is an example of external polymorphism where the polymorphic behavior is the function or functor call itself. Using it, we could write something like the following:
// Declaration of the queue inside the Worker
std::queue<std::function<void()>> queue_;
// Public method of the Worker to schedule a task
void Worker::addTask(std::function<void()> task) {
// locking the mutex is omitted
queue_.emplace(std::move(task));
}
// And then in the implementation of the VoiceMenuHandler -
void VoiceMenuHandler::receiveInput(const MenuInput& data) {
// Capture the handler instance and the arguments in the lambda,
// and pass it wrapped into a std::function
worker_.addTask( [this, data]() { processInput(data); } );
}
// And the same for the Hangup signal
void VoiceMenuHandler::receiveHangup(const HangUp& data) {
// This lambda has the same signature, hence it will fit perfectly
// into the queue.
worker_.addTask([this, data]() { processHangup(data); });
}
This will solve our problem, but std::function is not the most efficient way to implement the solution. To support all possible types and sizes of callable objects, std::function has to fall back to heap allocation whenever the callable does not fit into its small internal buffer (our lambdas, which capture a whole message by value, typically do not fit), and allocation is slow. I will show the numbers a little later, but we can do two or three times faster with a little effort.
External Polymorphism Implementation
So how do you write your own type-erased wrapper? Let’s start with a naive implementation without any type erasure.
template<typename Handler, typename Data>
class SimpleTask {
public:
SimpleTask(Handler* handler, Data data)
: handler_{ handler }, data_{ data }
{}
void operator()() {
handler_->process(data_);
}
private:
Handler* handler_;
Data data_;
};
This is a very simple template class that contains a pointer to a message handler instance and arguments. If we only had one kind of task, that would be enough, but our VoiceMenuHandler has two tasks, and instantiating this template for each of them will create two different types that we cannot store in the same queue. So we really need type erasure.
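The problem can be made concrete with a small sketch. The MenuInput and HangUp layouts below are assumptions inferred from how the messages are used in this article, and DemoHandler with its two process overloads is a hypothetical stand-in for the real handler.

```cpp
#include <cassert>
#include <string>
#include <type_traits>
#include <utility>

// Assumed message layouts, inferred from how the article uses them.
struct MenuInput { char digit; std::string callId; };
struct HangUp { std::string callId; };

// Hypothetical handler exposing the process() overloads SimpleTask expects.
struct DemoHandler {
    int inputs = 0;
    int hangups = 0;
    void process(const MenuInput&) { ++inputs; }
    void process(const HangUp&) { ++hangups; }
};

// Same shape as the naive task above.
template <typename Handler, typename Data>
class SimpleTask {
public:
    SimpleTask(Handler* handler, Data data)
        : handler_{handler}, data_{std::move(data)} {}
    void operator()() { handler_->process(data_); }
private:
    Handler* handler_;
    Data data_;
};

using InputTask = SimpleTask<DemoHandler, MenuInput>;
using HangupTask = SimpleTask<DemoHandler, HangUp>;

// The two instantiations are unrelated types, so neither
// std::queue<InputTask> nor std::queue<HangupTask> can hold both.
static_assert(!std::is_same_v<InputTask, HangupTask>);
static_assert(!std::is_convertible_v<HangupTask, InputTask>);

// Each task works fine on its own; only storing them together is the issue.
bool runSimpleTasks() {
    DemoHandler handler;
    InputTask input{&handler, MenuInput{'2', "call_1@ip_addr"}};
    HangupTask hangup{&handler, HangUp{"call_1@ip_addr"}};
    input();
    hangup();
    return handler.inputs == 1 && handler.hangups == 1;
}
```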
There are two fantastic talks regarding this topic, one by Sean Parent (youtube) and another by Sy Brand (youtube). My implementation is based on the latter but the former has a really good example of the benefits of type erasure and External Polymorphism. Here’s the code.
// First of all - here's the beauty of it. As clean and simple as
// std::function. Without deriving our objects from any base class
// we have full polymorphic behavior when we need it. And when
// we don't need it we don't pay anything.
auto functorWrapper = TaskWrapper{ MyFunctorObject };
auto lambdaWrapper = TaskWrapper{ [](){ std::cout << "lambda!\n"; } };
std::queue<TaskWrapper> queue;
queue.emplace(std::move(functorWrapper));
queue.emplace(std::move(lambdaWrapper));
queue.front()(); // invoke MyFunctorObject.operator()
queue.pop();
// Implementation of the wrapper type.
namespace _detail {
// Definition of self-made virtual table. It holds several
// function pointers.
struct vtable {
// Main part of the logic, this will invoke the stored callable.
void (*run)(void* ptr);
// These are necessary for correct copy and destruction of
// the stored objects.
void (*destroy)(void* ptr);
void (*clone)(void* storage, const void* ptr);
void (*move_clone)(void* storage, void* ptr);
};
// Template variable from C++17. We instantiate vtable,
// initializing the pointers with lambdas that decay to
// function pointers because they have empty capture lists.
template<typename Callable>
constexpr vtable vtable_for{
[](void* ptr) {
// Inside each of the lambdas we restore the type info
// using the information from the template parameters.
static_cast<Callable*>(ptr)->operator()();
},
// Destructor
[](void* ptr) {
std::destroy_at(static_cast<Callable*>(ptr));
},
// Copy constructor
[](void* storage, const void* ptr) {
new (storage) Callable {
*static_cast<const Callable*>(ptr)};
},
// Move constructor
[](void* storage, void* ptr) {
new (storage) Callable {
std::move(*static_cast<Callable*>(ptr))};
}
};
} // namespace _detail
class TaskWrapper {
public:
TaskWrapper() : vtable_{ nullptr }
{}
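// We need all the constructors and assignment operator machinery.
// An empty wrapper has no vtable, so every operation checks for it.
TaskWrapper(const TaskWrapper& other) : vtable_{ nullptr } {
if (other.vtable_) {
other.vtable_->clone(&buf_, &other.buf_);
vtable_ = other.vtable_;
}
}
TaskWrapper(TaskWrapper&& other) noexcept : vtable_{ nullptr } {
if (other.vtable_) {
other.vtable_->move_clone(&buf_, &other.buf_);
vtable_ = other.vtable_;
}
}
~TaskWrapper() {
if (vtable_) {
vtable_->destroy(&buf_);
}
}
TaskWrapper& operator=(const TaskWrapper& other) {
// Guard against self-assignment: destroying our own callable
// and then cloning from it would be undefined behavior.
if (this == &other) {
return *this;
}
if (vtable_) {
vtable_->destroy(&buf_);
vtable_ = nullptr;
}
if (other.vtable_) {
other.vtable_->clone(&buf_, &other.buf_);
vtable_ = other.vtable_;
}
return *this;
}
TaskWrapper& operator=(TaskWrapper&& other) noexcept {
if (this == &other) {
return *this;
}
if (vtable_) {
vtable_->destroy(&buf_);
vtable_ = nullptr;
}
if (other.vtable_) {
other.vtable_->move_clone(&buf_, &other.buf_);
vtable_ = other.vtable_;
}
return *this;
}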
// This is where the magic happens. We create a virtual table
// instance that 'remembers' the type information while the
// callable is stored by placement new in the internal buffer,
// e.g. as a type-less set of bytes.
// Placement new essentially implements small buffer optimization.
// Instead of allocating on the heap, we store the callable inside
// the wrapper object itself. This is how we gain in performance.
template<typename Callable>
TaskWrapper(Callable c)
: vtable_{ &_detail::vtable_for<Callable> }
{
// A callable that exactly fills the buffer is fine, hence '<='.
static_assert(sizeof(Callable) <= sizeof(buf_),
"Wrapper buffer is too small.");
static_assert(alignof(Callable) <= alignof(decltype(buf_)),
"Wrapped object is over-aligned.");
new(&buf_) Callable{ std::move(c) };
}
// This is where we invoke the stored callable.
void operator()() {
if (vtable_) {
vtable_->run(&buf_);
}
}
private:
// We use aligned storage to ensure that the wrapped object
// will be properly aligned.
std::aligned_storage_t<64> buf_;
const _detail::vtable* vtable_;
};
This implementation has one limitation: it supports only one callable signature, with a void return type and an empty argument list. This is done for the sake of simplicity. A slightly more complex, configurable implementation can be found here - GitHub link.
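To show how the single-signature limitation could be lifted, here is a hedged sketch of the same technique (hand-written vtable plus inline buffer) parameterized by the call signature. It is not the configurable implementation from the repository: the name SmallTask, the Size parameter and the decision to make it move-only are assumptions chosen to keep the example short. It needs C++17.

```cpp
#include <cassert>
#include <cstddef>
#include <new>
#include <string>
#include <type_traits>
#include <utility>

template <typename Signature, std::size_t Size = 64>
class SmallTask;  // only the R(Args...) specialization below is defined

template <typename R, typename... Args, std::size_t Size>
class SmallTask<R(Args...), Size> {
    // Hand-written virtual table, now parameterized by the signature.
    struct VTable {
        R (*run)(void* self, Args&&... args);
        void (*destroy)(void* self);
        void (*moveTo)(void* dst, void* src);
    };

    // Each helper restores the erased type C from the template parameter.
    template <typename C>
    static R runImpl(void* self, Args&&... args) {
        return (*static_cast<C*>(self))(std::forward<Args>(args)...);
    }
    template <typename C>
    static void destroyImpl(void* self) { static_cast<C*>(self)->~C(); }
    template <typename C>
    static void moveImpl(void* dst, void* src) {
        ::new (dst) C(std::move(*static_cast<C*>(src)));
    }
    template <typename C>
    static constexpr VTable vtableFor{&runImpl<C>, &destroyImpl<C>,
                                      &moveImpl<C>};

public:
    SmallTask() = default;

    template <typename C, typename D = std::decay_t<C>,
              typename = std::enable_if_t<!std::is_same_v<D, SmallTask>>>
    SmallTask(C&& callable) : vtable_{&vtableFor<D>} {
        static_assert(sizeof(D) <= Size, "callable does not fit the buffer");
        static_assert(alignof(D) <= alignof(std::max_align_t),
                      "over-aligned callable");
        static_assert(std::is_nothrow_move_constructible_v<D>,
                      "moving the callable must not throw");
        ::new (static_cast<void*>(buf_)) D(std::forward<C>(callable));
    }

    SmallTask(SmallTask&& other) noexcept { takeFrom(other); }
    SmallTask& operator=(SmallTask&& other) noexcept {
        if (this != &other) {
            reset();
            takeFrom(other);
        }
        return *this;
    }
    ~SmallTask() { reset(); }

    explicit operator bool() const noexcept { return vtable_ != nullptr; }

    // Precondition: the wrapper is not empty.
    R operator()(Args... args) {
        return vtable_->run(buf_, std::forward<Args>(args)...);
    }

private:
    void reset() noexcept {
        if (vtable_) {
            vtable_->destroy(buf_);
            vtable_ = nullptr;
        }
    }
    // Move-constructs other's callable into our buffer, leaves other empty.
    void takeFrom(SmallTask& other) noexcept {
        if (other.vtable_) {
            other.vtable_->moveTo(buf_, other.buf_);
            vtable_ = other.vtable_;
            other.reset();
        }
    }

    alignas(std::max_align_t) unsigned char buf_[Size];
    const VTable* vtable_ = nullptr;
};
```

With this sketch, SmallTask<void()> plays the role of the wrapper above, while something like SmallTask<std::string(const std::string&)> could carry tasks that take arguments and return values.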
You may think that this is a bit too much boilerplate. But this only has to be written once and then you can use it across all your active objects! The implementation of VoiceMenuHandler is quite simple.
void VoiceMenuHandler::receiveInput(const MenuInput& data) {
// Instead of processing the input synchronously, we schedule it
// for later, and return immediately, not wasting the caller's time.
worker_.addTask(
TaskWrapper{[this, data]() { processInput(data); } });
}
void VoiceMenuHandler::receiveHangup(const HangUp& data) {
worker_.addTask(
TaskWrapper{[this, data]() { processHangup(data); } });
}
std::string VoiceMenuHandler::fetchMenuSectionPrompt(
char digit,
const std::string& callId
) {
// Given the current state and user input, go to the next menu item.
std::cout << "in the call [" << callId << "] menu item '"
<< digit << "' selected.\n";
return callId + "_prompt_" + digit;
}
void VoiceMenuHandler::playVoiceMenuPrompt(
const std::string& callId,
const std::string& prompt
) {
// Command media server to start playing the current menu prompt.
std::cout << "play prompt [" << prompt << "]\n";
}
void VoiceMenuHandler::processInput(const MenuInput& data) {
const auto prompt = fetchMenuSectionPrompt(data.digit, data.callId);
playVoiceMenuPrompt(data.callId, prompt);
}
void VoiceMenuHandler::cleanupCallData(const std::string& callId) {
// Free the resources that we used to serve the call.
std::cout << "call [" << callId << "] ended.\n";
}
void VoiceMenuHandler::processHangup(const HangUp& data) {
cleanupCallData(data.callId);
}
And this is how it can be used:
VoiceMenuHandler menuHandler;
std::thread sender([&menuHandler]() {
menuHandler.receiveInput(MenuInput{ '2', "call_1@ip_addr" });
menuHandler.receiveInput(MenuInput{ '1', "call_2@ip_addr" });
menuHandler.receiveHangup(HangUp{ "call_1@ip_addr" });
std::this_thread::sleep_for(std::chrono::milliseconds(100));
});
sender.join();
The output will be:
in the call [call_1@ip_addr] menu item '2' selected.
play prompt [call_1@ip_addr_prompt_2]
in the call [call_2@ip_addr] menu item '1' selected.
play prompt [call_2@ip_addr_prompt_1]
call [call_1@ip_addr] ended.
Was It Worth The Effort?
Let’s see if our implementation is faster than std::function. I composed a little benchmark, in which I measured the time it took to create the wrapper, put it into a queue, retrieve it and invoke the stored callable. The computation in the callable is negligible, so the extra cost of the wrapper itself is measured.
As you can see, our original naive implementation is by far the fastest. So if a simple solution is enough for you, by all means go for it. Our wrapper adds some overhead for indirect calls, but it still runs twice as fast as std::function because it does not do any heap allocations. The last column shows the performance of coroutines which we are about to discuss.
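For readers who want to reproduce such a measurement, the loop described above could be sketched roughly as follows. This is not the benchmark used for the numbers in this article: the names BenchResult and measureRoundTrip are hypothetical, and a serious benchmark would also need warm-up runs, repetitions and protection against the optimizer.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>
#include <functional>
#include <queue>

struct BenchResult {
    std::int64_t nanoseconds;  // wall time of the whole loop
    std::uint64_t checksum;    // proves that every callable actually ran
};

// Creates a wrapper, queues it, retrieves it and invokes it, `iterations`
// times. Wrapper is any type constructible from a void() lambda.
template <typename Wrapper>
BenchResult measureRoundTrip(std::uint64_t iterations) {
    std::queue<Wrapper> queue;
    std::uint64_t checksum = 0;
    const auto start = std::chrono::steady_clock::now();
    for (std::uint64_t i = 0; i < iterations; ++i) {
        // The work inside the callable is deliberately negligible.
        queue.emplace([&checksum, i] { checksum += i; });
        queue.front()();
        queue.pop();
    }
    const auto stop = std::chrono::steady_clock::now();
    const auto elapsed =
        std::chrono::duration_cast<std::chrono::nanoseconds>(stop - start);
    return {static_cast<std::int64_t>(elapsed.count()), checksum};
}
```

Calling measureRoundTrip<std::function<void()>>(n) and the same function with another wrapper type gives two timings that can be compared, with the checksum confirming that both variants did the same work.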
Is Active Object Asynchronous Enough?
Let’s take a closer look at this method:
void VoiceMenuHandler::processInput(const MenuInput& data) {
const auto prompt = fetchMenuSectionPrompt(data.digit, data.callId);
playVoiceMenuPrompt(data.callId, prompt);
}
fetchMenuSectionPrompt sounds like a potential call to a remote service, doesn’t it? Since we only have one worker thread, if this call takes a long time, the whole queue will get stuck. It would be great to make this call asynchronous. One way to do it is to split this method into two. One will send a request, and when we receive a response, we’ll queue up another task to play the prompt. This approach will work, but the business logic will be scattered across two different functions, making it harder to read. And what if we have not two, but five or ten steps? Must we divide the logic into ten parts? Before C++20, we had no choice, but now we can implement asynchronous calls using coroutines. At first glance, coroutines can seem a bit intimidating due to the sheer number of customization points they provide, but actually implementing a coroutine is surprisingly easy. By the way, if you want to study the theory, there is a good talk by Andreas Fertig (youtube), and the documentation at cppreference.com is really good too.
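To make the drawback of the split approach tangible, here is a minimal single-threaded sketch of it. FakePromptService and SplitHandler are hypothetical names invented for this illustration; they are not part of the article's code, and the real version would schedule the second step on the worker instead of calling it directly.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a remote prompt service: it remembers one
// callback per call and invokes it when the "response" is delivered.
class FakePromptService {
public:
    using Callback = std::function<void(const std::string& prompt)>;

    void request(const std::string& callId, char /*digit*/, Callback onReply) {
        pending_[callId] = std::move(onReply);
    }
    void deliver(const std::string& callId, const std::string& prompt) {
        auto it = pending_.find(callId);
        if (it == pending_.end()) {
            return;
        }
        Callback callback = std::move(it->second);
        pending_.erase(it);
        callback(prompt);
    }

private:
    std::map<std::string, Callback> pending_;
};

class SplitHandler {
public:
    explicit SplitHandler(FakePromptService& service) : service_{service} {}

    // Step 1: send the request and return immediately.
    void processInput(char digit, const std::string& callId) {
        service_.request(callId, digit,
                         [this, callId](const std::string& prompt) {
                             onPromptReady(callId, prompt);
                         });
    }
    const std::vector<std::string>& played() const { return played_; }

private:
    // Step 2: the rest of the logic lives in a different function.
    void onPromptReady(const std::string& callId, const std::string& prompt) {
        played_.push_back(callId + ":" + prompt);
    }

    FakePromptService& service_;
    std::vector<std::string> played_;
};
```

Even with only two steps, the flow of one logical operation has to be followed across processInput, the lambda and onPromptReady; every additional asynchronous step would add another function of this kind.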
First Step: Transform a Function into a Coroutine
In order to become a coroutine, a function needs two things: it must use one of the co_await, co_yield or co_return keywords, and its return type must expose a nested promise_type that tells the compiler how to create and drive the coroutine. The returned object usually wraps the coroutine handle. Here’s the code for the return type we need.
struct CoroutineTask {
// 'promise_type' is the mandatory element of the coroutine
// return value type.
struct promise_type;
using handle_type = std::coroutine_handle<promise_type>;
struct promise_type {
CoroutineTask get_return_object() {
return { handle_type::from_promise(*this) };
}
// We want our coroutine to be suspended on creation and
// resume it later in the worker thread.
std::suspend_always initial_suspend() noexcept { return {}; }
// Don't suspend the coroutine when it reaches co_return.
std::suspend_never final_suspend() noexcept { return {}; }
// Our coroutine does not return any values, so its
// promise_type defines the 'return_void' method, otherwise
// definition of 'return_value' is necessary.
void return_void() {}
void unhandled_exception() {}
};
// This handle will be used in the worker thread to resume
// the coroutine.
handle_type h_;
};
You can find all the details about the lifetime of the object above at cppreference.com.
When we have the right return type, only a slight modification will turn our method into a coroutine. Note the return type and the co_return keyword.
CoroutineTask VoiceMenuHandlerCoroutines::processInput(
const MenuInput data
) {
const auto prompt = fetchMenuSectionPrompt(data.digit, data.callId);
playVoiceMenuPrompt(data.callId, prompt);
co_return;
// Coroutine state will be destroyed after this point.
}
// The public interface of VoiceMenuHandler has to change a little.
void VoiceMenuHandlerCoroutines::receiveInput(const MenuInput& data) {
// Since the CoroutineTask::promise_type defines its
// 'initial_suspend' method to return 'std::suspend_always'
// the coroutine will be in suspended state after its creation,
// so its body only runs when it is explicitly resumed.
// Resume will happen in worker task.
worker_.addTask( processInput(data).h_ );
}
void VoiceMenuHandlerCoroutines::receiveHangup(const HangUp& data) {
// Note that we use the same queue and worker thread to run normal
// tasks and coroutines.
worker_.addTask(
TaskWrapper{ [this, data]() { processHangup(data); } });
}
I want to draw your attention to the fact that the second method, receiveHangup, has remained unchanged, and we schedule old tasks together with new coroutines using the same task wrapper, which knows nothing about the coroutine_handle type. The coroutine handle doesn't know anything about our wrapper either, and yet they work well together. This is the power of external polymorphism! The full implementation of the new VoiceMenuHandler can be found here - GitHub link.
Second Step: Implement an Async Operation
Transformation into a coroutine does not solve our problem by itself. We need to write the co_await keyword in front of the fetchMenuSectionPrompt call. But in order for this to work, fetchMenuSectionPrompt must first be modified to return an awaiter object that looks like the following:
// In order to be compatible with co_await a function must return an
// awaiter, that implements three special methods.
struct AwaitablePrompt {
std::string callId;
char digit;
// External object that does the actual I/O
PromptFetcher& fetcher_;
std::string prompt_;
// The special methods -
bool await_ready();
void await_suspend(std::coroutine_handle<> h);
std::string await_resume();
};
// This is called first when the co_await expression is evaluated.
bool AwaitablePrompt::await_ready() {
// If PromptFetcher had a cache, we could check it here and
// skip suspension of the coroutine by returning 'true'.
return false;
}
// This will be called second.
void AwaitablePrompt::await_suspend(
std::coroutine_handle<> h
) {
// When this method is called the coroutine is already suspended,
// so we can pass its handle elsewhere.
fetcher_.fetch(
callId,
digit,
[this, h](
const std::string& prompt,
PromptFetcher::worker_type& worker
) {
// co_await resumes execution right before 'await_resume' call
// so it is safe to assign to prompt here because awaiter
// lifetime lasts until after 'await_resume' return.
prompt_ = prompt;
// Worker to schedule the coroutine on will be provided externally.
// Alternatively we could pass coroutine_handle to the fetcher
// and then the fetcher would be responsible for scheduling it.
worker.addTask(h);
// from this point awaiter may be deleted at any moment so we
// should not touch it (this* pointer) anymore. So assigning to
// the prompt_ here would be potential undefined behavior.
}
);
}
std::string AwaitablePrompt::await_resume() {
// This will become the result of co_await expression.
return prompt_;
}
And this is how we need to change fetchMenuSectionPrompt; note its return type.
AwaitablePrompt VoiceMenuHandlerAsync::fetchMenuSectionPrompt(
char digit,
const std::string& callId
) {
std::cout << "!Coroutine! - in the call [" << callId
<< "] menu item '" << digit << "' selected.\n";
return AwaitablePrompt{ callId, digit, fetcher_ };
}
If that looks a little confusing please check the full implementation here - GitHub link.
Finally, we can make our problematic method truly asynchronous.
CoroutineTask VoiceMenuHandlerAsync::processInput(const MenuInput data) {
// co_await is compiled into the following sequence:
// if(!AwaitablePrompt::await_ready()) {
// AwaitablePrompt::await_suspend(current_coro_handle);
// // depending on what await_suspend returns continuation
// // may be different.
// }
// AwaitablePrompt::await_resume()
// whatever await_resume returns becomes the result of co_await
const auto prompt = co_await fetchMenuSectionPrompt(
data.digit, data.callId);
playVoiceMenuPrompt(data.callId, prompt);
co_return;
// Coroutine state will be destroyed after this point.
}
So now, as soon as the awaiter returned by fetchMenuSectionPrompt is co_awaited, this coroutine will be suspended and the worker thread will pick up the next item from the queue. And when we receive the response, we can schedule the processInput coroutine to be resumed on any thread! In my implementation, the fetcher does the scheduling, just to demonstrate the concept. See the code - GitHub link.
Testing the async version:
PromptFetcher fetcher;
VoiceMenuHandlerAsync menuHandlerAsync{ fetcher };
std::thread senderToAsync([&menuHandlerAsync, &fetcher]() {
menuHandlerAsync.receiveInput(
MenuInput{ '7', "call_async_9@ip_addr" });
menuHandlerAsync.receiveInput(
MenuInput{ '8', "call_async_8@ip_addr" });
// "Receive" a response and play the prompt on the fetcher thread.
fetcher.processResponse("call_async_8@ip_addr", "prompt_AAA");
std::this_thread::sleep_for(std::chrono::milliseconds(100));
// "Receive" a response and play the prompt on the fetcher thread.
fetcher.processResponse("call_async_9@ip_addr", "prompt_BBB");
menuHandlerAsync.receiveHangup(HangUp{ "call_async_8@ip_addr" });
std::this_thread::sleep_for(std::chrono::milliseconds(100));
});
senderToAsync.join();
Output will look like this:
!Coroutine! - in the call [call_async_9@ip_addr] menu item '7' selected.
fetch request sent callId [call_async_9@ip_addr], input = 7.
received response for [call_async_9@ip_addr]
play prompt [prompt_BBB]
!Coroutine! - in the call [call_async_8@ip_addr] menu item '8' selected.
fetch request sent callId [call_async_8@ip_addr], input = 8.
We can see that both requests were sent before we received any responses, and that the response to the second request came first, yet everything worked without any issues. So C++20’s coroutines provide really nice features and they are still faster than std::function :)
All right, that was a lot of information to digest. Thank you for your attention, and I hope you find this useful. You can find all the code from this article here - GitHub link.