Temporal Workflow and Microservices


Last year, we published a post titled “Understanding Concurrency and Parallelism in Golang,” which described how developers can optimize their code using Go concurrency. It covered ways to speed up Go applications by parallelizing computations, both by algorithm and by data. Even with this technique, however, an application is still limited by its single host computer or container, whose CPU and RAM limits the code cannot exceed.

One way to lift this limitation is to scale up the host, but that inevitably raises costs. The program may not be busy all the time, yet even in low-activity periods you still end up paying for expensive hardware.

Is there a better solution? Yes! And it’s called “microservices architecture.”

Running several instances of the same application on different hosts and spreading the load across them is called load balancing. A microservices architecture lets developers add hosts and start new application instances on demand.

Microservices should follow a few common principles:

  • Small. A microservice should be simple enough for one person to understand.
  • Focused. A microservice should solve one business task, in the most generic way possible, so it can be reused for the same purpose in another part of the system.
  • Loosely coupled. A microservice should interact with other services only through their external APIs, so that changing one service does not require changes in another; there should be no shared database relations or backdoors.
  • Highly cohesive. A microservice should contain all the methods needed to solve its business task. No other microservice should perform the same activity unless it does so in a completely different way.

The blog post mentioned above experimented with the following task: an application calculates metrics for a set of figures. The figures were parallelepipeds, and we needed to compute each one's base square (the area of its base) and volume. This was not a real-world example, but it clearly demonstrated the basic principles.

Applying the microservice principles above, we can split the same task into the following microservices:

  • Square Microservice - computes the square (area) of a rectangle.
  • Volume Microservice - computes the volume of a parallelepiped.
  • Parallelepiped Microservice - splits all the parallelepipeds into batches and sends the batches to the Square Microservice and Volume Microservice in parallel.
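To make the split concrete, here is a minimal in-process sketch of the work each service does. The names `Parallelepiped`, `BaseSquare`, `Volume`, and `SplitIntoBatches` are illustrative assumptions, not code from the repository. The base square is taken as length × width, and the volume as length × width × height.

```go
package main

// Parallelepiped holds the dimensions of one figure; the field names
// mirror the JSON input used later in the post.
type Parallelepiped struct {
	ID     string
	Length float64
	Width  float64
	Height float64
}

// BaseSquare is the Square Microservice's job: the area of the base rectangle.
func BaseSquare(p Parallelepiped) float64 {
	return p.Length * p.Width
}

// Volume is the Volume Microservice's job: base area times height.
func Volume(p Parallelepiped) float64 {
	return p.Length * p.Width * p.Height
}

// SplitIntoBatches is the Parallelepiped Microservice's job: cut the input
// into consecutive batches of at most batchSize figures (the last batch may
// be shorter). It returns nil for a non-positive batch size.
func SplitIntoBatches(figures []Parallelepiped, batchSize int) [][]Parallelepiped {
	if batchSize <= 0 {
		return nil
	}
	var batches [][]Parallelepiped
	for start := 0; start < len(figures); start += batchSize {
		end := start + batchSize
		if end > len(figures) {
			end = len(figures)
		}
		batches = append(batches, figures[start:end])
	}
	return batches
}
```

For example, with ten figures and a batch size of 3, `SplitIntoBatches` produces batches of 3, 3, 3, and 1 figures.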


The Parallelepiped Microservice depends on the Square and Volume Microservices, so it is less generic than those two. Dependencies between microservices should follow the same rule as dependencies between components: together they must form a directed acyclic graph (DAG).
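The DAG rule can be checked mechanically. The sketch below is our illustration, not something Temporal or the repository provides. It applies Kahn's topological sort to a hypothetical map from each service to the services it calls. If every service can be placed after all of its dependencies, the graph is acyclic.

```go
package main

import "sort"

// TopoOrder takes deps, a map from a service name to the services it calls
// (hypothetical names). It returns an order in which every service appears
// after all of its dependencies, and ok=false if the graph contains a cycle.
func TopoOrder(deps map[string][]string) (order []string, ok bool) {
	indegree := map[string]int{}        // number of unresolved dependencies
	dependents := map[string][]string{} // dependency -> services that call it
	for svc, targets := range deps {
		if _, seen := indegree[svc]; !seen {
			indegree[svc] = 0
		}
		for _, t := range targets {
			if _, seen := indegree[t]; !seen {
				indegree[t] = 0 // services that are only called still count as nodes
			}
			indegree[svc]++ // svc must wait for t
			dependents[t] = append(dependents[t], svc)
		}
	}
	// Start with services that call nobody; sort for deterministic output.
	var ready []string
	for svc, d := range indegree {
		if d == 0 {
			ready = append(ready, svc)
		}
	}
	sort.Strings(ready)
	for len(ready) > 0 {
		svc := ready[0]
		ready = ready[1:]
		order = append(order, svc)
		next := dependents[svc]
		sort.Strings(next)
		for _, d := range next {
			indegree[d]--
			if indegree[d] == 0 {
				ready = append(ready, d)
			}
		}
	}
	// Any service left unprocessed is part of a cycle.
	return order, len(order) == len(indegree)
}
```

For the three services in this post, the order is square, volume, parallelepiped. Adding an edge from square back to parallelepiped would create a cycle, and `ok` would become false.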

Microservices Communication

How will the Parallelepiped Microservice call the Square and Volume Microservices? The microservices run on different hosts, so the service needs to make remote procedure calls (RPC). HTTP and gRPC are the common synchronous transports for this. Asynchronous transports such as message queues (MQ) are another option.

To make these calls, the Parallelepiped Service must know the IP addresses of the Square and Volume Services, and each service may run as many instances. So we need one more component that provides service discovery (finding the right service addresses) and load balancing (spreading calls across the instances). The most widespread approach gives the services domain names, such as square.microservice and volume.microservice, and places service discovery and load balancing inside the Domain Name System (DNS). DNS then resolves each domain name to the IP address of a concrete service instance (for example, 192.168.1.112) and balances the load among the instances.
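Conceptually, DNS-based discovery with round-robin balancing behaves like the in-memory registry below. This is only a model for illustration. The `Registry` type, the service names, and the 192.168.1.x addresses are hypothetical, and a real deployment relies on the DNS server instead.

```go
package main

import (
	"fmt"
	"sync"
)

// Registry maps a service name to the addresses of its running instances
// and hands them out in round-robin order, as DNS round-robin would.
type Registry struct {
	mu        sync.Mutex
	instances map[string][]string
	next      map[string]int
}

func NewRegistry() *Registry {
	return &Registry{instances: map[string][]string{}, next: map[string]int{}}
}

// Register adds an instance address for a service (the discovery side).
func (r *Registry) Register(name, addr string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.instances[name] = append(r.instances[name], addr)
}

// Resolve returns the next instance address for the name (the balancing side).
func (r *Registry) Resolve(name string) (string, error) {
	r.mu.Lock()
	defer r.mu.Unlock()
	addrs := r.instances[name]
	if len(addrs) == 0 {
		return "", fmt.Errorf("no instances registered for %q", name)
	}
	addr := addrs[r.next[name]%len(addrs)]
	r.next[name]++
	return addr, nil
}
```

In this model, successive `Resolve` calls for square.microservice alternate between its registered instances. Real DNS round-robin is coarser, because clients and resolvers cache answers.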


Distributed System Problems

So far, so good! The Parallelepiped Microservice knows the domain names of the Square and Volume Microservices and does its job. It accepts a large number of parallelepipeds with their dimensions, splits them into batches, and sends the batches off to compute the square and volume in parallel.

What problems might arise with this type of communication? Because the services are distributed across a network, the network may go down, the hosts running the services may stop or fail, and the Parallelepiped Service itself may fail. That's why it is important to anticipate these edge cases and provide:

  • A retry mechanism that repeats calls interrupted by network or target-service failures
  • Backpressure mechanisms that limit the load on the target services (Square and Volume Microservices)
  • Persistence of the process (workflow) state in the Parallelepiped Service
  • Timeout management

If we rely only on plain transports such as HTTP, gRPC, or MQ, all these fail-safes must be implemented inside the microservices, once for each transport. With a relatively large number of services and several transports, keeping all of this consistent becomes a real pain. In most cases, we still cannot guarantee that the Parallelepiped Service will complete its job successfully.

Ultimately, putting all of this inside the microservices breaks the principles listed at the beginning of the post. The microservices grow and lose focus on their primary business functionality. They are also no longer highly cohesive, since every service now solves the same infrastructure problems as the next one.

Microservices Workflow

Retries, backpressure, and state persistence are fairly generic requirements that are not specific to the Parallelepiped Calculation application. They belong to a general class of long-running interactions between microservices called “microservice workflows.” A separate service that provides all the required features for these workflows makes such problems far easier to solve, and to solve reliably.

There are many workflow engines to choose from. We opted for the Temporal engine, a microservice orchestration platform written in Go that runs as a separate set of services. The official site, with documentation and FAQs, is at https://temporal.io/. Besides using Temporal in our own software development, we have partnered with the Temporal team to create Temporal integrations with PHP.

Temporal's workflow engine works at the transport level. It provides service discovery and load balancing, and it addresses the stability and reliability problems that come with the distributed nature of a microservice architecture. It supports retries and backpressure and makes workflow execution highly reliable.

Temporal Workflow Example

So, how does Temporal work? Start with a diagram of how Spiral Scout’s microservices look with Temporal:


Temporal runs as a group of services: the Temporal service itself, a persistence layer (Cassandra or PostgreSQL), and the TemporalUI service for managing workflows visually in a browser. Locally, the group can be started with a simple docker-compose file:

https://github.com/guntenbein/temporal_microservices/blob/main/docker-compose.yml

Repository Structure

You can find an example repository showing how we use the Temporal workflow engine to orchestrate the Parallelepiped Microservices system here:

https://github.com/guntenbein/temporal_microservices

The repository includes three microservices: Parallelepiped Workflow, Square, and Volume. Typically, every microservice has its own repository, but this is not a strict rule. What matters is that the code of each service is isolated. Isolation is also possible within a single repository, which works perfectly for small microservices that share a repository but are deployed and run separately.

The domain folder contains the services, activities, and workflow of each microservice. The cmd folder contains the main functions that run the Temporal workers and register the activities and workflows. The controller folder contains the code that starts the workflow over HTTP. The repository can easily be split into three microservice repositories, and the starter can also be moved into a separate repository if needed.

We always keep different domains separate inside the same repository, so they share no service code. This way, we can quickly move a domain from one microservice to another.

Note the common code located in the domain folder. Developers can usually move such code into a separate repository (named go-workflows, for example) and keep the clean, shared pieces there with their tests, so that any microservice can reuse them.

Activities

All services use the Temporal API, which is based on gRPC and Protocol Buffers, as the transport. A service that provides functionality for a Temporal workflow declares and registers it as an activity. An activity is a function whose first argument is a context. The activity that calculates the square has this signature:

```go
func (s Service) CalculateRectangleSquare(ctx context.Context, req CalculateRectangleSquareRequest) (resp CalculateRectangleSquareResponse, err error) {
```

The activity also includes code that periodically sends heartbeats to Temporal to report that it is still alive:

```go
heartbeat := domain.StartHeartbeat(ctx, temporal_microservices.HeartbeatIntervalSec)
defer heartbeat.Stop()
```

The rest is plain business code for the square calculation.

Workers

Each microservice starts a Temporal worker for a certain task queue. The worker options set how many activities can run in parallel (for example, through the MaxConcurrentActivityExecutionSize field of worker.Options), which solves backpressure:

```go
worker := worker.New(temporalClient, temporal_microservices.SquareActivityQueue, workerOptions)
```

The microservice then registers its activities with the worker:

```go
worker.RegisterActivity(square.Service{}.CalculateRectangleSquare)
```

This way, Temporal limits the capacity of each type of functionality (the activities and workflows registered with a worker).
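The effect of such a concurrency limit can be illustrated in plain Go with a buffered channel used as a semaphore. This is an analogy for what the worker option achieves, not Temporal's implementation. `RunLimited` and its parameters are hypothetical.

```go
package main

import "sync"

// RunLimited runs every task but never more than limit of them at once,
// the same backpressure idea as capping concurrent activity executions.
// It returns the highest concurrency actually observed.
func RunLimited(tasks []func(), limit int) int {
	if limit < 1 {
		limit = 1 // an unbuffered semaphore would deadlock
	}
	sem := make(chan struct{}, limit) // each buffer slot is one permitted execution
	var (
		wg      sync.WaitGroup
		mu      sync.Mutex
		running int
		peak    int
	)
	for _, task := range tasks {
		sem <- struct{}{} // blocks while limit tasks are already running
		wg.Add(1)
		go func(task func()) {
			defer wg.Done()
			defer func() { <-sem }() // free the slot when the task ends
			mu.Lock()
			running++
			if running > peak {
				peak = running
			}
			mu.Unlock()
			task()
			mu.Lock()
			running--
			mu.Unlock()
		}(task)
	}
	wg.Wait()
	return peak
}
```

In Temporal's model, tasks that a busy worker cannot take yet wait in the task queue. An overloaded service therefore falls behind instead of being flooded.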

Workflow

The Parallelepiped Microservice declares its own workflow:

```go
func CalculateParallelepipedWorkflow(ctx workflow.Context, req CalculateParallelepipedWorkflowRequest) (resp CalculateParallelepipedWorkflowResponse, err error) {
	// validate the input; BusinessError is configured as non-retryable
	if len(req.Parallelepipeds) == 0 {
		err = BusinessError{"there are no figures to process"}
		return
	}

	if isIDsRepeat(req.Parallelepipeds) {
		err = BusinessError{"the ids of the figures in the input should be unique"}
		return
	}

	if req.BatchSize <= 0 {
		err = BusinessError{"batch size must be greater than zero"}
		return
	}

	batchCount := batchCount(len(req.Parallelepipeds), req.BatchSize)

	selector := workflow.NewNamedSelector(ctx, "select-parallelepiped-batches")
	var errOneBatch error
	// cancelling cancelCtx stops the remaining activities after the first failure
	cancelCtx, cancelHandler := workflow.WithCancel(ctx)

	count := 0
	squareMap := make(map[string]float64)
	volumeMap := make(map[string]float64)
	// fan out: one square and one volume activity per batch
	for i := 0; i < batchCount; i++ {
		batch := makeBatch(req.Parallelepipeds, i, req.BatchSize)

		future := processSquareAsync(cancelCtx, batch)
		selector.AddFuture(future, func(f workflow.Future) {
			respSquare := square.CalculateRectangleSquareResponse{}
			if err := f.Get(cancelCtx, &respSquare); err != nil {
				cancelHandler()
				errOneBatch = err
			} else {
				copyResult(squareMap, respSquare.Squares)
			}
		})
		count++

		future = processVolumeAsync(cancelCtx, batch)
		selector.AddFuture(future, func(f workflow.Future) {
			respVolume := volume.CalculateParallelepipedVolumeResponse{}
			if err := f.Get(cancelCtx, &respVolume); err != nil {
				cancelHandler()
				errOneBatch = err
			} else {
				copyResult(volumeMap, respVolume.Volumes)
			}
		})
		count++
	}

	// wait until everything is processed; each Select runs one callback at a time
	for i := 0; i < count; i++ {
		selector.Select(ctx)
		if errOneBatch != nil {
			return CalculateParallelepipedWorkflowResponse{}, errOneBatch
		}
	}

	// map the output
	var outputFigures = make([]Parallelepiped, 0, len(req.Parallelepipeds))
	for _, p := range req.Parallelepipeds {
		outputP := p
		outputP.BaseSquare = squareMap[p.ID]
		outputP.Volume = volumeMap[p.ID]
		outputFigures = append(outputFigures, outputP)
	}
	return CalculateParallelepipedWorkflowResponse{Parallelepipeds: outputFigures}, nil
}
```

The workflow validates the input, splits the parallelepipeds into batches of the size given in the request, and sends each batch to the square activity as follows:

```go
func processSquareAsync(cancelCtx workflow.Context, batch []Parallelepiped) workflow.Future {
	future, settable := workflow.NewFuture(cancelCtx)
	// workflow.Go is the deterministic replacement for a goroutine
	workflow.Go(cancelCtx, func(ctx workflow.Context) {
		ctx = withActivityOptions(ctx, temporal_microservices.SquareActivityQueue)
		respSquare := square.CalculateRectangleSquareResponse{}
		// map the domain structures
		dimensions, err := copySquareBatch(batch)
		if err != nil {
			settable.Set(nil, err)
			return
		}
		err = workflow.ExecuteActivity(ctx,
			square.RectangleSquareActivityName,
			square.CalculateRectangleSquareRequest{Rectangles: dimensions}).Get(ctx, &respSquare)
		settable.Set(respSquare, err)
	})
	return future
}
```

The volume activity is called in parallel in the same way, by processVolumeAsync. It differs from processSquareAsync only in the Volume Microservice's task queue, activity name, and request and response types.

The Parallelepiped Microservice does not care where the activities are located. It does not know the addresses of the Square and Volume services and relies on Temporal to route each activity to a worker that polls the right task queue. In our implementation, however, the activities use request and response structures defined in the Square and Volume Microservices. For example, the Square Microservice defines:

```go
type Rectangle struct {
	ID     string
	Length float64
	Width  float64
}

type Service struct{}

type CalculateRectangleSquareRequest struct {
	Rectangles []Rectangle
}

type CalculateRectangleSquareResponse struct {
	Squares map[string]float64
}
```

As a result, the Parallelepiped Microservice code depends on the packages of the Square and Volume Microservices. This is useful, because at the very least it reveals which versions of the Square and Volume Microservices the Parallelepiped Workflow requires.


The workflow itself is registered with a separate worker running inside the Parallelepiped Microservice:

```go
worker.RegisterWorkflow(workflow.CalculateParallelepipedWorkflow)
```

Every activity runs with its own options. It is sent to a dedicated Temporal task queue and can have its own retry policy and timeouts:

```go
func withActivityOptions(ctx workflow.Context, queue string) workflow.Context {
	ao := workflow.ActivityOptions{
		TaskQueue:              queue,
		ScheduleToStartTimeout: 24 * time.Hour,
		StartToCloseTimeout:    24 * time.Hour,
		// the activity is considered lost if no heartbeat arrives within 5 seconds
		HeartbeatTimeout: time.Second * 5,
		RetryPolicy: &temporal.RetryPolicy{
			InitialInterval:    time.Second,
			BackoffCoefficient: 2.0,
			MaximumInterval:    time.Minute * 5,
			// business errors are final and are not retried
			NonRetryableErrorTypes: []string{"BusinessError"},
		},
	}
	ctxOut := workflow.WithActivityOptions(ctx, ao)
	return ctxOut
}
```

Temporal workflows use futures and selectors to run activities concurrently. Even if the process running the workflow fails, Temporal works to complete it. The workflow code only schedules the activity tasks; the Temporal server dispatches them to the workers, which execute them in parallel. That's why developers should not use native goroutines and channels in workflow code. Temporal replays workflow code to restore its state, so that code must be deterministic, and the SDK provides its own replacements, such as workflow.Go, workflow.NewChannel, and workflow.NewSelector.

Workflow Starter

Temporal now has everything it needs to run the workflow and its activities, but something has to start the workflow and pass it the input data. We use a simple HTTP handler in the Parallelepiped Microservice that receives the parallelepiped data and starts the workflow:

```go
func executeWorkflow(ctx context.Context, temporalClient client.Client, wReq workflow.CalculateParallelepipedWorkflowRequest) (output []workflow.Parallelepiped, err error) {
	workflowOptions := client.StartWorkflowOptions{
		TaskQueue: temporal_microservices.FigureWorkflowQueue,
	}
	workflowRun, err := temporalClient.ExecuteWorkflow(ctx, workflowOptions, workflow.CalculateParallelepipedWorkflow, wReq)
	if err != nil {
		return
	}
	workflowResp := workflow.CalculateParallelepipedWorkflowResponse{}
	// block until the workflow completes and decode its result
	err = workflowRun.Get(ctx, &workflowResp)
	if err != nil {
		return
	}
	return workflowResp.Parallelepipeds, nil
}
```

Note that the starter waits for the workflow to complete, so from the caller's point of view the workflow runs synchronously:

```go
err = workflowRun.Get(ctx, &workflowResp)
```

Alternatively, you can start the workflow and immediately return HTTP 202 Accepted to indicate that the workflow has started but has not finished yet. This option suits long-running workflows better, because HTTP calls can time out.

Running the Workflow

Prerequisites to run the project: Git, Docker with Docker Compose, and a Go toolchain, which the commands below rely on.

Start by cloning the project:

git clone https://github.com/guntenbein/temporal_microservices.git

Then, run the Temporal services by typing the following command from the root of the project:

docker-compose up -d

When the Temporal services are up, start the three microservices one by one, each in its own terminal:

go run cmd/microservice_square/main.go
go run cmd/microservice_volume/main.go
go run cmd/microservice_workflow/main.go

Next, prepare the input data. An online JSON generator is handy for this. With a template like the following, generate the input for the workflow:

{
  BatchSize: 200,
  Parallelepipeds:
    [
      '{{repeat(5000, 10000)}}',
      {
        ID: '{{objectId()}}',
        Length: '{{floating(0, 10000)}}',
        Width: '{{floating(0, 10000)}}',
        Height: '{{floating(0, 10000)}}'
      }
    ]
}

Then start the workflow. The following request uses a smaller payload of ten figures with a batch size of 3:

curl --location --request POST 'localhost:8080' \
--header 'Content-Type: application/json' \
--data-raw '{
  "BatchSize": 3,
  "Parallelepipeds": [
    {
      "ID": "5fedcbf7901feb7213e84153",
      "Length": 7584.6668,
      "Width": 8551.7289,
      "Height": 7911.1765
    },
    {
      "ID": "5fedcbf755d18a8e807432d2",
      "Length": 9854.9176,
      "Width": 2333.052,
      "Height": 9977.8465
    },
    {
      "ID": "5fedcbf776f93aa072884a6e",
      "Length": 6186.1635,
      "Width": 7257.3111,
      "Height": 744.9772
    },
    {
      "ID": "5fedcbf7cab168b5fa23e0bf",
      "Length": 1487.9815,
      "Width": 6904.6383,
      "Height": 6917.9239
    },
    {
      "ID": "5fedcbf7f1d0f9b6c8d94478",
      "Length": 9579.1483,
      "Width": 3908.5532,
      "Height": 1622.9292
    },
    {
      "ID": "5fedcbf7fd6de933e37141c6",
      "Length": 6060.2393,
      "Width": 5232.1464,
      "Height": 5528.2147
    },
    {
      "ID": "5fedcbf7a27adeb782670e09",
      "Length": 7608.5178,
      "Width": 3490.3491,
      "Height": 6064.8596
    },
    {
      "ID": "5fedcbf74cc4c73c8bdb6652",
      "Length": 6061.7923,
      "Width": 8985.7511,
      "Height": 7535.418
    },
    {
      "ID": "5fedcbf7b89726935be95418",
      "Length": 8633.0144,
      "Width": 4433.371,
      "Height": 2310.1432
    },
    {
      "ID": "5fedcbf7b55873c13a3d09c6",
      "Length": 6324.4951,
      "Width": 2566.1975,
      "Height": 7536.6964
    }
  ]
}'

After a while, the response is returned with the base square and volume of each figure:

json
[
  {
    "ID": "5fedcbf7901feb7213e84153",
    "Length": 7584.6668,
    "Width": 8551.7289,
    "Height": 7911.1765,
    "BaseSquare": 64862014.27043052,
    "Volume": 513134843038.89453
  },
  {
    "ID": "5fedcbf755d18a8e807432d2",
    "Length": 9854.9176,
    "Width": 2333.052,
    "Height": 9977.8465,
    "BaseSquare": 22992035.216515202,
    "Volume": 229410998112.98294
  },
  {
    "ID": "5fedcbf776f93aa072884a6e",
    "Length": 6186.1635,
    "Width": 7257.3111,
    "Height": 744.9772,
    "BaseSquare": 44894913.034964845,
    "Volume": 33445686607.031612
  },
  {
    "ID": "5fedcbf7cab168b5fa23e0bf",
    "Length": 1487.9815,
    "Width": 6904.6383,
    "Height": 6917.9239,
    "BaseSquare": 10273974.05459145,
    "Volume": 71074570660.2381
  },
  {
    "ID": "5fedcbf7f1d0f9b6c8d94478",
    "Length": 9579.1483,
    "Width": 3908.5532,
    "Height": 1622.9292,
    "BaseSquare": 37440610.74123956,
    "Volume": 60763460437.79133
  },
  {
    "ID": "5fedcbf7fd6de933e37141c6",
    "Length": 6060.2393,
    "Width": 5232.1464,
    "Height": 5528.2147,
    "BaseSquare": 31708059.23663352,
    "Volume": 175288959180.42822
  },
  {
    "ID": "5fedcbf7a27adeb782670e09",
    "Length": 7608.5178,
    "Width": 3490.3491,
    "Height": 6064.8596,
    "BaseSquare": 26556383.255563978,
    "Volume": 161060735928.78644
  },
  {
    "ID": "5fedcbf74cc4c73c8bdb6652",
    "Length": 6061.7923,
    "Width": 8985.7511,
    "Height": 7535.418,
    "BaseSquare": 54469756.827696525,
    "Volume": 410452386055.04724
  },
  {
    "ID": "5fedcbf7b89726935be95418",
    "Length": 8633.0144,
    "Width": 4433.371,
    "Height": 2310.1432,
    "BaseSquare": 38273355.6835424,
    "Volume": 88416932373.51683
  },
  {
    "ID": "5fedcbf7b55873c13a3d09c6",
    "Length": 6324.4951,
    "Width": 2566.1975,
    "Height": 7536.6964,
    "BaseSquare": 16229903.514382252,
    "Volume": 122319855389.19206
  }
]

Visit the Temporal UI to see the result of the workflow execution.


The UI makes it clear that the input payload was split into batches, which the Square and Volume activities processed in parallel.

Workflow Fault Tolerance

When One of the Services Does Not Run

What happens if we run the same process but stop the Volume Microservice? The HTTP request hangs, the workflow stays in the running state, and the Volume activities remain pending.


Once the Volume Microservice is restarted, however, the workflow completes successfully.

When One of the Services Returns an Internal Error

Now let's add a line that returns an error to the CalculateParallelepipedVolume method of the Volume service:

```go
return resp, errors.New("server error")
```

Now the service always returns this error. When the workflow is started, the HTTP call hangs, and the workflow stays in the running state, with the failing Volume activities shown as pending.


When we remove the error and restart the Volume service, the workflow completes, but only after a while, because of the exponential backoff retry policy.

Workflow Unit Tests

Temporal provides a solid framework for testing workflow code without running real activities: activities can be mocked on the fly, without code generation. Here is an example of such a test:

```go
for i := 0; i < count; i++ {
```

Running the tests with the -race flag also checks for data races:

go test -v -race ./...

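At first glance, it may seem that there could be a data race here, because the selector callbacks write to shared variables (errOneBatch, squareMap, and volumeMap):

```go
if err := f.Get(cancelCtx, &respSquare); err != nil {
```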

That is not the case, as running the tests many times with the -race and -count flags confirms:

go test -v -race -count 100 ./...

No races occur because, although the activities are executed in parallel, their results are handled sequentially, one selector.Select call at a time, in this loop:

```go
for i := 0; i < count; i++ {
```

Conclusions

The Temporal workflow engine is powerful and provides full control over workflows, including retries, timeouts, backpressure, and the parallel execution of activities. Workflows are written as ordinary code, so they can include checks, conditions, and sequences of execution. Key benefits include:

  • The Temporal engine guarantees workflow completion, even if the services running the workflow and activities fail. Temporal services, including the database, can run as a cluster and are also fault-tolerant.
  • The TemporalUI is a convenient way to view and control workflows, and the Temporal team is working on additional features, such as displaying the list of running workers and their activities and workflows.
  • A unit test suite makes it easy to write unit tests for workflows.
  • Elasticsearch can be connected to Temporal for more advanced searches over workflow data.
  • There are currently two clients written specifically for Temporal, for Java and Go, and work is in progress on PHP, Ruby, and others.
  • Temporal suits any process that consists of several long-running steps or handles large portions of input data. Note: data-streaming services may not be a good fit for Temporal.

Summary: Temporal is a very good, reliable, fault-tolerant workflow engine that can simplify and manage a wide range of long-running processes by orchestrating microservices.