In today’s data-driven world, building complex workflows that handle thousands of registrations, processes, or actions can be a daunting task. The solution has to be scalable, reliable, and easy to use. Of the many tools and engines on the market, one of our preferred instruments is Temporal.io. Temporal provides a set of pre-built patterns that have been battle-tested in large-scale products and teams, so developers can apply them with little effort and focus on engineering productivity.
In this post, we’ll check out how Temporal Workflows can be used to develop a framework for efficient document lifecycle management. Our focus will be on using Temporal to handle workflow events related to document changes without burdening the system with unnecessary processes and manual steps. We will provide code samples in PHP, but you can adapt them to any of the languages Temporal supports.
You can find the sample code for this solution here.
The Problem
Let’s consider a scenario where you have a set of data points, files, or documents that need to be observed for changes (for example, by an AI agent or a high-level supervisor). We can call this process a “lifecycle.” These documents don’t necessarily generate much activity in your system and don’t consume resources when they are idle. However, everything changes the moment a user comes in and edits a file or uploads a new file.
In addition, users don’t tend to work at predictable intervals, so we expect a burst of activity followed by an idle period during which holding resources while waiting for changes is not practical.
Some of the processes that watch for these file changes cannot run while the user is still active (for example, summary regeneration by an AI agent). We want to accumulate some of these changes or wait for the user to complete the work first.
The Solution
The solution we will explore today uses Temporal’s ability to create timer-based, event-driven handlers and state machines. The benefit of this approach is that you can create very complex state machines and event observers that resolve dependencies along the way. After the work is completed, everything is cleaned up, leaving no hanging processes or resource consumption, while the document lifecycle is still guaranteed to be handled sequentially.
We will start with a simple Workflow and keep adding functions to it until reaching the desired solution.
Step 0: Prerequisites
Before diving deeper into the nuances of the Workflow, let’s start with our base abstractions: the event and the event queue.
namespace App\DTO;

class Event
{
    public function __construct(
        public string $entity,
        public string $action,
    ) {
    }
}

namespace App\DTO;

use Temporal\Internal\Marshaller\Meta\MarshalArray;

class Queue implements \Countable
{
    public function __construct(
        #[MarshalArray(of: Event::class)]
        private array $events = [],
    ) {
    }

    public function merge(Queue $queue): Queue
    {
        $this->events = array_merge($this->events, $queue->events);
        return $this;
    }

    public function count(): int
    {
        return count($this->events);
    }

    // Moves all events to new queue and flushes the current queue
    public function flush(): Queue
    {
        $queue = new Queue($this->events);
        $this->events = [];
        return $queue;
    }
}
The Temporal SDK allows us to marshal various objects and values to carry them between our code, Workflows, and Activities. We use the MarshalArray attribute to declare a typed array of Event objects.
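To make the buffer semantics concrete, here is a minimal sketch of how merge and flush interact. It uses simplified stand-ins (the hypothetical SketchEvent and SketchQueue classes, without the Temporal attribute) so that it runs without the SDK installed; the entity and action values are made up for illustration.

```php
<?php
// Simplified stand-ins for App\DTO\Event and App\DTO\Queue (assumption:
// no Temporal attributes), so the snippet runs without the SDK.
final class SketchEvent
{
    public function __construct(
        public string $entity,
        public string $action,
    ) {
    }
}

final class SketchQueue implements \Countable
{
    /** @param SketchEvent[] $events */
    public function __construct(private array $events = [])
    {
    }

    // Appends the events of another queue to this one
    public function merge(SketchQueue $queue): SketchQueue
    {
        $this->events = array_merge($this->events, $queue->events);
        return $this;
    }

    public function count(): int
    {
        return count($this->events);
    }

    // Hands out everything buffered so far and empties the buffer
    public function flush(): SketchQueue
    {
        $queue = new SketchQueue($this->events);
        $this->events = [];
        return $queue;
    }
}

$buffer = new SketchQueue();
$buffer->merge(new SketchQueue([new SketchEvent('doc-1', 'created')]));
$buffer->merge(new SketchQueue([
    new SketchEvent('doc-1', 'updated'),
    new SketchEvent('doc-1', 'renamed'),
]));

$batch = $buffer->flush();

printf("batch: %d, buffer: %d\n", count($batch), count($buffer));
```

The script prints "batch: 3, buffer: 0": the flushed batch owns all three events and the buffer is free to collect new ones while the batch is being processed.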
Step 1: Creating the Base Workflow
The first step is to create a base Workflow that encapsulates two primary methods: the Signal method and the start method. But before we create those, let’s construct the Workflow base:
namespace App\Workflow;

use App\Activity\ProcessActivity;
use App\DTO\Queue;
use Temporal\Activity\ActivityOptions;
use Temporal\Internal\Workflow\ActivityProxy;
use Temporal\Workflow;

#[Workflow\WorkflowInterface]
class DocumentWorkflow
{
    private ProcessActivity|ActivityProxy $process;
    private Queue $queue;

    public function __construct()
    {
        $this->queue = new Queue();
        $this->process = Workflow::newActivityStub(
            ProcessActivity::class,
            ActivityOptions::new()
                ->withStartToCloseTimeout(5)
                ->withTaskQueue('demo_workflow'),
        );
    }
}
Signal Method
The Signal method is used to push information to a Workflow for later processing in the main loop. It receives the passed information (document events) and buffers it so that the main loop can process the events sequentially. The method can be designed as follows:
#[Workflow\SignalMethod]
public function capture(Queue $events): void
{
    $this->queue = $this->queue->merge($events);
}
Workflow Start Method
Let’s create a simple workflow that calls a single Activity upon receiving a document creation event. Here’s our main loop:
#[Workflow\WorkflowMethod(name: "document.events")]
public function run(string $document_id): \Generator
{
    while (true) {
        // processing our queue
        yield $this->process->handleEvents(
            $document_id,
            $this->queue->flush()
        );

        // new events might arrive at this point
        if ($this->queue->count() === 0) {
            break;
        }
    }
}
As you can see, we now have a loop that processes events, and once event processing is completed, the Workflow exits. If new events arrive during processing, we repeat the process. Nothing fancy so far, and there is a lot of room for improvement (for example, the current version flushes and processes even an empty queue).
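The behavior of this loop can be modeled without Temporal at all. The sketch below is a plain PHP simulation, not SDK code: drain() is a hypothetical helper, events are plain strings, and the $arrivals array is an assumed script of which events "arrive" while a given batch is being processed (the role Signals play in the real Workflow).

```php
<?php
// Pure-PHP simulation of the drain loop; no Temporal SDK involved.
// $arrivals maps a batch number to the events that arrive while that
// batch is being processed (hypothetical test input).
function drain(array $initial, array $arrivals): array
{
    $queue = $initial;
    $batches = [];

    while (true) {
        // flush: take everything buffered so far and empty the buffer
        $batch = $queue;
        $queue = [];
        $batches[] = $batch; // stands in for the handleEvents Activity call

        // events that were signaled while the batch was being handled
        $queue = $arrivals[count($batches)] ?? [];

        if (count($queue) === 0) {
            break; // nothing new arrived, the Workflow would exit here
        }
    }

    return $batches;
}

$batches = drain(
    ['created'],
    [1 => ['updated', 'renamed'], 2 => ['deleted']],
);

print_r($batches);
```

With this input the loop runs three times and produces the batches ['created'], ['updated', 'renamed'], and ['deleted']. Calling drain([], []) returns a single empty batch, which is the "flushes even an empty queue" weakness mentioned above.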
Step 2: Pushing Events to the Workflow
Now that we have a Workflow that can receive events, you might wonder about the benefit of using such a simple Workflow, and why we even need a Signal method in the first place. The trick lies in the code used to push these events to the Workflow.
function push(string $document_id, Event ...$event): void
{
    $wf = $this->wfClient->newWorkflowStub(
        class: DocumentWorkflow::class,
        options: WorkflowOptions::new()
            ->withTaskQueue('demo_workflow')
            ->withWorkflowId($document_id)
            ->withWorkflowIdReusePolicy(IdReusePolicy::AllowDuplicate),
    );

    $this->wfClient->startWithSignal(
        workflow: $wf,
        signal: 'capture',
        signalArgs: [new Queue($event)],
        startArgs: [$document_id],
    );
}
We assume that $document_id is a unique file ID and that $this->wfClient is an instance of the Temporal SDK’s WorkflowClient. If the Workflow is not running, it is created automatically. The signal is then delivered to the newly created or already running Workflow.
You can find the full code of the push helper here.
This approach allows us to push events to a Workflow queue that is created on demand and flushed out as soon as the queue is empty.
The best part of this approach is that Temporal guarantees that events will arrive at the Workflow, no matter what. However, only events that arrive while the Activity is executing are queued in the same Workflow run; otherwise, the Workflow exits and has to be started again for every new event.
Step 3: Introducing Timers
While the Temporal Workflow can now receive and process events, let’s discuss how we can incorporate Timers to handle user behavior more effectively. In applications where you consume user events, time management becomes crucial as users tend to make changes in bursts rather than producing a single event every minute. They work when they are in the mood and don’t work at other times.
In this particular Workflow we don’t carry any state, but in more complex scenarios loading and updating the document state could be a heavy process, which makes frequent Workflow restarts undesirable.
We have multiple options for implementing such behavior. To start, we will use the simple awaitWithTimeout method.
#[Workflow\WorkflowMethod(name: "document.events")]
public function run(string $document_id): \Generator
{
    while (true) {
        // processing our queue
        yield $this->process->handleEvents(
            $document_id,
            $this->queue->flush()
        );

        $ok = yield Workflow::awaitWithTimeout(10, fn() => $this->queue->count() !== 0);
        if (!$ok && $this->queue->count() === 0) {
            // workflow is stale
            break;
        }
    }
    // we are done
}
We modified the condition inside the Workflow so that it waits not only for the queue but also for the timer. If the timer fires and no events were added, we can exit the Workflow. Please note that a new timer is created after each batch; we will optimize this behavior below.
This approach allows us to accumulate events for new changes without exiting the Workflow immediately. The Workflow stays alive for 10 seconds after the last batch is processed, but it still flushes the first batch of events immediately.
Continue-as-New
The solution we just created works fine for a short burst of user events, but when the number of events runs into the hundreds or more, keeping the Workflow alive for too long might cause a failure due to history size. We can avoid this problem by restarting the Workflow automatically with continueAsNew once the history grows past a chosen limit.
#[Workflow\WorkflowMethod(name: "document.events")]
public function run(string $document_id, ?Queue $queue = null): \Generator
{
    if ($queue !== null) {
        // we want to make sure we captured previous and current events
        $this->queue = $queue->merge($this->queue);
    }

    while (true) {
        // processing our queue
        yield $this->process->handleEvents(
            $document_id,
            $this->queue->flush()
        );

        $ok = yield Workflow::awaitWithTimeout(10, fn() => $this->queue->count() !== 0);
        if (!$ok && $this->queue->count() === 0) {
            // workflow is stale
            break;
        }

        // our workflow is too large, let's continue as new
        if (Workflow::getInfo()->historyLength > 500) {
            break;
        }
    }

    if ($this->queue->count()) {
        // restart as new workflow
        yield Workflow::continueAsNew(
            'document.events',
            [$document_id, $this->queue],
            Workflow\ContinueAsNewOptions::new()
                ->withTaskQueue('demo_workflow'),
        );
    }
}
We modified our code to carry the unprocessed queue between Workflow runs and to halt the execution if the history grows past the limit. Once again, the beauty of Temporal is that the system guarantees that no incoming events are lost during the Workflow restart, giving us a truly durable and time-constrained event observer.
While this approach looks simplistic, imagine that you can also carry the current state of the process between Workflow runs, essentially gaining the ability to continue the work for an unlimited amount of time.
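The carry-over idea can be illustrated with a small simulation in plain PHP. This is only a sketch under made-up assumptions: runWorkflow() is a hypothetical helper, events are plain strings, every processed batch is assumed to cost 10 history events, and batches have a fixed size of 2. Real history growth and batching in Temporal differ.

```php
<?php
// Hypothetical model: each processed batch adds $costPerBatch history events,
// and a run stops once its history exceeds $limit. Not Temporal SDK code.

/** @return array{processed: string[], carry: string[]} */
function runWorkflow(array $queue, int $limit, int $batchSize = 2, int $costPerBatch = 10): array
{
    $history = 0;
    $processed = [];

    while (count($queue) > 0) {
        $batch = array_splice($queue, 0, $batchSize); // take the next batch
        $processed = array_merge($processed, $batch);
        $history += $costPerBatch;

        if ($history > $limit) {
            break; // history too long: stop and hand the rest to a fresh run
        }
    }

    return ['processed' => $processed, 'carry' => $queue];
}

$pending = ['e1', 'e2', 'e3', 'e4', 'e5', 'e6', 'e7'];
$handled = [];
$runs = 0;

do {
    $result = runWorkflow($pending, limit: 15);
    $handled = array_merge($handled, $result['processed']);
    $pending = $result['carry']; // what continueAsNew would receive as its argument
    $runs++;
} while (count($pending) > 0);

printf("%d events handled in %d runs\n", count($handled), $runs);
```

The script prints "7 events handled in 2 runs": the first run stops after four events because its simulated history passed the limit, and the remaining three are carried into the second run in their original order.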
Timers, more timers!
Now that we have a baseline, what about trying something a bit more fun? Let’s say that the nature of our application requires us to throttle and batch user events, as well as deduplicate them. While this requirement sounds like a whiteboard conversation at first, inside the Temporal Workflow it is actually a modest change.
First of all, we can solve deduplication at the Queue level: since we keep all buffered events until they are processed, we can use this list for filtering. Adding throttling later will let us accumulate more events to deduplicate against.
Let’s add this method to the Queue class:
public function mergeWithoutDuplicates(Queue $queue): Queue
{
    $this->events = array_merge(
        $this->events,
        array_udiff($queue->events, $this->events, fn($a, $b) => $a <=> $b),
    );
    return $this;
}
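To see what this filter does in practice, here is a standalone sketch of the same array_udiff technique. It works on plain arrays and uses a simplified stand-in class (the hypothetical SketchEvent) instead of the real Queue and Event, so it runs without the SDK.

```php
<?php
// Simplified stand-in for App\DTO\Event (assumption: same two properties).
final class SketchEvent
{
    public function __construct(
        public string $entity,
        public string $action,
    ) {
    }
}

/**
 * @param SketchEvent[] $buffered
 * @param SketchEvent[] $incoming
 * @return SketchEvent[]
 */
function mergeWithoutDuplicates(array $buffered, array $incoming): array
{
    // <=> on two objects of the same class compares them property by property,
    // so two events count as equal when entity and action both match.
    $fresh = array_udiff($incoming, $buffered, fn ($a, $b) => $a <=> $b);

    // array_merge renumbers the keys that array_udiff preserved
    return array_merge($buffered, $fresh);
}

$buffered = [new SketchEvent('doc-1', 'updated')];
$incoming = [
    new SketchEvent('doc-1', 'updated'), // already buffered, dropped
    new SketchEvent('doc-1', 'renamed'), // new, kept
];

$merged = mergeWithoutDuplicates($buffered, $incoming);

foreach ($merged as $event) {
    echo $event->entity, ' ', $event->action, PHP_EOL;
}
```

The result contains two events, "doc-1 updated" and "doc-1 renamed". Note that array_udiff only filters incoming events against the buffer: duplicates that appear twice within the same incoming batch are both kept, so a stricter policy would need an extra pass.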
We will come back to the Signal method update in a moment. Now, let’s create our own Timer implementation called RollingTimer.
This class triggers processing only when either the queue is filled to the desired capacity or the timer fires. Unlike awaitWithTimeout, this helper attempts to reuse already created timers, saving us a few extra events in our Workflow history:
namespace App\Helpers;

use App\DTO\Queue;
use React\Promise\Deferred;
use React\Promise\PromiseInterface;
use Temporal\Workflow;
use function React\Promise\resolve;

class RollingTimer
{
    private \DateTimeInterface $last;
    private ?PromiseInterface $timer = null;
    private ?Deferred $ready = null;

    public function __construct(
        readonly private int $waitSeconds,
    ) {
        $this->last = Workflow::now();
    }

    public function touch(): void
    {
        $this->last = Workflow::now();
    }

    public function wait(Queue $queue, int $size): PromiseInterface
    {
        $this->ready ??= new Deferred();

        if ($this->timer === null) {
            $this->timer = Workflow::timer($this->waitSeconds);
            $this->timer->then($this->tick(...)); // unlocks current $this->ready
        }

        return Workflow::await(
            fn() => $queue->count() >= $size,
            $this->ready->promise(),
        )->then($this->reset(...));
    }

    private function tick(): void
    {
        $this->timer = null; // old timer gone

        if ($this->ready === null) {
            return;
        }

        // how long time passed since last event
        $passed = Workflow::now()->getTimestamp() - $this->last->getTimestamp();

        if ($passed < $this->waitSeconds) { // we captured recent event so the pipe is still alive
            $this->timer = Workflow::timer($this->waitSeconds);
            $this->timer->then($this->tick(...));
            return;
        }

        $this->ready->resolve(true);
        $this->ready = null;
    }

    private function reset(): void
    {
        $this->ready?->reject();
        $this->ready = null;
    }
}
Let’s update our Signal method now (don’t forget to construct RollingTimer in your Workflow constructor):
#[Workflow\SignalMethod]
public function capture(Queue $events): void
{
    $this->queue = $this->queue->mergeWithoutDuplicates($events);
    $this->waiter->touch(); // indicating fresh data
}
Now, every newly added set of data advances an internal time mark stored inside RollingTimer, letting it know when the last event happened so that we can wait for the required duration after that event.
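The timing rule behind the time mark can be checked in isolation. The sketch below is a plain PHP model of the decision made in tick(), not SDK code: shouldFire() and firesAt() are hypothetical helpers, timestamps are plain integers in seconds instead of Workflow::now(), and the queue-size trigger is left out.

```php
<?php
// Pure-PHP model of the re-arm decision in RollingTimer::tick().
// Time is a plain integer (seconds since the timer object was created).

function shouldFire(int $now, int $lastTouch, int $waitSeconds): bool
{
    // fire only when a full quiet period has passed since the last event
    return ($now - $lastTouch) >= $waitSeconds;
}

/**
 * Replays a list of event timestamps and returns the time at which the
 * timer finally fires, re-arming for $waitSeconds after every early tick.
 */
function firesAt(array $eventTimes, int $waitSeconds): int
{
    $lastTouch = 0;         // the constructor sets the mark at creation time
    $tickAt = $waitSeconds; // the first timer

    while (true) {
        // apply every event (touch) that happened up to this tick
        foreach ($eventTimes as $t) {
            if ($t <= $tickAt) {
                $lastTouch = max($lastTouch, $t);
            }
        }

        if (shouldFire($tickAt, $lastTouch, $waitSeconds)) {
            return $tickAt;
        }

        $tickAt += $waitSeconds; // recent event seen: start another timer
    }
}

echo firesAt([3, 8, 12], 10), PHP_EOL;
```

With events at seconds 3, 8, and 12 and a 10-second wait, the model fires at second 30: the ticks at 10 and 20 each see an event that is less than 10 seconds old and re-arm. Because every re-arm waits a full period, processing can start later than exactly 10 seconds after the last event, which is the price of creating fewer timers.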
The modification of the primary Workflow method is straightforward as well:
#[Workflow\WorkflowMethod(name: "document.events")]
public function run(string $document_id, ?Queue $queue = null): \Generator
{
    if ($queue !== null) {
        // we want to make sure we captured previous and current events
        $this->queue = $queue->merge($this->queue);
    }

    while (true) {
        // wait for the timer to fire or for the queue to fill up to 8 items
        yield $this->waiter->wait($this->queue, size: 8);

        // no batches to wait for, exiting
        if ($this->queue->count() === 0) {
            break;
        }

        // processing our queue
        yield $this->process->handleEvents($document_id, $this->queue->flush());

        // our workflow is too large, let's continue as new
        if (Workflow::getInfo()->historyLength > 500) {
            break;
        }
    }

    if ($this->queue->count()) {
        // restart as new workflow
        yield Workflow::continueAsNew(
            'document.events',
            [$document_id, $this->queue],
            Workflow\ContinueAsNewOptions::new()
                ->withTaskQueue('demo_workflow'),
        );
    }
}
You can experiment with this demo using this GitHub repository; it includes two runner files, one of which can emulate random event activity across multiple documents.
Conclusion
By leveraging Temporal features, such as timer-based event-driven handlers, we can create complex workflows that handle user events efficiently. Add a state machine into this Workflow and get a durable and reliable instrument to control document lifecycle in ~200 lines of code!
Stay tuned for more articles in this series, where we will dive deeper into the functionality of Temporal and explore its applications in various domains, including AI and beyond.