Aecor — Purely functional event sourcing in Scala. Part 5

Hello! This is a series of posts about Aecor, a library for building eventsourced applications in Scala in a purely functional way. If this is the first time you see this series, I highly recommend starting with the Introduction.

In parts 1-4 we covered Aecor from A to Z. In this post we’re going to look at the Process Manager pattern and how it can be implemented with Aecor.

What is a Process?

Now, that’s an overloaded term we have here. We have even discussed processes in the context of Distributed Processing in the previous post! This time I want to look at business processes.

So what’s a business process? There are many legit ways to answer this question.

A commonly used definition of a business process is a sequence of actions that leads to some meaningful, valuable result for the business. These actions might have to be performed by different people, machines, business entities or even different companies. They can also be distributed in time and location.

If we get back to our booking system, a business process of booking tickets would look like this:

  1. client places a booking;
  2. internal booking confirmation system either confirms or denies the booking;
  3. when the booking is confirmed, client is allowed to pay for it;
  4. when payment is received, tickets are considered purchased and booking is settled;
  5. if payment is not received by the expiration deadline, booking expires and tickets are released.

It’s quite simplified: a real system would, for example, have to add several notifications along the way. But this process is good enough for the purposes of the post.

A business process can be really complex, but what’s important is that it can be represented as a sequence (or even a directed graph) of steps. Steps can be different in nature:

A process may involve external actions that might never happen. Receiving payment from the client is a good example. If it doesn’t happen, the process either gets stuck or has to continue through another branch (send a reminder to pay, or expire the booking). Such boundaries are often good points to split your process into sub-processes.

Another kind of step is a reaction to a particular event. Such a step is often easy to identify, because it’s described using the following pattern:

When A happens, then B should be done.

By the way, in eventstorming, such process steps are called policies. In our booking process above there are two steps of this kind:

  1. When a booking is placed, then the system has to confirm it.
  2. When the expiration deadline is reached and the booking is not yet paid, then the system has to expire the booking.

Such policies are the glue that wires up different independent entities and services into meaningful continuous workflows. If you have tried IFTTT or similar services, you have seen a very good demonstration of why such glue is useful.

Processes in eventsourcing

Let’s narrow down our scope to eventsourcing. What’s cool about it is that it allows us to be really explicit about business processes. Reacting to events is essential in properly defining a process, and by employing eventsourcing we’re making it as natural and simple as it can be.

Let me show you what is, in my opinion, the best intuition for a process in eventsourcing.

If we put aside external systems, all we have in an eventsourced application are entities. Entities issue events in reaction to commands. A process step is a natural complement to that: it issues commands in reaction to events.

Here’s a possible example from our ticket booking domain. I’m using simplified eventstorming notation here:

[Diagram: the Booking payment process wires up the Payment and Booking entities into a workflow that has business value.]

The Booking payment process listens to payment events, and whenever a payment for a booking succeeds, it issues a command to the Booking entity to settle the corresponding booking.
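To make the symmetry concrete, here is a minimal, dependency-free sketch. All types and names in it (BookingKey, PaymentFailed, Settle, BookingSettled and so on) are simplified assumptions made for illustration, not the actual definitions from the series repo or from Aecor. The entity maps a command to events, and the process step maps an event to commands.

```scala
// Hypothetical, simplified domain types; not the series' real definitions.
final case class BookingKey(value: String)

sealed trait PaymentEvent
final case class PaymentSucceeded(bookingKey: BookingKey) extends PaymentEvent
final case class PaymentFailed(bookingKey: BookingKey, reason: String) extends PaymentEvent

sealed trait BookingCommand
final case class Settle(bookingKey: BookingKey) extends BookingCommand

sealed trait BookingEvent
final case class BookingSettled(bookingKey: BookingKey) extends BookingEvent

object BookingEntity {
  // An entity issues events in reaction to a command.
  def handle(command: BookingCommand): List[BookingEvent] = command match {
    case Settle(key) => List(BookingSettled(key))
  }
}

object BookingPaymentProcess {
  // A process step is the complement: it issues commands in reaction to an event.
  def react(event: PaymentEvent): List[BookingCommand] = event match {
    case PaymentSucceeded(key) => List(Settle(key))
    case PaymentFailed(_, _)   => Nil // nothing to settle
  }
}
```

Chaining the two, as in `BookingPaymentProcess.react(event).flatMap(BookingEntity.handle)`, gives the workflow from the diagram: a successful payment ends up as a settled booking.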

So now we have a sufficient set of building blocks to develop workflows of arbitrary length and complexity in an eventsourced system. Nothing changes fundamentally if we add external systems to the mix: all the collaboration can be adapted to commands and events.

Process manager

What I just described is one interpretation of the Process Manager pattern. A process manager can have several input events, issue commands to several different entities and even track its own internal state to make decisions.

A process manager allows us to properly decouple entities. Without the Booking payment process in the example above, the Booking entity would have to know about the PaymentSucceeded event and handle it on its own.

A process manager can also decouple services, although it is not as simple as with entity boundaries. If we have only payment and booking services, then the one running the process will be coupled to the other.

If this coupling is a problem, then extracting the process manager into a separate service can solve it. But it comes at an obvious cost of having another service in your system.

This trade-off at the service level is known as Orchestration vs. Choreography. It’s a huge and fairly well-known topic, so I won’t go deeper into it today. I’ll just point out that the following Aecor-based example implementation is more on the choreography side of things.

If you want to learn more about the Process Manager pattern in general, I can recommend this talk.

Process managers with Aecor

Let’s add a couple of process steps to our Aecor app. The booking confirmation process is going to be the first.

If you feel there’s not enough context at any point, please refer to the series GitHub repo.

Booking confirmation process

After someone places a booking, our app has to reserve the corresponding tickets in the ticket management system. If the reservation is successful, then the actual price of the tickets is returned as well.

The process step would simply listen for the BookingPlaced event, then try to reserve the tickets and send either a confirm or a deny command to the Booking entity depending on the result.

Let’s define the step logic first. Essentially, process step is just a function from a coproduct of events to F[Unit]. Most of the time you can be explicit about it. Here we have just a single event as an input:

Let’s go through the process logic step by step:

  1. The BookingPlaced event is our input. We also require the entity key to be able to communicate with the same booking later in the process.
  2. First we try to reserve the selected seats for the specified concert. The booking key is used as a unique reservation identifier.
  3. The reservation service returns an Either, so depending on the result, we issue a deny or a confirm command to the same booking instance. Note how we use the Booking entity gateway we developed in Part 3.
  4. We don’t expect either command to be rejected, so we log an error if it happens.
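The steps above can be sketched roughly as follows. This is a dependency-free approximation, not the code from the series repo: it uses plain synchronous calls instead of F[Unit], and every type and signature here (Seat, Reservation, the shape of TicketReservationService and Booking, the logError callback) is an assumption made for illustration.

```scala
// Simplified stand-ins; all names and signatures are assumptions.
final case class BookingKey(value: String)
final case class Seat(row: Int, number: Int)
final case class BookingPlaced(concertId: String, seats: List[Seat])
final case class Reservation(totalPrice: BigDecimal)

// Hypothetical reservation contract: Left carries the reason for denial.
trait TicketReservationService {
  def reserve(key: BookingKey, concertId: String, seats: List[Seat]): Either[String, Reservation]
}

// Hypothetical view of the Booking entity: Left means the command was rejected.
trait Booking {
  def confirm(price: BigDecimal): Either[String, Unit]
  def deny(reason: String): Either[String, Unit]
}

final class BookingConfirmationProcess(
    bookings: BookingKey => Booking, // stands in for the entity gateway
    reservations: TicketReservationService,
    logError: String => Unit
) {
  def apply(key: BookingKey, event: BookingPlaced): Unit = {
    val booking = bookings(key)
    // The booking key doubles as the reservation identifier.
    val outcome = reservations.reserve(key, event.concertId, event.seats) match {
      case Right(reservation) => booking.confirm(reservation.totalPrice)
      case Left(reason)       => booking.deny(reason)
    }
    // Neither command is expected to be rejected, so a rejection is only logged.
    outcome match {
      case Left(rejection) => logError(s"Command rejected for booking ${key.value}: $rejection")
      case Right(_)        => ()
    }
  }
}
```

Because the step is an ordinary function of the key and the event, it can be exercised with a stubbed reservation service and an in-memory Booking, without any streams or journals.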

Just for reference, ticket reservation service contract looks like this:

In real life it would be a separate service, but in our example we’ll just stub it out. You can look up the stub interpreter in the series repo.

So the process step is defined, let’s now subscribe to events and bring it to life! It’s usually a good practice to put all the process wiring into a separate class. This allows your constructor-based DI to be nicely distributed over a tree of small readable files. Otherwise you can end up with a huge monstrous file where all the DI happens.

So to wire up our process step, we need a source of booking events. It’s provided by the eventSource parameter, which is a function. A closer look at the signature of this function reveals that it’s just an abstraction over the read-side journal’s eventsByTag interface.

So for each tag we launch a stream of events, collect only the BookingPlaced ones and throw them into our process step handler. Once an event is processed, we commit the offset to the offset store. Processing before committing gives at-least-once delivery: if the handler fails, the offset stays uncommitted and the event is delivered again.

In case you want your process to have at-most-once delivery semantics, you just flip process and commit: first commit the offset and then process the event.

Such semantics might be useful for something like email notifications.
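The difference between the two orderings can be illustrated with a small sketch. It does not use Aecor’s offset store or committable types: OffsetStore, OffsetEvent and Delivery below are hypothetical in-memory stand-ins, and failures are modeled with exceptions captured in a Try.

```scala
import scala.util.Try

// Hypothetical in-memory offset store; a real one would be durable.
final class OffsetStore {
  private var committed: Long = 0L
  def commit(offset: Long): Unit = { committed = offset }
  def lastCommitted: Long = committed
}

// An event paired with its journal offset (assumed shape).
final case class OffsetEvent[A](offset: Long, event: A)

object Delivery {
  // At least once: process first, commit after. If the handler fails, the
  // offset stays uncommitted and the event is delivered again after a restart.
  def atLeastOnce[A](store: OffsetStore, handle: A => Unit)(e: OffsetEvent[A]): Try[Unit] =
    Try {
      handle(e.event)
      store.commit(e.offset)
    }

  // At most once: commit first, process after. If the handler fails, the
  // event is lost, because the offset has already moved past it.
  def atMostOnce[A](store: OffsetStore, handle: A => Unit)(e: OffsetEvent[A]): Try[Unit] =
    Try {
      store.commit(e.offset)
      handle(e.event)
    }
}
```

With a handler that always throws, atLeastOnce leaves the committed offset untouched, while atMostOnce advances it anyway.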

Notice that we give our process a distinct consumerId which has to be unique (at least within the offset store).

What we create in the end is a list of DistributedProcessing processes, where each process is a tag-based event stream, running through our process handler. Some upper level wiring file will just have to deploy these processes on the cluster and we’ll have a running process manager.

Visit Part 4 if you’d like a refresher on Aecor distributed processing.

Booking expiration process

This one is going to be a little different. The expiration process is going to periodically query the booking view to see if there are bookings that should be expired. For each booking found, the process will issue an expire command.

Conceptually it’s the same event => command transformation, but the events are not explicit here: the process mines new facts by constantly analyzing the outside world.

As with the confirmation process, we start by defining the process step logic.

Separating process logic from infrastructure (like streaming, concurrency and offset management) has a nice benefit. Such an isolated piece of logic is much easier to test, because it’s just a function.

If you find yourself testing streams or offset management, consider refactoring your process.

So this process is quite simple:

  1. It starts by receiving the current time as a parameter.
  2. The booking repository provides a stream of still active bookings that have to be expired by now.
  3. For each such booking, the process issues an expire command to the aggregate.
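A rough, dependency-free sketch of these three steps is shown below. It is an approximation under stated assumptions: the repository returns a List rather than a stream, the expire function stands in for the Booking entity gateway, and the method name expired as well as the logError callback are hypothetical.

```scala
import java.time.Instant

final case class BookingKey(value: String)

// Hypothetical slice of the view repository:
// still active bookings whose deadline has already passed.
trait BookingViewRepository {
  def expired(now: Instant): List[BookingKey]
}

final class BookingExpirationProcess(
    repository: BookingViewRepository,
    expire: BookingKey => Either[String, Unit], // stands in for the entity gateway
    logError: String => Unit
) {
  // The current time is a parameter, so the step stays a plain function.
  def apply(now: Instant): Unit =
    repository.expired(now).foreach { key =>
      expire(key) match {
        case Left(rejection) => logError(s"Expire rejected for booking ${key.value}: $rejection")
        case Right(_)        => ()
      }
    }
}
```

Passing the time in, rather than reading a clock inside the step, is what keeps it trivially testable: a test can pick any instant it likes.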

And that’s it. Here’s the BookingViewRepository contract for your reference. It’s an extended algebra of the booking view from Part 4. You can look up the implementation in the repo.

As you might guess, the only task of the wiring code is to periodically call the process. Fs2 has nice helpers for this:

We use fixedDelay to get an infinite stream of elements with the specified time interval between them. The process invocation interval is configurable via the frequency parameter. And to get the current time we use Clock[F] from cats-effect.
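For readers who want to see the shape of such a loop without fs2, here is a plain Scala analogue. It is not the fs2 wiring from the post: Polling.run is a hypothetical helper, it runs a bounded number of iterations so that it terminates, and the clock and the delay are injected as functions (playing the roles of Clock[F] and the frequency parameter).

```scala
import java.time.Instant

object Polling {
  // Waits, then invokes `step` with the current time, `iterations` times.
  // The delay sits between invocations, so a slow step pushes the next run back.
  def run(iterations: Int, now: () => Instant, sleep: () => Unit)(step: Instant => Unit): Unit =
    (1 to iterations).foreach { _ =>
      sleep()
      step(now())
    }
}
```

In a real program `sleep` could be something like `() => Thread.sleep(frequency.toMillis)` and `now` could be `() => Instant.now()`, while in tests both can be replaced with deterministic stubs.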

Higher level wiring looks too similar to show it again: just wire it up and deploy to distributed processing.

Why not use a scheduler?

An alternative approach to implementing this process would involve scheduling expiration actions instead of constantly polling the view. It’s a viable alternative, but it’s definitely more complex. Running a consistent scheduler in a distributed environment is a hell of a task.

There can be several reasons to use a scheduler. One is when you’re running at volumes or frequencies that make polling impractical. Another is when there’s no simple or reliable way to derive a particular time-based fact at the process level.

By the way, a heavy load on the view is not that strong a reason: you can always build another view just to support your periodic process.

So, if there’s a sane way to avoid a scheduler, I’d usually go for it. Aecor has a deprecated schedule module that provides a distributed entity-based scheduler. Today using it is discouraged, not only because of complexity, but for design reasons as well.

post.commit

In his great talk “A Decade of DDD, CQRS, Event Sourcing” Greg Young names the lack of Process Managers as one of the biggest problems in event-driven systems these days.

I had a time in my career when I overlooked them. Having had that experience, today I completely agree with Greg on this point. I think it’s really important to be talking about processes explicitly both in conversations and in code.

Hope this post was a useful one. That’s it for today, thanks for reading!
