SObjectizer Tales - 4. Handling commands
Marco Arena
Posted on November 2, 2023
Hello SObjectizer aficionados, here we are for a new adventure today!
We have just met Lucas, a colleague who needs to capture specific moments with his camera. What if we could seamlessly command the device to start and stop its acquisition with the wave of our virtual wand?
Lucas provides an example of usage:
- I run the program and it does nothing;
- at some point, I decide to start the webcam when I press a button or shout “Alexa, start the webcam”. I expect frames to start flowing at that moment;
- I make a funny face for some time, then I decide to stop somehow and the program should get back to idle state;
- maybe I want to do it again, and again, and again…
In other words, we are asked to provide a way to control the acquisition.
This feature is indispensable for applications relying on camera functionality. Consider, for instance, the scenario of monitoring an automobile assembly line. Continuously observing the line during production pauses is probably useless. It’s far more practical to record the actual manufacturing process or specific segments of it. In this context, the assembly line can transmit a command to our program when production commences and another command when it concludes.
Before we proceed further, it’s worth noting an alternative way exists: hardware triggers. In this setup, the camera is configured to initiate the acquisition process exclusively upon receiving a physical trigger, such as a GPIO signal. As a result, GetNextImage-like functions will pause until the camera detects the trigger. This approach is frequently favored in situations where precise timing is of utmost importance.
Nevertheless, extending support for virtual triggers, also known as software triggers, holds significant importance as it allows various entities, potentially from remote locations, to transmit commands in a “soft” real-time manner.
Since we met signals in the previous installment, we model these commands as two signal_t:
struct start_acquisition_command final : so_5::signal_t {};
struct stop_acquisition_command final : so_5::signal_t {};
In the remaining sections of this article, we’ll explore how to incorporate this feature into the three existing versions of image producers, with the exception of the virtual one, which we’ll cover in our upcoming post. As a result, this post may run a bit lengthy, but I appreciate your patience and hope you find it valuable!
Approaching callback-based devices
Let’s start by accommodating image_producer_callback, which interacts with our fake callback-based device observable_videocapture.
First of all, it’s reasonable to do a small refactoring in observable_videocapture: we change on_next_image(stop_token, lambda) to start(lambda) and add a function stop(). This is actually a bit more consistent with real camera SDKs and makes the code a bit cleaner:
class observable_videocapture
{
public:
observable_videocapture()
: m_capture(0, cv::CAP_DSHOW)
{
}
void start(auto on_next)
{
if (!m_capture.isOpened())
{
throw std::runtime_error("Can't connect to the webcam");
}
m_worker = std::jthread{ [this, f = std::move(on_next)](std::stop_token st) {
cv::Mat image;
while (!st.stop_requested())
{
m_capture >> image;
f(std::move(image));
}
} };
}
void stop()
{
m_worker.request_stop();
}
private:
cv::VideoCapture m_capture;
std::jthread m_worker;
};
Remember that observable_videocapture is just a fake device that we use only to show how to interact with a callback-based device class. The point is that some real SDKs provide start and stop functions (e.g. BeginAcquisition, EndAcquisition), thus adding start and stop to observable_videocapture is a small step forward. We have just taken a shortcut by passing the lambda to start() directly, whereas this is usually set in advance (and only once) through another function or SDK facility.
At this point, we change image_producer_callback simply by reacting to the acquisition signals:
class image_producer_callback final : public so_5::agent_t
{
public:
image_producer_callback(so_5::agent_context_t ctx, so_5::mbox_t channel)
: agent_t(std::move(ctx)), m_channel(std::move(channel))
{
}
void so_define_agent() override
{
so_subscribe(???).event([this](so_5::mhood_t<start_acquisition_command>) {
m_device.start([this](cv::Mat image) {
so_5::send<cv::Mat>(m_channel, std::move(image));
});
}).event([this](so_5::mhood_t<stop_acquisition_command>) {
m_device.stop();
});
}
private:
so_5::mbox_t m_channel;
observable_videocapture m_device;
};
As usual, we can assume m_capture >> image; never hangs. Consider also that calling start() (or stop()) twice is not a problem here. In real life, this usually gives an error and must be handled.
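To make the "must be handled" part concrete, here is a minimal, library-free sketch. Both classes are hypothetical and not part of calico or of any real SDK: strict_device imitates an SDK that rejects redundant calls, and guarded_device is one possible way to turn redundant commands into no-ops. It is not thread-safe, which is assumed to be acceptable when all calls come from the message handlers of a single agent.

```cpp
#include <cassert>
#include <stdexcept>

// Hypothetical stand-in for a strict SDK: redundant start/stop calls are errors.
class strict_device
{
public:
    void start()
    {
        if (m_running) throw std::logic_error("acquisition already started");
        m_running = true;
    }

    void stop()
    {
        if (!m_running) throw std::logic_error("acquisition not started");
        m_running = false;
    }

    bool running() const noexcept { return m_running; }

private:
    bool m_running = false;
};

// Hypothetical wrapper: redundant commands become no-ops instead of errors.
// Each function returns true only if the device state actually changed.
class guarded_device
{
public:
    bool start()
    {
        if (m_device.running()) return false;
        m_device.start();
        return true;
    }

    bool stop()
    {
        if (!m_device.running()) return false;
        m_device.stop();
        return true;
    }

private:
    strict_device m_device;
};
```

With such a wrapper, a second start command would simply return false, leaving it to the caller to decide whether to log or ignore it.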
Now we need to make a decision: where to receive commands?
We have two options:
- sending the commands directly to the agent (to its own message box), or
- sending the commands to a dedicated channel.
Both involve passing through a message box; however, the latter is a bit more flexible and allows getting the “command channel” by name more easily from other agents (and other parts of the program). Deal, we go for a dedicated message box:
class image_producer_callback final : public so_5::agent_t
{
public:
image_producer_callback(so_5::agent_context_t ctx, so_5::mbox_t channel, so_5::mbox_t commands)
: agent_t(std::move(ctx)), m_channel(std::move(channel)), m_commands(std::move(commands))
{
}
void so_define_agent() override
{
so_subscribe(m_commands).event([this](so_5::mhood_t<start_acquisition_command>) {
m_device.start([this](cv::Mat image) {
so_5::send<cv::Mat>(m_channel, std::move(image));
});
}).event([this](so_5::mhood_t<stop_acquisition_command>) {
m_device.stop();
});
}
private:
so_5::mbox_t m_channel;
so_5::mbox_t m_commands;
observable_videocapture m_device;
};
We make this command channel the same way as we did for the main image channel:
int main()
{
auto ctrl_c = get_ctrl_c_token();
const so_5::wrapped_env_t sobjectizer;
auto main_channel = sobjectizer.environment().create_mbox("main");
auto commands_channel = sobjectizer.environment().create_mbox("commands");
auto dispatcher = so_5::disp::active_obj::make_dispatcher(sobjectizer.environment());
sobjectizer.environment().introduce_coop(dispatcher.binder(), [&](so_5::coop_t& c) {
c.make_agent<image_producer_callback>(main_channel, commands_channel);
c.make_agent<image_viewer>(main_channel);
});
wait_for_stop(ctrl_c);
}
At this point we can send signals to commands_channel, but we’ll get to that in a moment.
Approaching polling-based devices #1
Now we accommodate image_producer_recursive. Let’s recall the implementation from the previous post:
class image_producer_recursive final : public so_5::agent_t
{
struct grab_image final : so_5::signal_t{};
public:
image_producer_recursive(so_5::agent_context_t ctx, so_5::mbox_t channel)
: agent_t(std::move(ctx)), m_channel(std::move(channel)), m_capture(0, cv::CAP_DSHOW)
{
}
void so_define_agent() override
{
so_subscribe_self().event([this](so_5::mhood_t<grab_image>) {
cv::Mat image;
m_capture >> image;
so_5::send<cv::Mat>(m_channel, std::move(image));
so_5::send<grab_image>(*this);
});
}
void so_evt_start() override
{
if (!m_capture.isOpened())
{
throw std::runtime_error("Can't connect to the webcam");
}
so_5::send<grab_image>(*this);
}
private:
so_5::mbox_t m_channel;
cv::VideoCapture m_capture;
};
Intuitively, so_5::send<grab_image>(*this); should be moved into a message handler:
void so_define_agent() override
{
so_subscribe(m_commands).event([this](so_5::mhood_t<start_acquisition_command>) {
so_5::send<grab_image>(*this);
});
so_subscribe_self().event([this](so_5::mhood_t<grab_image>) {
cv::Mat image;
m_capture >> image;
so_5::send<cv::Mat>(m_channel, std::move(image));
so_5::send<grab_image>(*this); // here
});
}
void so_evt_start() override
{
if (!m_capture.isOpened())
{
throw std::runtime_error("Can't connect to the webcam");
}
// grab_image not sent automatically anymore
}
The big question is: how to handle stop_acquisition_command? Think about this for a bit: the agent is committed to grabbing images. This means that it sends grab_image to itself after handling every new image. At some point, it receives stop_acquisition_command. How can it stop sending grab_image to itself? A naive solution consists in introducing a flag, something like bool stop_arrived, that becomes true when stop_acquisition_command arrives. Inside the grab_image handler, if that flag is set then the agent does not send grab_image again.
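For illustration only, the flag-based idea can be modeled without SObjectizer. In this sketch everything is hypothetical: a std::deque plays the role of the agent's message queue, msg stands in for the three signals, and step() handles one queued message at a time. The extra check in the start branch is an assumption added here so that a redundant start does not enqueue a second grab.

```cpp
#include <cassert>
#include <deque>

// Stand-ins for start_acquisition_command, stop_acquisition_command and grab_image.
enum class msg { start, stop, grab };

// Library-free model of the flag-based producer (not SObjectizer code).
struct flag_based_producer
{
    std::deque<msg> mailbox;   // plays the role of the agent's message queue
    bool stop_arrived = true;  // the agent is idle until a start arrives
    int frames = 0;            // counts the "images" produced so far

    void post(msg m) { mailbox.push_back(m); }

    // Handles one queued message; returns false when the queue is empty.
    bool step()
    {
        if (mailbox.empty()) return false;
        const msg m = mailbox.front();
        mailbox.pop_front();
        switch (m)
        {
        case msg::start:
            if (stop_arrived)
            {
                stop_arrived = false;
                post(msg::grab);
            }
            break;
        case msg::stop:
            stop_arrived = true;
            break;
        case msg::grab:
            if (!stop_arrived)
            {
                ++frames;        // "grab and send an image"
                post(msg::grab); // keep the loop going
            }
            break;
        }
        return true;
    }
};
```

Note how the flag has to be consulted in more than one handler: this scattering of checks is what the following sections try to avoid.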
We can do better than this.
An intermediate solution consists in unsubscribing from grab_image when stop_acquisition_command arrives. Consequently, it’s required to subscribe (again) on start_acquisition_command:
class image_producer_recursive final : public so_5::agent_t
{
struct grab_image final : so_5::signal_t{};
public:
image_producer_recursive(so_5::agent_context_t ctx, so_5::mbox_t channel, so_5::mbox_t commands)
: agent_t(std::move(ctx)), m_channel(std::move(channel)), m_commands(std::move(commands)), m_capture(0)
{
}
void so_define_agent() override
{
so_subscribe(m_commands).event([this](so_5::mhood_t<start_acquisition_command>) {
subscribe_to_grab_image(); // subscribe (again)
so_5::send<grab_image>(*this);
}).event([this](so_5::mhood_t<stop_acquisition_command>) {
so_drop_subscription<grab_image>(so_direct_mbox()); // unsubscribe
});
}
void so_evt_start() override
{
if (!m_capture.isOpened())
{
throw std::runtime_error("Can't connect to the webcam");
}
}
private:
void subscribe_to_grab_image()
{
so_subscribe_self().event([this](so_5::mhood_t<grab_image>) {
cv::Mat image;
m_capture >> image;
so_5::send<cv::Mat>(m_channel, std::move(image));
so_5::send<grab_image>(*this);
});
}
so_5::mbox_t m_channel;
so_5::mbox_t m_commands;
cv::VideoCapture m_capture;
};
so_drop_subscription is an overloaded function that removes a previously made subscription (not throwing if that does not exist). As you expect, so_drop_subscription<grab_image>(so_direct_mbox()); drops the subscription to the agent’s message box for the message (or signal) type grab_image.
This solution smells a bit, and it also contains a bug that maybe you have spotted already.
What if start_acquisition_command arrives twice? Well, image_producer_recursive throws from the message handler because it tries subscribing to the same signal again!
We might try to add some patches; however, there is a more expressive way to deal with this situation that also fixes the bug. Essentially, we should express more clearly what the agent is interested in handling in each of its states. Think of a two-state scenario:
- when the agent is “stopped”, it’s only interested in start_acquisition_command, which turns the state to “started”;
- when the agent is “started”, it’s interested in both stop_acquisition_command, which turns the state to “stopped”, and grab_image, which does not change the state but only progresses the acquisition.
If, for instance, start_acquisition_command arrives when the agent is “started”, it will be automatically ignored because the agent is not interested in that signal at that point.
This leads to one of the strongest concepts of SObjectizer: agent states.
Agent is a finite-state machine. The behavior of an agent depends on the current state of the agent and the received message. An agent can receive and process different messages in each state. In other words, an agent can receive a message in one state but ignore it in another state. Or, if an agent receives the same message in several states, it can handle the message differently in each state.
An important clarification here: in SObjectizer, messages that are not handled in the current state are just discarded silently. This behavior might differ from other frameworks. For example, in Akka unhandled messages are automatically sent to the system eventStream in the form of UnhandledMessage.
By default, an agent is in a default state. All the agents we have seen so far work in that default state. Often this is enough.
Now, let’s introduce these two states into image_producer_recursive. In SObjectizer a state is an instance of the class state_t and is generally defined as a member variable of the agent (st_ is just a style convention):
// ... rest of image_producer_recursive
so_5::state_t st_started{this}, st_stopped{this};
so_5::mbox_t m_channel;
so_5::mbox_t m_commands;
cv::VideoCapture m_capture;
};
Optionally, states can have a name:
so_5::state_t st_started{this, "started"}, st_stopped{this, "stopped"};
Some more advanced usages of states will be discussed in a future post, so grasping all the theory is not important now. Let’s put states into practice, instead.
We make message subscriptions and also “activate” the state we want our agent to start with. We do these things in so_define_agent:
void so_define_agent() override
{
st_started.event([this](so_5::mhood_t<grab_image>) {
// ...
}).event(m_commands, [this](so_5::mhood_t<stop_acquisition_command>) {
// ...
});
st_stopped.event(m_commands, [this](so_5::mhood_t<start_acquisition_command>) {
// ...
});
st_stopped.activate(); // change the state
}
As you might expect, the first subscription is to the agent’s own message box in state st_started. The second is to the m_commands message box, still in state st_started. The third one is to the m_commands message box in state st_stopped. Finally, st_stopped.activate(); changes the current state to st_stopped.
At this point, let’s fill in the blank:
void so_define_agent() override
{
st_started.event([this](so_5::mhood_t<grab_image>) {
cv::Mat mat;
m_capture >> mat;
so_5::send<cv::Mat>(m_channel, std::move(mat));
so_5::send<grab_image>(*this);
}).event(m_commands, [this](so_5::mhood_t<stop_acquisition_command>) {
st_stopped.activate();
});
st_stopped.event(m_commands, [this](so_5::mhood_t<start_acquisition_command>) {
st_started.activate();
so_5::send<grab_image>(*this);
});
st_stopped.activate();
}
Here is a visual representation of the state transitions:
grab_image is handled like before. start_acquisition_command and stop_acquisition_command are more interesting. Indeed, getting stop_acquisition_command sets the current state to st_stopped. On the other hand, getting start_acquisition_command first changes the state to st_started and then sends grab_image to the agent itself. This last part merits attention:
st_stopped.event(m_commands, [this](so_5::mhood_t<start_acquisition_command>) {
st_started.activate();
so_5::send<grab_image>(*this);
});
After exiting from this handler, the state of the agent is st_started. This means grab_image will be handled by the corresponding subscription on st_started. Similarly, if start_acquisition_command arrives while the agent is in state st_started, the signal will simply be ignored because there is no corresponding handler in that state. Bingo!
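The dispatch rule described above can also be tried without the library. The following is a rough model, not SObjectizer code: tiny_agent, agent_state and agent_signal are hypothetical names, a std::map keyed by (state, signal) plays the role of the subscriptions, and a signal with no entry for the current state is counted as discarded. It assumes messages are handled one at a time in arrival order.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <map>
#include <utility>

enum class agent_state { stopped, started };
enum class agent_signal { start_acquisition, stop_acquisition, grab_image };

// Library-free model of "handlers per state": unhandled signals are dropped.
class tiny_agent
{
public:
    tiny_agent()
    {
        // One entry per (state, signal) pair the agent is interested in.
        on(agent_state::started, agent_signal::grab_image, [this] {
            ++frames;                       // "grab and send an image"
            send(agent_signal::grab_image); // keep the loop going
        });
        on(agent_state::started, agent_signal::stop_acquisition, [this] {
            current = agent_state::stopped;
        });
        on(agent_state::stopped, agent_signal::start_acquisition, [this] {
            current = agent_state::started;
            send(agent_signal::grab_image); // handled later, in the new state
        });
    }

    tiny_agent(const tiny_agent&) = delete;
    tiny_agent& operator=(const tiny_agent&) = delete;

    void send(agent_signal s) { m_queue.push_back(s); }

    // Handles one queued signal; returns false when the queue is empty.
    bool step()
    {
        if (m_queue.empty()) return false;
        const agent_signal s = m_queue.front();
        m_queue.pop_front();
        const auto it = m_handlers.find({current, s});
        if (it != m_handlers.end())
            it->second();
        else
            ++discarded; // no handler in the current state: silently dropped
        return true;
    }

    agent_state current = agent_state::stopped;
    int frames = 0;
    int discarded = 0;

private:
    void on(agent_state st, agent_signal s, std::function<void()> handler)
    {
        m_handlers[{st, s}] = std::move(handler);
    }

    std::map<std::pair<agent_state, agent_signal>, std::function<void()>> m_handlers;
    std::deque<agent_signal> m_queue;
};
```

In this model, a second start_acquisition sent while the agent is already started only increments discarded, which mirrors the "just ignored" behavior discussed above.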
Here is the full code:
class image_producer_recursive final : public so_5::agent_t
{
struct grab_image final : so_5::signal_t{};
public:
image_producer_recursive(so_5::agent_context_t ctx, so_5::mbox_t channel, so_5::mbox_t commands)
: agent_t(std::move(ctx)), m_channel(std::move(channel)), m_commands(std::move(commands)), m_capture(0)
{
}
void so_define_agent() override
{
st_started.event([this](so_5::mhood_t<grab_image>) {
cv::Mat mat;
m_capture >> mat;
so_5::send<cv::Mat>(m_channel, std::move(mat));
so_5::send<grab_image>(*this);
}).event(m_commands, [this](so_5::mhood_t<stop_acquisition_command>) {
st_stopped.activate();
});
st_stopped.event(m_commands, [this](so_5::mhood_t<start_acquisition_command>) {
st_started.activate();
so_5::send<grab_image>(*this);
});
st_stopped.activate();
}
void so_evt_start() override
{
if (!m_capture.isOpened())
{
throw std::runtime_error("Can't connect to the webcam");
}
}
private:
so_5::state_t st_stopped{ this };
so_5::state_t st_started{ this };
so_5::mbox_t m_channel;
so_5::mbox_t m_commands;
cv::VideoCapture m_capture;
};
Note that states can also be applied to image_producer_callback in order to avoid calling start and stop twice.
Approaching polling-based devices #2
To update image_producer, we start from the solution with the child broker shown in the previous episode. There are multiple ways to accommodate this; one consists in bringing a simple state machine to the party through waiting atomics (the wait and notify operations of std::atomic, available since C++20):
class image_producer final : public so_5::agent_t
{
enum class acquisition_state { st_started, st_stopped, st_cancelled };
class image_producer_broker : public agent_t
{
public:
image_producer_broker(so_5::agent_context_t c, image_producer* parent, std::stop_source stop_source, so_5::mbox_t commands)
: agent_t(std::move(c)), m_parent(parent), m_stop_source(std::move(stop_source)), m_commands(std::move(commands))
{
}
void so_define_agent() override
{
so_subscribe(m_commands).event([this](so_5::mhood_t<start_acquisition_command>) {
m_parent->start();
}).event([this](so_5::mhood_t<stop_acquisition_command>) {
m_parent->stop();
});
}
void so_evt_finish() override
{
m_stop_source.request_stop();
}
private:
image_producer* m_parent;
std::stop_source m_stop_source;
so_5::mbox_t m_commands;
};
public:
image_producer(so_5::agent_context_t ctx, so_5::mbox_t channel, so_5::mbox_t commands)
: agent_t(std::move(ctx)), m_channel(std::move(channel)), m_commands(std::move(commands))
{
}
void so_evt_start() override
{
const auto st = m_stop_source.get_token();
introduce_child_coop(*this, so_5::disp::active_obj::make_dispatcher(so_environment()).binder(), [this](so_5::coop_t& c) {
c.make_agent<image_producer_broker>(this, m_stop_source, m_commands);
});
cv::VideoCapture cap(0, cv::CAP_DSHOW);
if (!cap.isOpened())
{
throw std::runtime_error("Can't connect to the webcam");
}
std::stop_callback sc{ st, [this] {
cancel();
} };
while (!st.stop_requested())
{
m_state.wait(acquisition_state::st_stopped); // wait for NOT st_stopped
cv::Mat image;
while (m_state == acquisition_state::st_started)
{
cap >> image;
so_5::send<cv::Mat>(m_channel, image);
}
}
}
private:
void start()
{
change_state(acquisition_state::st_started);
}
void stop()
{
change_state(acquisition_state::st_stopped);
}
void cancel()
{
change_state(acquisition_state::st_cancelled);
}
void change_state(acquisition_state st)
{
m_state = st;
m_state.notify_one();
}
so_5::mbox_t m_channel;
so_5::mbox_t m_commands;
std::atomic<acquisition_state> m_state = acquisition_state::st_stopped;
std::stop_source m_stop_source;
};
Note that it’s possible to remove stop_source and stop_token completely: cancel() can be called from the broker’s so_evt_finish(). Again, the parent-child relationship guarantees this is called strictly before shutting down the parent agent:
class image_producer final : public so_5::agent_t
{
enum class acquisition_state { st_started, st_stopped, st_cancelled };
class image_producer_broker : public agent_t
{
public:
image_producer_broker(so_5::agent_context_t c, image_producer* parent, so_5::mbox_t commands)
: agent_t(std::move(c)), m_parent(parent), m_commands(std::move(commands))
{
}
void so_define_agent() override
{
so_subscribe(m_commands).event([this](so_5::mhood_t<start_acquisition_command>) {
m_parent->start();
}).event([this](so_5::mhood_t<stop_acquisition_command>) {
m_parent->stop();
});
}
void so_evt_finish() override
{
m_parent->cancel();
}
private:
image_producer* m_parent;
so_5::mbox_t m_commands;
};
public:
image_producer(so_5::agent_context_t ctx, so_5::mbox_t channel, so_5::mbox_t commands)
: agent_t(std::move(ctx)), m_channel(std::move(channel)), m_commands(std::move(commands))
{
}
void so_evt_start() override
{
introduce_child_coop(*this, so_5::disp::active_obj::make_dispatcher(so_environment()).binder(), [this](so_5::coop_t& c) {
c.make_agent<image_producer_broker>(this, m_commands);
});
cv::VideoCapture cap(0, cv::CAP_DSHOW);
if (!cap.isOpened())
{
throw std::runtime_error("Can't connect to the webcam");
}
while (m_state != acquisition_state::st_cancelled)
{
m_state.wait(acquisition_state::st_stopped);
cv::Mat image;
while (m_state == acquisition_state::st_started)
{
cap >> image;
so_5::send<cv::Mat>(m_channel, image);
}
}
}
private:
void start()
{
change_state(acquisition_state::st_started);
}
void stop()
{
change_state(acquisition_state::st_stopped);
}
void cancel()
{
change_state(acquisition_state::st_cancelled);
}
void change_state(acquisition_state st)
{
m_state = st;
m_state.notify_one();
}
so_5::mbox_t m_channel;
so_5::mbox_t m_commands;
std::atomic<acquisition_state> m_state = acquisition_state::st_stopped;
};
It was quite a long journey but we made it! From now on, we can use any of the following interchangeably to produce images:
- image_producer
- image_producer_callback
- image_producer_recursive
Also, for more flexibility we passed the commands channel into the producer. We could instead get it from inside any agent with a simple call like so_environment().create_mbox("commands"). However, in this case we prefer injecting the message box into the constructor to avoid depending on a magic string somewhere.
Finally, all the producers can handle commands! For example:
int main()
{
const auto ctrl_c = calico::utils::get_ctrlc_token();
const so_5::wrapped_env_t sobjectizer;
const auto main_channel = sobjectizer.environment().create_mbox("main");
auto commands_channel = sobjectizer.environment().create_mbox("commands");
sobjectizer.environment().introduce_coop(so_5::disp::active_obj::make_dispatcher(sobjectizer.environment()).binder(), [&](so_5::coop_t& c) {
c.make_agent<calico::producers::image_producer_recursive>(main_channel, commands_channel);
c.make_agent<calico::agents::image_viewer>(main_channel);
c.make_agent<calico::agents::image_tracer>(main_channel);
});
std::this_thread::sleep_for(std::chrono::seconds(1));
so_5::send<calico::start_acquisition_command>(commands_channel);
std::cout << "Acquisition START command sent!\n";
std::this_thread::sleep_for(std::chrono::seconds(5));
so_5::send<calico::stop_acquisition_command>(commands_channel);
std::cout << "Acquisition STOP command sent!\n";
calico::utils::wait_for_stop(ctrl_c);
}
We send a start command one second after introducing the main cooperation, and a stop command five seconds later. This should make the acquisition start on image_producer_recursive and stop after about five seconds.
Here is the final result:
As said before, we assume that cap >> image; never hangs. This is a fair assumption for OpenCV, but in general things might be more complicated. For example, a GetNextImage-like function can actually hang if a hardware trigger is set on the camera. For this reason, SDKs usually provide an additional timeout parameter that can be set to a generous rounding-up of the frame period, that is, the reciprocal of the frame rate (e.g. at 100 FPS the period is 10 ms), or to some other meaningful value. However, these issues are often solved by callback-based APIs since you just call “start” and “stop”, and nothing should really hang.
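As a small worked example of the timeout arithmetic, here is a hypothetical helper (grab_timeout is not part of any SDK or of calico) that derives a timeout from the frame rate. The safety margin of 2 is an arbitrary assumption; a real value depends on the camera and on how much jitter is tolerable.

```cpp
#include <cassert>
#include <chrono>
#include <cmath>
#include <stdexcept>

// Hypothetical helper: frame period scaled by a safety margin,
// rounded up to whole milliseconds.
std::chrono::milliseconds grab_timeout(double fps, double margin = 2.0)
{
    if (fps <= 0.0 || margin < 1.0)
    {
        throw std::invalid_argument("fps must be positive and margin at least 1");
    }
    const double period_ms = 1000.0 / fps; // e.g. 100 FPS -> 10 ms
    return std::chrono::milliseconds(
        static_cast<long long>(std::ceil(period_ms * margin)));
}
```

For instance, grab_timeout(100.0) yields 20 ms, twice the 10 ms frame period, and the result would then be passed to the SDK's timeout parameter.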
Here is what our data flow looks like now:
The task has been accomplished! In the next post we’ll have some more fun with sending commands.
Takeaway
In this episode we have learned:
- agents can use so_drop_subscription to unsubscribe from a message on a particular message box (and state);
- agents are state machines;
- by default, an agent is in a default state;
- states are instances of state_t, defined as agent’s member variables;
- states express intent and SObjectizer enables us to declare transitions succinctly;
- messages that are not handled in the current state are just silently ignored.
As usual, calico is updated and tagged.
What’s next?
While we’ve acquired the skills to manage commands, there are a few more enhancements required to meet Lucas’ request entirely. We could consider adding support for a straightforward keyboard remote control, or perhaps delve into more sophisticated features, such as automatically triggering a stop command after a 10-second interval…
Stay tuned for our next installment, where we’ll delve deeper into this topic!
Thanks to Yauheni Akhotnikau for having reviewed this post.