Asynchronous state machine with Symfony Workflows
Danil Khaliullin
Posted on February 11, 2024
A finite-state machine is an abstract machine that can be in exactly one of a finite number of states at any given time. The Symfony Workflow component lets you create and manage state machines. Let’s explore how it can help when building complex business logic.
Use-case
Let’s imagine the workflow: we initiate the creation of an order, send it to the order service, send it to the user’s email, and mark it as “sent” in the database.
All these actions should be executed successfully, but there are potential points of failure, such as invalid order data during creation, failures in the order service, or issues with the vendor’s email provider.
A Symfony workflow with retry logic lets us complete a complex business flow even if errors occur during execution. Let’s examine the main points that make the workflow fault-tolerant and reliable:
- any business logic flow is divided into state-machine transitions. Every transition is executed transactionally.
- if the execution of a transition fails, we can retry it later by command or asynchronously.
- every transition contains only business logic and doesn’t depend on the workflow implementation.
Let’s code!
First, we need to create a new Symfony workflow according to our workflow schema:
framework:
    workflows:
        order_send:
            type: state_machine
            supports:
                - App\Entity\WorkflowEntry
            marking_store:
                type: 'method'
                property: 'currentState'
            places:
                - initialised
                - verified
                - approved
                - sent_to_email
                - marked_as_sent
            transitions:
                verify_order:
                    from: initialised
                    to: verified
                approve_order:
                    from: verified
                    to: approved
                send_order_to_email:
                    from: approved
                    to: sent_to_email
                mark_order_as_sent:
                    from: sent_to_email
                    to: marked_as_sent
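To get a feel for what this configuration describes, here is a minimal plain-PHP sketch of the same transition table. It does not use the Workflow component at all: the `ORDER_SEND_TRANSITIONS` constant and the `applyTransition()` function are hypothetical names made up for illustration, and the real component does much more (events, guards, marking stores).

```php
<?php

// Hypothetical stand-in for the "order_send" definition: transition => [from, to].
const ORDER_SEND_TRANSITIONS = [
    'verify_order'        => ['initialised', 'verified'],
    'approve_order'       => ['verified', 'approved'],
    'send_order_to_email' => ['approved', 'sent_to_email'],
    'mark_order_as_sent'  => ['sent_to_email', 'marked_as_sent'],
];

/**
 * Returns the place reached by applying $transition from $currentPlace,
 * or throws if the transition is unknown or not enabled in that place.
 */
function applyTransition(string $currentPlace, string $transition): string
{
    if (!isset(ORDER_SEND_TRANSITIONS[$transition])) {
        throw new InvalidArgumentException(sprintf('Unknown transition "%s"', $transition));
    }

    [$from, $to] = ORDER_SEND_TRANSITIONS[$transition];

    if ($from !== $currentPlace) {
        throw new LogicException(sprintf(
            'Transition "%s" is not enabled in place "%s"',
            $transition,
            $currentPlace
        ));
    }

    return $to;
}

// Walk the happy path from the first place to the last one.
$place = 'initialised';
foreach (array_keys(ORDER_SEND_TRANSITIONS) as $transition) {
    $place = applyTransition($place, $transition);
}

echo $place, PHP_EOL; // prints "marked_as_sent"
```

Because every transition has exactly one source place, an order can never jump from “initialised” straight to “sent_to_email”; trying to do so throws a `LogicException` in this sketch.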
So, what is App\Entity\WorkflowEntry? It is an entity that holds all the information about the current workflow. It keeps the current state and the next transition, and stores the business logic data:
<?php

declare(strict_types=1);

namespace App\Entity;

...

#[ORM\Entity(repositoryClass: WorkflowEntryRepository::class)]
class WorkflowEntry implements WorkflowInterface
{
    #[ORM\Id]
    #[ORM\Column(type: "uuid", unique: true)]
    #[ORM\GeneratedValue(strategy: "CUSTOM")]
    #[ORM\CustomIdGenerator(class: UuidGenerator::class)]
    private Uuid $id;

    #[ORM\Column(name: "current_state", type: "string")]
    private string $currentState = 'initialised';

    #[ORM\Column(name: "workflow_type", length: 32, enumType: WorkflowType::class, options: ["default" => "default"])]
    private WorkflowType $workflowType = WorkflowType::DefaultType;

    #[ORM\Column(name: "next_transition", type: "string", nullable: true)]
    private ?string $nextTransition = null;

    #[ORM\Column(type: "json")]
    private array $stamps = [];

    #[ORM\Column(enumType: WorkflowStatus::class, options: ["default" => "started"])]
    private WorkflowStatus $status = WorkflowStatus::Started;

    #[ORM\Column(type: "smallint")]
    private int $retries = 0;

    #[ORM\Column(name: "created_at", type: "datetime_immutable")]
    private \DateTimeImmutable $createdAt;

    #[ORM\Column(name: "updated_at", type: "datetime_immutable")]
    private \DateTimeImmutable $updatedAt;

    public function __construct()
    {
        $this->createdAt = new \DateTimeImmutable();
        $this->updatedAt = new \DateTimeImmutable();
    }

    public static function create(
        WorkflowType $type,
        string $nextTransition,
        array $stamps,
    ): WorkflowEntry {
        $entry = new WorkflowEntry();
        $entry->setWorkflowType($type);
        $entry->setNextTransition($nextTransition);
        $entry->setStamps($stamps);

        return $entry;
    }

    ...
}
Business logic data can be stored in an “envelope” using stamps, which implement App\Service\Workflow\WorkflowStampInterface. For instance, we can store the order ID in a stamp:
class OrderIdStamp implements WorkflowStampInterface
{
    private Uuid $orderId;

    public function getOrderId(): Uuid
    {
        return $this->orderId;
    }

    public function setOrderId(Uuid $orderId): void
    {
        $this->orderId = $orderId;
    }

    public static function createWithOrderId(Uuid $orderId): OrderIdStamp
    {
        $stamp = new OrderIdStamp();
        $stamp->setOrderId($orderId);

        return $stamp;
    }
}
Stamps are serialized in the envelope, App\Service\Workflow\Envelope\WorkflowEnvelope. The App\Entity\WorkflowEntry entity is stored in the database after every transition so that the process can be resumed after a failure.
Let’s see how this approach improves the workflow:
- We store data as the envelope in the WorkflowEntry that goes through all transitions.
- Every transition is executed transactionally.
- After the transition is done, we keep its result and any additional data in the envelope.
- If one of the transitions fails, we can retry it when the failure is temporary, or fail the whole workflow when it is not.
<?php

declare(strict_types=1);

namespace App\Service\Workflow\Envelope;

use App\Service\Workflow\WorkflowStampInterface;

class WorkflowEnvelope
{
    // Initialised so that getStamps() also works on an empty envelope.
    private array $stamps = [];

    /**
     * @param WorkflowStampInterface[] $stamps
     */
    public function __construct(array $stamps = [])
    {
        foreach ($stamps as $stamp) {
            $this->addStamp($stamp);
        }
    }

    public function addStamp(WorkflowStampInterface $stamp): void
    {
        // Stamps are grouped by their class name.
        $this->stamps[$stamp::class][] = $stamp;
    }

    /**
     * @return array<class-string, WorkflowStampInterface[]>
     */
    public function getStamps(): array
    {
        return $this->stamps;
    }

    public function getStamp(string $stampFqcn): WorkflowStampInterface
    {
        $stamps = $this->stamps[$stampFqcn] ?? [];

        if (count($stamps) === 0) {
            throw new \RuntimeException(sprintf('Stamp with type %s is not found', $stampFqcn));
        }

        // Return the first stamp of the requested type.
        return reset($stamps);
    }

    public function hasStamp(string $stampFqcn): bool
    {
        return isset($this->stamps[$stampFqcn]);
    }
}
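The envelope has to survive a trip through the JSON column of WorkflowEntry, so stamps must be converted to plain arrays and back. The article’s project relies on Symfony’s normalizer and denormalizer for that. The sketch below only illustrates the idea in plain PHP: `DemoOrderIdStamp`, `normalizeStamps()`, `denormalizeStamps()` and the stored array shape are all assumptions made for this example, not the project’s actual serialization format.

```php
<?php

// Hypothetical stamp used only in this sketch; the article's OrderIdStamp holds a Uuid object.
final class DemoOrderIdStamp
{
    public function __construct(public readonly string $orderId)
    {
    }
}

/**
 * Turns stamps grouped by class name into plain arrays that fit a JSON column.
 *
 * @param array<class-string, object[]> $stamps
 */
function normalizeStamps(array $stamps): array
{
    $normalized = [];
    foreach ($stamps as $class => $group) {
        foreach ($group as $stamp) {
            // Called from outside the class, get_object_vars() only sees public properties.
            $normalized[$class][] = get_object_vars($stamp);
        }
    }

    return $normalized;
}

/**
 * Rebuilds stamp objects, passing the stored values as named constructor arguments.
 */
function denormalizeStamps(array $data): array
{
    $stamps = [];
    foreach ($data as $class => $group) {
        foreach ($group as $properties) {
            $stamps[$class][] = new $class(...$properties);
        }
    }

    return $stamps;
}

$stamps = [DemoOrderIdStamp::class => [new DemoOrderIdStamp('demo-order-1')]];

// Simulate the database round trip through a JSON column.
$json = json_encode(normalizeStamps($stamps), JSON_THROW_ON_ERROR);
$restored = denormalizeStamps(json_decode($json, true, 512, JSON_THROW_ON_ERROR));
```

After the round trip, `$restored` holds a fresh `DemoOrderIdStamp` with the same order ID, which is what allows a workflow to resume from the database after a failure.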
How to manage it with Symfony Workflow?
Let’s use events to manage it.
First, we move every transition into its own class to decouple the business logic and follow the Single Responsibility Principle (SRP). For example:
class VerifyOrder implements WorkflowTransitionInterface
{
    public function __construct(
        private readonly OrderRepository $orderRepository,
    ) {
    }

    public function handle(WorkflowEnvelope $envelope): WorkflowEnvelope
    {
        /** @var OrderIdStamp $orderIdStamp */
        $orderIdStamp = $envelope->getStamp(OrderIdStamp::class);
        $orderId = $orderIdStamp->getOrderId();
        $order = $this->orderRepository->find($orderId);

        // Here we can make verification actions

        return $envelope;
    }

    public function getNextTransition(): ?string
    {
        return Transition::ApproveOrder->value;
    }

    public function getState(): ?string
    {
        return State::Verified->value;
    }
}
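The `State` and `Transition` enums used above are not shown in this article. A minimal sketch of how they could look is below. Only `State::Verified` and `Transition::ApproveOrder` appear in the code above; the other case names, and the `target()` and `next()` helpers, are assumptions derived from the YAML configuration.

```php
<?php

// Assumed shape of the State enum; the values mirror the places in the YAML config.
enum State: string
{
    case Initialised = 'initialised';
    case Verified = 'verified';
    case Approved = 'approved';
    case SentToEmail = 'sent_to_email';
    case MarkedAsSent = 'marked_as_sent';
}

// Assumed shape of the Transition enum; the values mirror the transition names.
enum Transition: string
{
    case VerifyOrder = 'verify_order';
    case ApproveOrder = 'approve_order';
    case SendOrderToEmail = 'send_order_to_email';
    case MarkOrderAsSent = 'mark_order_as_sent';

    /** Hypothetical helper: the place the subject ends up in after this transition. */
    public function target(): State
    {
        return match ($this) {
            self::VerifyOrder => State::Verified,
            self::ApproveOrder => State::Approved,
            self::SendOrderToEmail => State::SentToEmail,
            self::MarkOrderAsSent => State::MarkedAsSent,
        };
    }

    /** Hypothetical helper: the transition to run next, or null when the workflow is done. */
    public function next(): ?self
    {
        return match ($this) {
            self::VerifyOrder => self::ApproveOrder,
            self::ApproveOrder => self::SendOrderToEmail,
            self::SendOrderToEmail => self::MarkOrderAsSent,
            self::MarkOrderAsSent => null,
        };
    }
}
```

Backed enums keep the string values in one place, so a typo in a transition name becomes an error on a missing enum case instead of a silent mismatch with the configuration.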
To apply a transition and update the current workflow state, we can use the Workflow::apply method. Let’s create a subscriber that listens for our custom WorkflowNextStateEvent and applies the next transition:
class WorkflowNextStateSubscriber implements EventSubscriberInterface
{
    public function __construct(
        private readonly ServiceLocator $workflows,
    ) {
    }

    public static function getSubscribedEvents(): array
    {
        return [
            WorkflowNextStateEvent::class => 'applyNextState',
        ];
    }

    public function applyNextState(WorkflowNextStateEvent $event): void
    {
        $workflowEntry = $event->getWorkflowEntry();

        if (!$this->workflows->has($workflowEntry->getWorkflowType()->value)) {
            throw new \RuntimeException(
                sprintf(
                    'There is no workflow with type %s',
                    $workflowEntry->getWorkflowType()->value
                )
            );
        }

        $workflow = $this->workflows->get(
            $workflowEntry->getWorkflowType()->value
        );

        $workflow->apply(
            $workflowEntry,
            $workflowEntry->getNextTransition()
        );
    }
}
It looks up the workflow by type and applies the next transition. To handle every transition transactionally, we can subscribe to the workflow events and wrap each transition’s handle method in another subscriber:
class WorkflowTransitionSubscriber implements EventSubscriberInterface
{
    public function __construct(
        private readonly EntityManagerInterface $entityManager,
        private readonly EventDispatcherInterface $eventDispatcher,
        private readonly ServiceLocator $transitions,
        private readonly NormalizerInterface $normalizer,
        private readonly DenormalizerInterface $denormalizer,
    ) {
    }

    public static function getSubscribedEvents(): array
    {
        return [
            'workflow.transition' => 'handleTransition',
        ];
    }

    public function handleTransition(Event $event): void
    {
        /** @var WorkflowEntry $workflowEntry */
        $workflowEntry = $event->getSubject();

        $this->entityManager->getConnection()->beginTransaction();

        try {
            $transitionKey = sprintf(
                '%s.%s',
                $workflowEntry->getWorkflowType()->value,
                $workflowEntry->getNextTransition(),
            );

            /** @var WorkflowTransitionInterface $transition */
            $transition = $this->transitions->get($transitionKey);

            $envelope = $this->denormalizer->denormalize($workflowEntry->getStamps(), WorkflowEnvelope::class);
            $envelope = $transition->handle($envelope);

            /** @var array $stamps */
            $stamps = $this->normalizer->normalize($envelope, 'array');

            $workflowEntry->setStamps($stamps);
            $workflowEntry->setCurrentState($transition->getState());
            $workflowEntry->setNextTransition($transition->getNextTransition());

            if ($workflowEntry->getNextTransition() === null) {
                $workflowEntry->setStatus(WorkflowStatus::Finished);
            }

            $this->entityManager->persist($workflowEntry);
            $this->entityManager->flush();
            $this->entityManager->getConnection()->commit();
        } catch (\Throwable $exception) {
            $this->entityManager->getConnection()->rollBack();

            throw $exception;
        }

        // Chain to the next transition only after the current one is committed.
        if ($workflowEntry->getNextTransition() !== null) {
            $this->eventDispatcher->dispatch(new WorkflowNextStateEvent($workflowEntry));
        }
    }
}
Let’s go point by point:
- To start the workflow, we dispatch the WorkflowNextStateEvent with the created WorkflowEntry object, which contains “stamps”: our order data.
- The WorkflowNextStateSubscriber handles the event, determines which workflow should be applied, and calls the Workflow::apply method.
- The WorkflowTransitionSubscriber subscribes to the workflow.transition event, which is dispatched when the WorkflowEntry goes through a transition. WorkflowTransitionSubscriber begins the transaction, prepares the envelope with stamps, calls the WorkflowTransitionInterface::handle method, and commits or rolls back the transaction.
- After that, the WorkflowTransitionSubscriber dispatches the WorkflowNextStateEvent to apply the next transition, until the workflow is done.
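The chain described above can be simulated in a few lines of plain PHP. This is only a sketch under simplifying assumptions: a plain array stands in for the WorkflowEntry entity, closures stand in for the transition classes, and a `while` loop replaces the event dispatching. The `$handlers` map and the `runWorkflow()` function are hypothetical names.

```php
<?php

// Hypothetical handlers keyed by transition name.
// Each one returns [new state, next transition or null].
$handlers = [
    'verify_order'        => fn (array $stamps): array => ['verified', 'approve_order'],
    'approve_order'       => fn (array $stamps): array => ['approved', 'send_order_to_email'],
    'send_order_to_email' => fn (array $stamps): array => ['sent_to_email', 'mark_order_as_sent'],
    'mark_order_as_sent'  => fn (array $stamps): array => ['marked_as_sent', null],
];

/**
 * Runs transitions one after another, starting from $entry['nextTransition'].
 * In the article, each iteration is a separate event and a separate DB transaction.
 */
function runWorkflow(array $entry, array $handlers): array
{
    while ($entry['nextTransition'] !== null) {
        $name = $entry['nextTransition'];

        if (!isset($handlers[$name])) {
            throw new RuntimeException(sprintf('There is no handler for transition %s', $name));
        }

        [$state, $next] = $handlers[$name]($entry['stamps']);

        // Record the progress, as the real subscriber does by flushing the entity.
        $entry['history'][] = $name;
        $entry['currentState'] = $state;
        $entry['nextTransition'] = $next;
    }

    $entry['status'] = 'finished';

    return $entry;
}

$entry = runWorkflow([
    'currentState' => 'initialised',
    'nextTransition' => 'verify_order',
    'stamps' => ['orderId' => 'demo-order-1'],
    'history' => [],
    'status' => 'started',
], $handlers);

echo $entry['currentState'], PHP_EOL; // prints "marked_as_sent"
```

Since the entry always records the next transition to run, starting `runWorkflow()` with `nextTransition` set to `send_order_to_email` resumes the flow from the middle, which is exactly what the retry logic relies on.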
What about failures?
Since we store the result of every transition, it’s easy to continue and finish the workflow from any state. Consider the case when the email service fails.
In this case, we can catch the service exception and mark the workflow as failed. After that, we can retry the workflow with a cron job, or send it to a queue to finish it asynchronously. If the error is permanent or the retry limit is exceeded, we can stop the workflow entirely.
<?php

class WorkflowHandler
{
    public function __construct(
        private readonly EventDispatcherInterface $eventDispatcher,
        private readonly LoggerInterface $logger,
        private readonly EntityManagerInterface $entityManager,
        private readonly NormalizerInterface $normalizer,
        private readonly DenormalizerInterface $denormalizer,
        private readonly MessageBusInterface $bus,
    ) {
    }

    public function handle(WorkflowEntry $workflowEntry): void
    {
        try {
            $this->eventDispatcher->dispatch(new WorkflowNextStateEvent($workflowEntry));
        } catch (StopWorkflowException $exception) {
            // Permanent error: stop the workflow, it will not be retried.
            $this->logger->error(
                sprintf(
                    'A permanent internal error occurred during handling workflow "%s". Workflow state: %s. The workflow stopped.',
                    $workflowEntry->getWorkflowType()->value,
                    $workflowEntry->getCurrentState(),
                ),
                [
                    // PSR-3 expects exceptions under the "exception" context key.
                    'exception' => $exception,
                ]
            );

            $workflowEntry->setStatus(WorkflowStatus::Stopped);
            $this->entityManager->persist($workflowEntry);
            $this->entityManager->flush();
        } catch (WorkflowInternalErrorException | \Throwable $exception) {
            // Temporary error: mark the workflow as failed so it can be retried.
            $this->logger->error(
                sprintf(
                    'An internal error occurred during handling workflow "%s". Workflow state: %s',
                    $workflowEntry->getWorkflowType()->value,
                    $workflowEntry->getCurrentState(),
                ),
                [
                    'exception' => $exception,
                ]
            );

            $workflowEntry->setStatus(WorkflowStatus::Failed);

            /** @var WorkflowEnvelope $envelope */
            $envelope = $this->denormalizer->denormalize($workflowEntry->getStamps(), WorkflowEnvelope::class);
            $envelope->addStamp(new WorkflowInternalErrorStamp(
                $exception->getMessage(),
            ));

            /** @var array<WorkflowStampInterface> $stamps */
            $stamps = $this->normalizer->normalize($envelope, 'array');
            $workflowEntry->setStamps($stamps);

            $this->entityManager->persist($workflowEntry);
            $this->entityManager->flush();
        }
    }

    public function retry(WorkflowEntry $workflowEntry): void
    {
        $workflowEntry->addRetry();
        $workflowEntry->setStatus(WorkflowStatus::Started);

        $this->entityManager->persist($workflowEntry);
        $this->entityManager->flush();

        $this->handle($workflowEntry);
    }
}
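The handler above does not decide when a failed workflow should be retried. A cron job or queue consumer could apply a rule like the one sketched below. The `MAX_RETRIES` value, the status strings, and the `shouldRetry()` and `selectRetryable()` functions are assumptions for illustration; the article does not state the actual limit or selection logic.

```php
<?php

// Hypothetical limit; the article does not state the actual value.
const MAX_RETRIES = 3;

/**
 * Only "failed" workflows are retried, and only while they are under the limit.
 * "stopped" workflows hit a permanent error and are never picked up again.
 */
function shouldRetry(string $status, int $retries, int $maxRetries = MAX_RETRIES): bool
{
    return $status === 'failed' && $retries < $maxRetries;
}

/**
 * Filters entries down to the ones a cron job or queue consumer should retry.
 *
 * @param array<int, array{id: string, status: string, retries: int}> $entries
 * @return string[] IDs of the entries to retry
 */
function selectRetryable(array $entries): array
{
    $retryable = array_filter(
        $entries,
        fn (array $entry): bool => shouldRetry($entry['status'], $entry['retries'])
    );

    // array_values() drops the gaps left by array_filter().
    return array_values(array_column($retryable, 'id'));
}

$ids = selectRetryable([
    ['id' => 'a', 'status' => 'failed', 'retries' => 0],
    ['id' => 'b', 'status' => 'stopped', 'retries' => 0],
    ['id' => 'c', 'status' => 'failed', 'retries' => 3],
    ['id' => 'd', 'status' => 'finished', 'retries' => 1],
]);

echo implode(',', $ids), PHP_EOL; // prints "a"
```

Each selected entry would then be passed to `WorkflowHandler::retry()`, which increments the counter and resumes from the stored next transition.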
Conclusion
A finite-state machine is a good tool for managing complex business logic: it splits the logic into discrete steps and handles failures at each of them, which makes it possible to build fault-tolerant systems. The Symfony Workflow component helps achieve this.
I hope you found this article helpful and that it provided some insights into working with Symfony Workflows.
You can check out the whole project on my GitHub: https://github.com/bifidokk/symfony-asynchronous-workflows
Feel free to leave your feedback or questions in the comments section below.
Happy coding! 😌