SObjectizer Tales - 4. Handling commands

ilpropheta

Marco Arena

Posted on November 2, 2023


Hello SObjectizer aficionados, here we are for a new adventure today!

We have just met Lucas, a colleague who needs to capture specific moments with his camera. What if we could seamlessly command the device to start and stop its acquisition with the wave of our virtual wand?

Lucas provides an example of usage:

  • I run the program and it does nothing;
  • at some point, I decide to start the webcam by pressing a button or shouting “Alexa, start the webcam”. I expect frames to start flowing at that moment;
  • I make a funny face for a while, then I decide to stop somehow, and the program should get back to the idle state;
  • maybe I want to do it again, and again, and again…

In other words, we are asked to provide a way to control the acquisition.

This feature is indispensable for applications relying on camera functionality. Consider, for instance, the scenario of monitoring an automobile assembly line. Continuously observing the line during production pauses is probably useless. It’s far more practical to record the actual manufacturing process or specific segments of it. In this context, the assembly line can transmit a command to our program when production commences and another command when it concludes.

Before we proceed further, it’s worth noting that an alternative exists: hardware triggers. In this setup, the camera is configured to start acquiring only upon receiving a physical trigger, such as a GPIO signal. As a result, GetNextImage-like functions block until the camera detects the trigger. This approach is often preferred when precise timing is paramount.

Nevertheless, supporting virtual triggers (also known as software triggers) is still important, as it allows various entities, potentially remote ones, to send commands in a “soft” real-time manner.

Since we met signals in the previous installment, we model these commands as two signals, i.e. types deriving from so_5::signal_t:

struct start_acquisition_command final : so_5::signal_t {};
struct stop_acquisition_command final : so_5::signal_t {};
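Signals carry no data: a receiver can only tell which signal arrived from its type. As a loose, stdlib-only analogy (this is not how SObjectizer dispatches messages, and the plain structs below deliberately drop the so_5::signal_t base), the same idea can be sketched with std::variant and type-based visitation; describe() and overloaded are hypothetical helpers:

```cpp
#include <string>
#include <variant>

// Stdlib-only analogy, not SObjectizer's mechanism: like signal types,
// these structs carry no data, so a receiver can only react to their type.
struct start_acquisition_command {};
struct stop_acquisition_command {};

using command = std::variant<start_acquisition_command, stop_acquisition_command>;

// Merges several lambdas into one overloaded visitor.
template <class... Fs> struct overloaded : Fs... { using Fs::operator()...; };
template <class... Fs> overloaded(Fs...) -> overloaded<Fs...>;

// Hypothetical handler: the branch taken depends solely on the command's type.
std::string describe(const command& cmd)
{
    return std::visit(overloaded{
        [](const start_acquisition_command&) { return std::string{ "start" }; },
        [](const stop_acquisition_command&) { return std::string{ "stop" }; },
    }, cmd);
}
```

Like a signal handler, each lambda is selected purely by the command's type; there is nothing inside the command to read.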

In the remaining sections of this article, we’ll explore how to add this feature to three of the existing image producers; the virtual one will be covered in our upcoming post. As a result, this post may run a bit long, but I appreciate your patience and hope you find it valuable!

Approaching callback-based devices

Let’s start by adapting image_producer_callback, which interacts with our fake callback-based device observable_videocapture.

First of all, it’s reasonable to do a small refactoring of observable_videocapture: we turn on_next_image(stop_token, lambda) into start(lambda) and add a stop() function. This is a bit more consistent with real camera SDKs and makes the code cleaner:

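#include <opencv2/opencv.hpp>
#include <stdexcept>
#include <stop_token>
#include <thread>

class observable_videocapture
{
public:
    observable_videocapture()
        : m_capture(0, cv::CAP_DSHOW)
    {
    }

    // Launches a worker thread that grabs frames and hands each one to on_next
    void start(auto on_next)
    {
        if (!m_capture.isOpened())
        {
            throw std::runtime_error("Can't connect to the webcam");
        }

        // std::jthread passes its own stop_token to the lambda
        m_worker = std::jthread{ [this, f = std::move(on_next)](std::stop_token st) {
            cv::Mat image;
            while (!st.stop_requested())
            {
                m_capture >> image;
                f(std::move(image));
            }
        } };
    }

    // Asks the worker to exit; it terminates after the current frame
    void stop()
    {
        m_worker.request_stop();
    }

private:
    cv::VideoCapture m_capture;
    std::jthread m_worker; // declared last: stopped and joined before m_capture is destroyed
};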

Remember that observable_videocapture is just a fake device we use to show how to interact with a callback-based device class. The point is that some real SDKs provide start and stop functions (e.g. BeginAcquisition, EndAcquisition), so adding start and stop to observable_videocapture is a small step toward realism. We have just taken a shortcut by passing the lambda to start() directly, whereas it is usually set in advance (and only once) through another function or SDK facility.

At this point, we update image_producer_callback so that it simply reacts to the acquisition signals:

class image_producer_callback final : public so_5::agent_t
{
public:
    image_producer_callback(so_5::agent_context_t ctx, so_5::mbox_t channel)
        : agent_t(std::move(ctx)), m_channel(std::move(channel))
    {
    }

    void so_define_agent() override
    {
        so_subscribe(???).event([this](so_5::mhood_t<start_acquisition_command>) {
            m_device.start([this](cv::Mat image) {
                so_5::send<cv::Mat>(m_channel, std::move(image));
            });
        }).event([this](so_5::mhood_t<stop_acquisition_command>) {
            m_device.stop();
        });
    }
private:
    so_5::mbox_t m_channel; 
    observable_videocapture m_device;
};

As usual, we can assume m_capture >> image; never hangs. Also note that calling start() (or stop()) twice is not a problem here; with a real SDK, this usually results in an error that must be handled.

Now we need to make a decision: where to receive commands?

We have two options:

  • sending the commands directly to the agent (to its own message box), or
  • sending the commands to a dedicated channel.

Both pass through a message box; however, the latter is a bit more flexible and makes it easier for other agents (and other parts of the program) to get the “command channel” by name. Deal, we go for a dedicated message box:

class image_producer_callback final : public so_5::agent_t
{
public:
    image_producer_callback(so_5::agent_context_t ctx, so_5::mbox_t channel, so_5::mbox_t commands)
        : agent_t(std::move(ctx)), m_channel(std::move(channel)), m_commands(std::move(commands))
    {
    }

    void so_define_agent() override
    {
        so_subscribe(m_commands).event([this](so_5::mhood_t<start_acquisition_command>) {
            m_device.start([this](cv::Mat image) {
                so_5::send<cv::Mat>(m_channel, std::move(image));
            });
        }).event([this](so_5::mhood_t<stop_acquisition_command>) {
            m_device.stop();
        });
    }
private:
    so_5::mbox_t m_channel;
    so_5::mbox_t m_commands;
    observable_videocapture m_device;
};

We create this command channel the same way as we did for the main image channel:

int main()
{
    auto ctrl_c = get_ctrl_c_token();

    const so_5::wrapped_env_t sobjectizer;
    auto main_channel = sobjectizer.environment().create_mbox("main");
    auto commands_channel = sobjectizer.environment().create_mbox("commands");

    auto dispatcher = so_5::disp::active_obj::make_dispatcher(sobjectizer.environment());
    sobjectizer.environment().introduce_coop(dispatcher.binder(), [&](so_5::coop_t& c) {
        c.make_agent<image_producer_callback>(main_channel, commands_channel);
        c.make_agent<image_viewer>(main_channel);       
    });

    wait_for_stop(ctrl_c);
}

At this point we can send signals to commands_channel, but we’ll get to that in a moment.
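Part of the appeal of a named channel is that anyone holding the environment can look it up. The toy registry below (a stdlib-only model, not SObjectizer's implementation; toy_environment and toy_channel are hypothetical) mimics the behavior relied upon here: asking for the same name twice yields the same channel, so a sender and a receiver that never met still end up talking through it:

```cpp
#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <vector>

// Toy channel: subscribers register handlers, send() invokes all of them.
struct toy_channel
{
    std::vector<std::function<void(const std::string&)>> handlers;

    void send(const std::string& command) const
    {
        for (const auto& h : handlers)
            h(command);
    }
};

// Toy registry (not SObjectizer's implementation): the first request for a
// name creates the channel, later requests return that very same channel.
class toy_environment
{
public:
    std::shared_ptr<toy_channel> create_mbox(const std::string& name)
    {
        std::lock_guard lock{ m_mutex };
        auto& slot = m_channels[name];
        if (!slot)
            slot = std::make_shared<toy_channel>();
        return slot;
    }

private:
    std::mutex m_mutex;
    std::map<std::string, std::shared_ptr<toy_channel>> m_channels;
};
```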

Approaching polling-based devices #1

Now we adapt image_producer_recursive. Let’s recall the implementation from the previous post:

class image_producer_recursive final : public so_5::agent_t
{
    struct grab_image final : so_5::signal_t{};
public:
    image_producer_recursive(so_5::agent_context_t ctx, so_5::mbox_t channel)
        : agent_t(std::move(ctx)), m_channel(std::move(channel)), m_capture(0, cv::CAP_DSHOW)
    {
    }

    void so_define_agent() override
    {
        so_subscribe_self().event([this](so_5::mhood_t<grab_image>) {
            cv::Mat image;
            m_capture >> image;
            so_5::send<cv::Mat>(m_channel, std::move(image));
            so_5::send<grab_image>(*this);
        });
    }

    void so_evt_start() override
    {
        if (!m_capture.isOpened())
        {
            throw std::runtime_error("Can't connect to the webcam");
        }

        so_5::send<grab_image>(*this);
    }

private:
    so_5::mbox_t m_channel;
    cv::VideoCapture m_capture;
};

Intuitively, so_5::send<grab_image>(*this); should be moved from so_evt_start into a start_acquisition_command handler:

void so_define_agent() override
{
    so_subscribe(m_commands).event([this](so_5::mhood_t<start_acquisition_command>) {
        so_5::send<grab_image>(*this);
    });

    so_subscribe_self().event([this](so_5::mhood_t<grab_image>) {
        cv::Mat image;
        m_capture >> image;
        so_5::send<cv::Mat>(m_channel, std::move(image));
        so_5::send<grab_image>(*this); // here
    });
}

void so_evt_start() override
{
    if (!m_capture.isOpened())
    {
        throw std::runtime_error("Can't connect to the webcam");
    }

    // grab_image not sent automatically anymore
}

The big question is: how do we handle stop_acquisition_command? Think about it for a moment: the agent is committed to grabbing images, which means it sends grab_image to itself after handling every new image. At some point, it receives stop_acquisition_command. How can it stop sending grab_image to itself? A naive solution consists of introducing a flag, something like bool stop_arrived, that becomes true when stop_acquisition_command arrives. Inside the grab_image handler, if that flag is set, the agent does not send grab_image again.
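The flag-based idea can be sketched in a few lines. The model below is a toy, single-threaded stand-in for an agent's event queue (no SObjectizer involved; msg and flag_based_producer are hypothetical names): handlers run one at a time, grab_image re-posts itself, and stop_acquisition only flips stop_arrived:

```cpp
#include <deque>

// Toy, single-threaded model of an agent's event queue (not SObjectizer code):
// handlers run one at a time and may post new messages to the agent itself.
enum class msg { start_acquisition, stop_acquisition, grab_image };

class flag_based_producer
{
public:
    void post(msg m) { m_queue.push_back(m); }

    // Handles at most max_steps queued messages; returns how many were handled.
    int run(int max_steps)
    {
        int steps = 0;
        while (steps < max_steps && !m_queue.empty())
        {
            const msg m = m_queue.front();
            m_queue.pop_front();
            handle(m);
            ++steps;
        }
        return steps;
    }

    int frames() const { return m_frames; }

private:
    void handle(msg m)
    {
        switch (m)
        {
        case msg::start_acquisition:
            m_stop_arrived = false;
            post(msg::grab_image);
            break;
        case msg::stop_acquisition:
            m_stop_arrived = true; // checked by the grab_image handler
            break;
        case msg::grab_image:
            if (m_stop_arrived) break; // the flag breaks the self-sending loop
            ++m_frames;                // "grab" a frame
            post(msg::grab_image);     // keep the loop going
            break;
        }
    }

    std::deque<msg> m_queue;
    bool m_stop_arrived = false;
    int m_frames = 0;
};
```

It works, but the stop logic is spread across handlers: every handler involved in the loop has to remember to check (or reset) the flag.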

We can do better than this.

An intermediate solution consists of unsubscribing from grab_image when stop_acquisition_command arrives. Consequently, the agent has to subscribe (again) when start_acquisition_command arrives:

class image_producer_recursive final : public so_5::agent_t
{
    struct grab_image final : so_5::signal_t{};
public:
    image_producer_recursive(so_5::agent_context_t ctx, so_5::mbox_t channel, so_5::mbox_t commands)
        : agent_t(std::move(ctx)), m_channel(std::move(channel)), m_commands(std::move(commands)), m_capture(0)
    {
    }

    void so_define_agent() override
    {
        so_subscribe(m_commands).event([this](so_5::mhood_t<start_acquisition_command>) {
            subscribe_to_grab_image(); // subscribe (again)
            so_5::send<grab_image>(*this);
        }).event([this](so_5::mhood_t<stop_acquisition_command>) {
            so_drop_subscription<grab_image>(so_direct_mbox()); // unsubscribe
        });
    }

    void so_evt_start() override
    {
        if (!m_capture.isOpened())
        {
            throw std::runtime_error("Can't connect to the webcam");
        }
    }

private:
    void subscribe_to_grab_image()
    {
        so_subscribe_self().event([this](so_5::mhood_t<grab_image>) {
            cv::Mat image;
            m_capture >> image;
            so_5::send<cv::Mat>(m_channel, std::move(image));
            so_5::send<grab_image>(*this);
        });
    }

    so_5::mbox_t m_channel;
    so_5::mbox_t m_commands;
    cv::VideoCapture m_capture;
};

so_drop_subscription is an overloaded function that removes a previously made subscription (it does not throw if the subscription does not exist). As you might expect, so_drop_subscription<grab_image>(so_direct_mbox()); drops the subscription to the agent’s own message box for the message (or signal) type grab_image.

This solution is a bit smelly, and it also contains a bug you may have already spotted.

What if start_acquisition_command arrives twice? Well, image_producer_recursive throws from the message handler because it tries to subscribe to the same signal again!
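The bug is easy to reproduce with a toy subscription table (a stdlib-only model, not SObjectizer's implementation; toy_subscriptions and on_start are hypothetical). It mirrors the two behaviors described above: a duplicate subscription throws, whereas dropping a missing one is a no-op:

```cpp
#include <functional>
#include <map>
#include <stdexcept>
#include <string>

// Toy subscription table (not SObjectizer's implementation): a duplicate
// subscription throws, dropping a subscription that does not exist does nothing.
class toy_subscriptions
{
public:
    void subscribe(const std::string& msg_type, std::function<void()> handler)
    {
        if (!m_handlers.emplace(msg_type, std::move(handler)).second)
            throw std::runtime_error("already subscribed to " + msg_type);
    }

    void drop_subscription(const std::string& msg_type)
    {
        m_handlers.erase(msg_type); // erasing a missing key is a no-op
    }

    // Returns false when nobody is subscribed: the message is discarded.
    bool deliver(const std::string& msg_type) const
    {
        const auto it = m_handlers.find(msg_type);
        if (it == m_handlers.end())
            return false;
        it->second();
        return true;
    }

private:
    std::map<std::string, std::function<void()>> m_handlers;
};

// Mirrors the start_acquisition_command handler of the intermediate solution.
void on_start(toy_subscriptions& subs, int& frames)
{
    subs.subscribe("grab_image", [&frames] { ++frames; }); // throws on a second start
}
```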

We might try adding some patches; however, there is a more expressive way to deal with this situation that also fixes the bug. Essentially, we should express more clearly what the agent is interested in handling in each of its states. Think of a two-state scenario:

  • when the agent is “stopped”, it’s only interested in start_acquisition_command that turns the state to “started”;
  • when the agent is “started”, it’s interested in both stop_acquisition_command that turns the state to “stopped”, and grab_image that does not change the state but only progresses the acquisition.

If, for instance, start_acquisition_command arrives when the agent is “started”, it will be automatically ignored because the agent is not interested in that signal at that point.

This leads to one of the strongest concepts of SObjectizer: agent states.

An agent is a finite-state machine. The behavior of an agent depends on its current state and on the received message. An agent can receive and process different messages in each state. In other words, an agent can receive a message in one state but ignore it in another. Or, if an agent receives the same message in several states, it can handle it differently in each state.

An important clarification here: in SObjectizer, messages that are not handled in the current state are just silently discarded. This behavior might differ from other frameworks. For example, in Akka unhandled messages are automatically published to the system eventStream in the form of UnhandledMessage.

By default, an agent is in a default state. All the agents we have seen so far work in that default state. Often this is enough.
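The core idea, a separate table of handlers per state with silent discard of anything the current state does not handle, fits in a toy stdlib-only class. toy_fsm is hypothetical and only models the semantics described above; SObjectizer's state_t is far richer:

```cpp
#include <functional>
#include <map>
#include <string>

// Toy finite-state machine (not SObjectizer's implementation): each state owns
// its own table of handlers, and a message with no handler in the current state
// is silently discarded.
class toy_fsm
{
public:
    using handler = std::function<void()>;

    void on(const std::string& state, const std::string& msg, handler h)
    {
        m_table[state][msg] = std::move(h);
    }

    void activate(const std::string& state) { m_current = state; }
    const std::string& current() const { return m_current; }

    // Returns true if the message was handled in the current state.
    bool deliver(const std::string& msg)
    {
        const auto st = m_table.find(m_current);
        if (st == m_table.end())
            return false;
        const auto h = st->second.find(msg);
        if (h == st->second.end())
            return false; // not interesting in this state: dropped
        h->second();      // the handler may switch state via activate()
        return true;
    }

private:
    std::map<std::string, std::map<std::string, handler>> m_table;
    std::string m_current;
};
```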

Now, let’s introduce these two states into image_producer_recursive. In SObjectizer, a state is an instance of the class state_t and is generally defined as a member variable of the agent (the st_ prefix is just a naming convention):

    // ... rest of image_producer_recursive

    so_5::state_t st_started{this}, st_stopped{this};
    so_5::mbox_t m_channel;
    so_5::mbox_t m_commands;
    cv::VideoCapture m_capture;
};

Optionally, states can have a name:

so_5::state_t st_started{this, "started"}, st_stopped{this, "stopped"};

Some more advanced uses of states will be discussed in a future post, so grasping all the theory is not important now. Let’s put states into practice instead.

We make message subscriptions and also “activate” the state we want our agent to start with. We do both in so_define_agent:

void so_define_agent() override
{
    st_started.event([this](so_5::mhood_t<grab_image>) {
        // ...
    }).event(m_commands, [this](so_5::mhood_t<stop_acquisition_command>) {
        // ...
    });
    st_stopped.event(m_commands, [this](so_5::mhood_t<start_acquisition_command>) {
        // ...  
    });

    st_stopped.activate(); // change the state
}

As you might expect, the first subscription is to the agent’s own message box in state st_started. The second is to the m_commands message box, still in state st_started. The third one is to m_commands in state st_stopped. Finally, st_stopped.activate(); changes the current state to st_stopped.

At this point, let’s fill in the blanks:

void so_define_agent() override
{
    st_started.event([this](so_5::mhood_t<grab_image>) {
        cv::Mat mat;
        m_capture >> mat;
        so_5::send<cv::Mat>(m_channel, std::move(mat));
        so_5::send<grab_image>(*this);
    }).event(m_commands, [this](so_5::mhood_t<stop_acquisition_command>) {
        st_stopped.activate();
    });
    st_stopped.event(m_commands, [this](so_5::mhood_t<start_acquisition_command>) {
        st_started.activate();
        so_5::send<grab_image>(*this);
    });

    st_stopped.activate();
}

Here is a visual representation of the state transitions:

grab_image is handled as before. start_acquisition_command and stop_acquisition_command are more interesting: receiving stop_acquisition_command sets the current state to st_stopped, whereas receiving start_acquisition_command first changes the state to st_started and then sends grab_image to the agent itself. This last part deserves attention:

st_stopped.event(m_commands, [this](so_5::mhood_t<start_acquisition_command>) {
    st_started.activate();
    so_5::send<grab_image>(*this);
});

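After exiting this handler, the state of the agent is st_started. This means grab_image will be handled by the corresponding subscription in st_started. Similarly, if start_acquisition_command arrives while the agent is in state st_started, the signal is just ignored because there is no corresponding handler in that state. Likewise, once stop_acquisition_command has switched the agent to st_stopped, the grab_image still waiting in the queue finds no handler and is discarded, which is exactly what breaks the self-sending loop. Bingo!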
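Putting it together, the behavior of image_producer_recursive can be replayed in a toy, single-threaded simulation (no SObjectizer and no OpenCV; an enum stands in for st_started/st_stopped, a deque for the agent's event queue, and all names are hypothetical). It shows both effects described above: the duplicate start is ignored, and the grab_image still queued after the stop is discarded:

```cpp
#include <deque>

// Toy replica of image_producer_recursive's logic: an enum plays the role of
// st_started/st_stopped and a deque plays the role of the agent's event queue.
enum class msg { start_acquisition, stop_acquisition, grab_image };
enum class state { stopped, started };

class toy_recursive_producer
{
public:
    void post(msg m) { m_queue.push_back(m); }

    // Handles at most max_steps queued messages.
    void run(int max_steps)
    {
        for (int i = 0; i < max_steps && !m_queue.empty(); ++i)
        {
            const msg m = m_queue.front();
            m_queue.pop_front();
            handle(m);
        }
    }

    int frames() const { return m_frames; }
    state current() const { return m_state; }
    bool idle() const { return m_queue.empty(); }

private:
    void handle(msg m)
    {
        if (m_state == state::started)
        {
            if (m == msg::grab_image) { ++m_frames; post(msg::grab_image); }
            else if (m == msg::stop_acquisition) { m_state = state::stopped; }
            // start_acquisition has no handler here: silently ignored
        }
        else // state::stopped
        {
            if (m == msg::start_acquisition) { m_state = state::started; post(msg::grab_image); }
            // grab_image and stop_acquisition have no handler here: silently ignored
        }
    }

    std::deque<msg> m_queue;
    state m_state = state::stopped; // like st_stopped.activate() in so_define_agent
    int m_frames = 0;
};
```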

Here is the full code:

class image_producer_recursive final : public so_5::agent_t
{
    struct grab_image final : so_5::signal_t{};
public:
    image_producer_recursive(so_5::agent_context_t ctx, so_5::mbox_t channel, so_5::mbox_t commands)
        : agent_t(std::move(ctx)), m_channel(std::move(channel)), m_commands(std::move(commands)), m_capture(0)
    {
    }

    void so_define_agent() override
    {
        st_started.event([this](so_5::mhood_t<grab_image>) {
            cv::Mat mat;
            m_capture >> mat;
            so_5::send<cv::Mat>(m_channel, std::move(mat));
            so_5::send<grab_image>(*this);
        }).event(m_commands, [this](so_5::mhood_t<stop_acquisition_command>) {
            st_stopped.activate();
        });

        st_stopped.event(m_commands, [this](so_5::mhood_t<start_acquisition_command>) {
            st_started.activate();
            so_5::send<grab_image>(*this);
        });

        st_stopped.activate();
    }

    void so_evt_start() override
    {
        if (!m_capture.isOpened())
        {
            throw std::runtime_error("Can't connect to the webcam");
        }
    }

private:
    so_5::state_t st_stopped{ this };
    so_5::state_t st_started{ this };

    so_5::mbox_t m_channel;
    so_5::mbox_t m_commands;
    cv::VideoCapture m_capture;
};

Note that states can also be applied to image_producer_callback to avoid calling start and stop twice.
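A minimal sketch of that suggestion follows. To stay stdlib-only, a plain enum stands in for so_5::state_t, and strict_device is a hypothetical device that, like many real SDKs, treats a double start() or stop() as an error (here it just counts them). Commands that make no sense in the current state never reach the device:

```cpp
// Hypothetical strict device: misuse is counted instead of returning an error.
struct strict_device
{
    bool running = false;
    int errors = 0;

    void start() { if (running) ++errors; running = true; }
    void stop()  { if (!running) ++errors; running = false; }
};

// Sketch of a state-guarded callback producer, with an enum in place of
// so_5::state_t: out-of-place commands are ignored instead of forwarded.
class guarded_callback_producer
{
public:
    explicit guarded_callback_producer(strict_device& dev) : m_dev(dev) {}

    void on_start_command()
    {
        if (m_state != state::stopped) return; // like "no handler in st_started"
        m_dev.start();
        m_state = state::started;
    }

    void on_stop_command()
    {
        if (m_state != state::started) return; // like "no handler in st_stopped"
        m_dev.stop();
        m_state = state::stopped;
    }

private:
    enum class state { stopped, started };

    strict_device& m_dev;
    state m_state = state::stopped;
};
```

With SObjectizer, the early returns would simply become missing subscriptions: start_acquisition_command subscribed only in st_stopped and stop_acquisition_command only in st_started.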

Approaching polling-based devices #2

To update image_producer, we start from the solution with the child broker shown in the previous episode. There are clearly multiple ways to do this; one consists of bringing a simple state machine to the party through waiting atomics (C++20 std::atomic::wait and notify_one):

class image_producer final : public so_5::agent_t
{
    enum class acquisition_state { st_started, st_stopped, st_cancelled };

    class image_producer_broker : public agent_t
    {
    public:
        image_producer_broker(so_5::agent_context_t c, image_producer* parent, std::stop_source stop_source, so_5::mbox_t commands)
            : agent_t(std::move(c)), m_parent(parent), m_stop_source(std::move(stop_source)), m_commands(std::move(commands))
        {
        }

        void so_define_agent() override
        {
            // forward commands to the parent, which owns the acquisition loop
            so_subscribe(m_commands).event([this](so_5::mhood_t<start_acquisition_command>) {
                m_parent->start();
            }).event([this](so_5::mhood_t<stop_acquisition_command>) {
                m_parent->stop();
            });
        }

        void so_evt_finish() override
        {
            m_stop_source.request_stop();
        }

    private:
        image_producer* m_parent;
        std::stop_source m_stop_source;
        so_5::mbox_t m_commands;
    };
public:
    image_producer(so_5::agent_context_t ctx, so_5::mbox_t channel, so_5::mbox_t commands)
        : agent_t(std::move(ctx)), m_channel(std::move(channel)), m_commands(std::move(commands))
    {
    }

    void so_evt_start() override
    {
        const auto st = m_stop_source.get_token();

        introduce_child_coop(*this, so_5::disp::active_obj::make_dispatcher(so_environment()).binder(), [this](so_5::coop_t& c) {
            c.make_agent<image_producer_broker>(this, m_stop_source, m_commands);
        });

        cv::VideoCapture cap(0, cv::CAP_DSHOW);
        if (!cap.isOpened())
        {
            throw std::runtime_error("Can't connect to the webcam");
        }

        std::stop_callback sc{ st, [this] {
            cancel();
        } };

        while (!st.stop_requested())
        {
            m_state.wait(acquisition_state::st_stopped); // wait for NOT st_stopped
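            while (m_state == acquisition_state::st_started)
            {
                // fresh cv::Mat per frame: copies share the pixel buffer, so reusing
                // one could overwrite a frame that receivers are still reading
                cv::Mat image;
                cap >> image;
                so_5::send<cv::Mat>(m_channel, std::move(image));
            }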
        }
    }

private:
    void start()
    {
        change_state(acquisition_state::st_started);
    }

    void stop()
    {
        change_state(acquisition_state::st_stopped);
    }

    void cancel()
    {
        change_state(acquisition_state::st_cancelled);
    }

    void change_state(acquisition_state st)
    {
        m_state = st;
        m_state.notify_one();
    }

    so_5::mbox_t m_channel;
    so_5::mbox_t m_commands;
    std::atomic<acquisition_state> m_state = acquisition_state::st_stopped;
    std::stop_source m_stop_source;
};

Note that it’s possible to remove stop_source and stop_token completely, since cancel() can be called from the broker’s so_evt_finish(). Again, the parent-child relationship guarantees this is called strictly before the parent agent is shut down:

class image_producer final : public so_5::agent_t
{
    enum class acquisition_state { st_started, st_stopped, st_cancelled };

    class image_producer_broker : public agent_t
    {
    public:
        image_producer_broker(so_5::agent_context_t c, image_producer* parent, so_5::mbox_t commands)
            : agent_t(std::move(c)), m_parent(parent), m_commands(std::move(commands))
        {
        }

        void so_define_agent() override
        {
            so_subscribe(m_commands).event([this](so_5::mhood_t<start_acquisition_command>) {
                m_parent->start();
            }).event([this](so_5::mhood_t<stop_acquisition_command>) {
                m_parent->stop();
            });
        }

        void so_evt_finish() override
        {
            m_parent->cancel();
        }

    private:
        image_producer* m_parent;
        so_5::mbox_t m_commands;
    };
public:
    image_producer(so_5::agent_context_t ctx, so_5::mbox_t channel, so_5::mbox_t commands)
        : agent_t(std::move(ctx)), m_channel(std::move(channel)), m_commands(std::move(commands))
    {
    }

    void so_evt_start() override
    {
        introduce_child_coop(*this, so_5::disp::active_obj::make_dispatcher(so_environment()).binder(), [this](so_5::coop_t& c) {
            c.make_agent<image_producer_broker>(this, m_commands);
        });

        cv::VideoCapture cap(0, cv::CAP_DSHOW);
        if (!cap.isOpened())
        {
            throw std::runtime_error("Can't connect to the webcam");
        }

        while (m_state != acquisition_state::st_cancelled)
        {
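            m_state.wait(acquisition_state::st_stopped); // blocks while st_stopped; returns on start or cancel
            while (m_state == acquisition_state::st_started)
            {
                // fresh cv::Mat per frame: copies share the pixel buffer, so reusing
                // one could overwrite a frame that receivers are still reading
                cv::Mat image;
                cap >> image;
                so_5::send<cv::Mat>(m_channel, std::move(image));
            }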
        }
    }

private:
    void start()
    {
        change_state(acquisition_state::st_started);
    }

    void stop()
    {
        change_state(acquisition_state::st_stopped);
    }

    void cancel()
    {
        change_state(acquisition_state::st_cancelled);
    }

    void change_state(acquisition_state st)
    {
        m_state = st;
        m_state.notify_one();
    }

    so_5::mbox_t m_channel;
    so_5::mbox_t m_commands;
    std::atomic<acquisition_state> m_state = acquisition_state::st_stopped;
};

It was quite a long journey, but we made it! From now on, we can use any of the following producers interchangeably:

  • image_producer
  • image_producer_callback
  • image_producer_recursive

Also, for more flexibility, we pass the commands channel to the producer; however, any agent could also get it with a simple call like so_environment().create_mbox("commands"). Here, we prefer injecting the message box through the constructor to avoid depending on a magic string buried somewhere.

Finally, all the producers can handle commands! For example:

int main()
{
    const auto ctrl_c = calico::utils::get_ctrlc_token();

    const so_5::wrapped_env_t sobjectizer;
    const auto main_channel = sobjectizer.environment().create_mbox("main");
    auto commands_channel = sobjectizer.environment().create_mbox("commands");

    sobjectizer.environment().introduce_coop(so_5::disp::active_obj::make_dispatcher(sobjectizer.environment()).binder(), [&](so_5::coop_t& c) {
        c.make_agent<calico::producers::image_producer_recursive>(main_channel, commands_channel);
        c.make_agent<calico::agents::image_viewer>(main_channel);
        c.make_agent<calico::agents::image_tracer>(main_channel);
    });

    std::this_thread::sleep_for(std::chrono::seconds(1));
    so_5::send<calico::start_acquisition_command>(commands_channel);

    std::cout << "Acquisition START command sent!\n";

    std::this_thread::sleep_for(std::chrono::seconds(5));
    so_5::send<calico::stop_acquisition_command>(commands_channel);

    std::cout << "Acquisition STOP command sent!\n";

    calico::utils::wait_for_stop(ctrl_c);
}

We send a start command one second after introducing the main cooperation and a stop command five seconds after that. This should make image_producer_recursive acquire frames for roughly five seconds and then go back to idle.

Here is the final result:

As said before, we assume that cap >> image; never hangs. This is a fair assumption for OpenCV, but in general things might be more complicated. For example, a GetNextImage-like function can actually hang if a hardware trigger is set on the camera. For this reason, SDKs usually provide an additional timeout parameter that can be set to a generous multiple of the frame period, i.e. the reciprocal of the frame rate (e.g. at 100 FPS the period is 10 ms), or to some other meaningful value. Callback-based APIs often sidestep these issues, since you just call “start” and “stop”, and nothing should really hang.
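Here is a stdlib-only sketch of both ideas, under stated assumptions. grab_timeout() computes a generous multiple of the frame period (the default factor of 10 is an arbitrary assumption), and fake_triggered_camera is a hypothetical stand-in for a hardware-triggered camera whose get_next_image() waits for a trigger but gives up after the timeout instead of hanging:

```cpp
#include <chrono>
#include <cmath>
#include <condition_variable>
#include <mutex>
#include <optional>

// Hypothetical helper: timeout for a GetNextImage-like call, computed as a
// generous multiple of the frame period (1 / fps). The factor is an assumption.
std::chrono::milliseconds grab_timeout(double fps, double safety_factor = 10.0)
{
    const double period_ms = 1000.0 / fps; // e.g. 100 FPS -> 10 ms
    return std::chrono::milliseconds{ static_cast<long long>(std::ceil(period_ms * safety_factor)) };
}

// Hypothetical hardware-triggered camera: get_next_image() blocks until a
// trigger arrives, but returns std::nullopt once the timeout expires.
class fake_triggered_camera
{
public:
    void trigger()
    {
        {
            std::lock_guard lock{ m_mutex };
            m_triggered = true;
        }
        m_cv.notify_one();
    }

    std::optional<int> get_next_image(std::chrono::milliseconds timeout)
    {
        std::unique_lock lock{ m_mutex };
        if (!m_cv.wait_for(lock, timeout, [this] { return m_triggered; }))
            return std::nullopt; // timed out: the caller can check for stop/cancel and retry
        m_triggered = false;
        return m_next_frame++;
    }

private:
    std::mutex m_mutex;
    std::condition_variable m_cv;
    bool m_triggered = false;
    int m_next_frame = 0;
};
```

A polling loop such as image_producer's could then retry on an empty result, checking for stop or cancel between attempts, instead of blocking forever inside the SDK.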

Here is what our data flow looks like now:

The task has been accomplished! In the next post we’ll have some more fun with sending commands.

Takeaway

In this episode we have learned:

  • agents can use so_drop_subscription to unsubscribe from a message on a particular message box (and state);
  • agents are state machines;
  • by default, an agent is in a default state;
  • states are instances of state_t, defined as agent’s member variables;
  • states express intent and SObjectizer enables us to declare transitions succinctly;
  • messages that are not handled in the current state are just silently ignored.

As usual, calico is updated and tagged.

What’s next?

While we’ve acquired the skills to manage commands, a few more enhancements are required to fully meet Lucas’ request. We could consider adding support for a simple keyboard remote control, or perhaps delve into more sophisticated features, such as automatically sending a stop command after 10 seconds…

Stay tuned for our next installment, where we’ll delve deeper into this topic!


Thanks to Yauheni Akhotnikau for having reviewed this post.
