<- ahri.net

Practical event driven & sourced programs in Haskell

I’ve been interested in event-driven systems for a long time now, along with the intriguing concept of storing every event to be sourced later to construct new “materialised” forms. I’m using these techniques in an upcoming project, and decided to open-source the event database I wrote as EventDB, a Haskell library providing a simple, safe and reasonably efficient way to store and access an event stream.

I’m going to provide a quick overview of the technique of driving a system through events, and what sourcing those events means in this context, and then show some code. Feel free to skip ahead to the code if you’re already familiar with this style!

Terminology & high level architectural view

Command
A small chunk of data expressing a desired change to the state.
State
A digested form of the events; usually there will be one used to validate commands, and others built for specific querying roles.
Validation Error
The meaningful result of a failed command.
Event
An isolated piece of information recording an immutable fact.
Query
A read-only request made to a state.
Store
Some durable place to keep all events.
Subscriber
Any downstream system that would like to act upon any/all events.

Normal operation

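  1. A command is executed against the current state
  2. Valid? If not, the result is a validation error; if so, the result is events
  3. The events are committed to the store
  4. The events are applied to the state
  5. The events are published to any subscribers
  6. We go on to process new commands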
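The normal-operation flow can be sketched as a pure Haskell function. This is a minimal sketch under simplifying assumptions, not EventDB’s API: the store is just a list, publishing is modelled by returning the new events rather than sending them anywhere, and the names handleCommand and Outcome are hypothetical.

```haskell
import Data.List (foldl')

-- The result of successfully handling one command.
data Outcome s ev = Outcome
    { newState  :: s     -- state after applying the new events
    , newStore  :: [ev]  -- store with the new events committed (appended)
    , published :: [ev]  -- events to pass on to subscribers
    }

-- exec validates a command against the state; apply digests one event.
handleCommand
    :: (s -> cmd -> Either err [ev])  -- execute: validation error or events
    -> (s -> ev -> s)                 -- apply a single event to the state
    -> s                              -- current state
    -> [ev]                           -- current store contents, oldest first
    -> cmd
    -> Either err (Outcome s ev)
handleCommand exec apply state store cmd = do
    evs <- exec state cmd  -- a Left here short-circuits as the validation error
    pure $ Outcome
        { newState  = foldl' apply state evs  -- apply to state
        , newStore  = store ++ evs            -- commit to store
        , published = evs                     -- publish to subscribers
        }
```

For example, with an Int counter as the state, an exec that rejects 0 and otherwise emits the number, and (+) as apply, handling the command 3 against state 5 with store [5] gives a new state of 8, a store of [5, 3], and [3] to publish.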

Replaying from store (e.g. to bootstrap program start)

  1. Read events from the store
  2. Apply the events to the state
  3. Use the new state

Event-driven systems

Any interactive system is driven at least in part by events, usually triggered by a user; these might be clicks, typing, closing a browser tab, etc. What sets apart a system self-described as “event-driven” is that those events are embodied as first-class objects, recognised globally as the set of all things that happen to the system.

It’s probably worth mentioning that, in order to remain sane, the events tend to be a little less granular than “User clicked screen at point 1207x1682”, instead opting for atomic aggregates of raw events with some special meaning for the system, e.g. “User changed their email address”.
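To illustrate the difference in granularity, here’s a sketch with hypothetical types: RawInput stands for low-level input and DomainEvent for the kind of event a system would actually record. Most raw input maps to no domain event at all.

```haskell
-- Hypothetical raw input events: too granular to mean much to the system.
data RawInput
    = Clicked Int Int            -- e.g. "User clicked screen at point 1207x1682"
    | Typed Char
    | SubmittedEmailForm String  -- value of the email field when the form is submitted
    deriving (Show, Eq)

-- A hypothetical domain event: an atomic fact with meaning for the system.
newtype DomainEvent = EmailChanged String
    deriving (Show, Eq)

-- Only the meaningful aggregate (submitting the form) becomes a domain event.
toDomainEvent :: RawInput -> Maybe DomainEvent
toDomainEvent (SubmittedEmailForm addr) = Just (EmailChanged addr)
toDomainEvent _                         = Nothing
```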

Event sourcing

Event sourcing is a fancy term for maintaining an audit log of all events occurring in a system, and sourcing the overall state of that system by walking the log, digesting the entries into a state of some sort - in Haskell terms this is a “fold”. Everything else you might read is fluff around that central (and simple) concept.
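To make the fold concrete, here’s a sketch using a cut-down, hypothetical event type: the state is sourced by folding an apply function over the log, oldest event first.

```haskell
import Data.List (foldl')

-- A cut-down, hypothetical event type for illustration.
data Event = Deposited Integer | Withdrew Integer
    deriving (Show, Eq)

-- Digest a single event into the state (here the state is just a balance).
applyEvent :: Integer -> Event -> Integer
applyEvent bal (Deposited x) = bal + x
applyEvent bal (Withdrew x)  = bal - x

-- Sourcing the state is a left fold over the log, starting from an empty state.
replay :: [Event] -> Integer
replay = foldl' applyEvent 0
```

Replaying [Deposited 2, Deposited 5, Withdrew 5, Withdrew 1, Deposited 4] gives 5.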

I first heard about event sourcing at a previous job, when I was working with C# and we had Greg Young come along to talk about his Event Store software. The simplicity of the idea (coupled with his energetic delivery) has left it bubbling around in my head for the past few years. With the recent rise in popularity of Apache Kafka (an event queue/store/processor), event-driven systems are being more widely discussed. It’s also worth noting that if Kafka is configured to hold events forever, it can behave as an event store too.

Wait, we store all events?

Yep, one interesting characteristic of an event-sourced system is that all events should be held, immutably, forever! Disk space is cheap these days, right?! All kidding aside, events don’t need to take up that much space, and there are a couple of approaches to reducing storage use that I’ll cover later. That said, this requirement should highlight that not all systems are suited to being event-sourced in such a puritanical sense.

Querying the data

One very important observation is that in a primitive form the data stored in an audit log isn’t particularly useful because we can’t easily query it; we have to digest those events into some useful form, which will normally be a program state but could equally be updating a relational database, or any other form of database for that matter. Once we have the data in a more useful form we can query it as usual.

You may be wondering, then, what the point of maintaining the log is. It’s the central source of truth for our program: because it’s immutable we can trust the information held there, and we can derive new digested (“materialised”) forms of the data in the face of changing requirements. This allows us to skirt the issue of database design being an up-front task that could have ramifications years into the future. As our needs change we can simply fold over the events and produce a new state in whatever guise that takes, be it in-memory, held in a database, or perhaps a specially tuned form for an admin interface vs. the one used in a public web app. This flexibility is enticing!
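As a sketch of deriving a new materialised form, assume a hypothetical log of bank-account events: a read model added later for a new requirement is just another fold over the same, unchanged events. balanceView and nameHistoryView are illustrative names.

```haskell
import Data.List (foldl')

-- A hypothetical log format; the log itself never changes.
data Event = Renamed String | Deposited Integer | Withdrew Integer
    deriving (Show, Eq)

-- The original view: the current balance.
balanceView :: [Event] -> Integer
balanceView = foldl' step 0
  where
    step bal (Deposited x) = bal + x
    step bal (Withdrew x)  = bal - x
    step bal (Renamed _)   = bal

-- A view added later (say, for an admin screen listing every name the
-- account has had), derived from exactly the same events.
nameHistoryView :: [Event] -> [String]
nameHistoryView evs = [name | Renamed name <- evs]
```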

On the other hand, how many of the systems you’ve worked with ever swapped databases, despite the awesome selling point of whatever ORM (Object-Relational Mapper) you used supporting 5 different databases? I wouldn’t invest in an event-sourced system for this nifty trick any more than I’d take the hit on magical complexity that comes with an ORM when I only had to talk to one database.

CQS (Command/Query Separation)

CQS and CQRS (Command Query Responsibility Segregation) are object-oriented concepts concerned with splitting responsibilities for dealing with data: commands write data and queries read it. As a simplification we can consider queries to be dealt with already by existing technologies; e.g. using SQL to query a read-only database satisfies the requirement of a query in this context.

This style is natural to Haskell programmers, as purity and immutability are so strongly built into the language that reads and writes of data tend to be strongly delineated.

Commands are simple bits of data that, through execution against a state, result in zero or more events, which in turn are stored and applied back to the program state.

Inherent costs

The benefits we get from any abstraction always come at a cost; there’s always a compromise in adopting them, whether in terms of performance, code complexity or maintenance.

Building a system around events can be overkill. It gives us the superpower to go back to the start and rebuild everything in a new form, but that means an extra level of abstraction between us and the problem at hand. It also means the system is now “eventually consistent”: the data we’re querying may not yet be up to date.

For more in-depth write-ups on the costs involved I suggest reading Event Sourcing is Hard and What they don’t tell you about event sourcing.

An example of this style’s use

I’m building an application with a fat-client model, where the clients may spend plenty of time offline. Some notable characteristics of this application are:

This model allows me to avoid multiple patterns and paradigms, and other costly abstractions, so that I can share the logic between the clients and the server.

Choosing an event-driven architecture means that I can take a consistent approach to the rather inconvenient fact that clients might be offline a lot, and the application is low-traffic enough that storing all the events generated by a user feels reasonable.

Storage space concerns

Storing every event could become costly, depending on how many we’re dealing with, so there are a number of approaches we can take to limit this cost (from least to most radical):

  1. Compress the data
  2. Rebuild the log from time to time, from the current state; this normalises the events, but may lose data in the process
  3. Build state snapshots and throw away all prior events; this should only be considered if we wish to keep modelling our system as eventually consistent and event-driven, but no longer need to rewind time. Kafka seems like a good solution for this case
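Approach (3) relies on the fact that replaying a snapshot plus the events recorded after it yields the same state as replaying the whole log. Here’s a minimal sketch with a hypothetical numeric state and relative Add events; snapshotAt and replayFrom are illustrative names.

```haskell
import Data.List (foldl')

-- Hypothetical relative events over a numeric state.
newtype Event = Add Integer deriving (Show, Eq)

applyEvent :: Integer -> Event -> Integer
applyEvent s (Add n) = s + n

-- Replay events on top of some starting state (initial state or snapshot).
replayFrom :: Integer -> [Event] -> Integer
replayFrom = foldl' applyEvent

-- Snapshot the state after the first n events. The older events could then
-- be discarded: the snapshot plus the newer events still reach the current
-- state, but any history before the snapshot is gone.
snapshotAt :: Int -> Integer -> [Event] -> (Integer, [Event])
snapshotAt n initial evs = (replayFrom initial older, newer)
  where
    (older, newer) = splitAt n evs
```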

Approach (2) is a little more subtle, being a compromise between the two extremes, so an example is in order:

If the state is a number and the current value is 5, perhaps it was arrived at through 3 events: “+3”, “+8” and “-6”. Because we understand the state, we know we can arrive at the same value with a single event: “+5”.

We lose the information about how many amendments were needed, which might be useful for understanding our user experience, but we save space so it may be a reasonable trade-off.
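The compaction in approach (2) can be sketched as follows, assuming a numeric state like the one in the example above with hypothetical relative Add events: the rebuilt log contains a single event that reproduces the current state.

```haskell
-- Hypothetical relative events over a numeric state.
newtype Event = Add Integer deriving (Show, Eq)

-- The state is the sum of all the relative changes.
currentState :: [Event] -> Integer
currentState = sum . map (\(Add n) -> n)

-- Rebuild the log from the current state. The state is preserved, but the
-- number and order of the original amendments are lost.
compact :: [Event] -> [Event]
compact evs = case currentState evs of
    0 -> []       -- a net change of zero needs no event at all
    n -> [Add n]
```

Compacting [Add 3, Add 8, Add (-6)] gives [Add 5].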

Note that this example shows events relative to a state, which is something that we should avoid in the general case (though it may be unavoidable) - guidance on events may be covered in another post.

In more clear-cut cases we may have retired some event that is no longer contributing to the state at all, so we can just strip all occurrences to save the space. Of course this means that it may be difficult to put them back if we change our minds later!
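Stripping a retired event is just a filter over the log. In this sketch the event type and the retired LoggedIn constructor are hypothetical.

```haskell
-- Hypothetical event type in which LoggedIn has been retired: it no longer
-- contributes to any state we build.
data Event
    = Deposited Integer
    | Withdrew Integer
    | LoggedIn String
    deriving (Show, Eq)

isRetired :: Event -> Bool
isRetired (LoggedIn _) = True
isRetired _            = False

-- Rewrite the log without retired events. This is destructive: once the
-- stripped log replaces the original, the removed events are gone for good.
stripRetired :: [Event] -> [Event]
stripRetired = filter (not . isRetired)
```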

A worked example in Haskell

As a simple worked example of event sourcing we can consider a single bank account; let’s start with some data types!

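import Control.Concurrent.STM

newtype AcctHolder = AcctHolder String
newtype Balance = Balance Integer

-- each field is its own TVar, so transactions can depend on them independently
data State = State
    { holder  :: TVar AcctHolder
    , balance :: TVar Balance
    }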

Straightforward so far: we’re representing the whole program state as a single account for simplicity, and we’re using STM’s TVar abstraction to define the granularity of the model, since each TVar can be read and written independently of the others.

Now, the rest of the CQS and event machinery:

data Command
    = Rename String
    | Deposit Integer
    | Withdraw Integer

data Event
    = Renamed String
    | Deposited Integer
    | Withdrew Integer
    deriving (Show, Read)

We’ve defined some operations on the account, named in the imperative mood, and specified the events we expect to see, named in the past tense. In this case there’s a 1:1 mapping, but that needn’t be the case; indeed we could trivially have events with no corresponding command, e.g. interest rate alterations could be modelled as events that no command produces. This level of complexity will do for now though!

We’ve also had the compiler derive instances of Show and Read for simple serialisation purposes - of course more efficient representations may be worthwhile, but this is not a post about efficient binary representation so we’ll stick with simplicity.
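As a quick check of the Show/Read approach, here’s a sketch of round-tripping events through strings. The Event type is the one above with Eq added for comparisons; encode and decode are hypothetical helper names, and decode uses readMaybe from Text.Read so that a malformed line gives Nothing rather than a runtime error.

```haskell
import Text.Read (readMaybe)

-- The Event type from the worked example, plus Eq for comparisons.
data Event
    = Renamed String
    | Deposited Integer
    | Withdrew Integer
    deriving (Show, Read, Eq)

-- Encode an event as a human-readable line of text.
encode :: Event -> String
encode = show

-- Decode a line, returning Nothing for anything that doesn't parse.
decode :: String -> Maybe Event
decode = readMaybe
```

For example, encode (Deposited 2) is "Deposited 2", and decode "Deposited" is Nothing because the amount is missing.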

Now, for some functions to play with these data structures:

{-# LANGUAGE MultiWayIf #-}  -- at the top of the module, for the multi-way if below

exec :: State -> Command -> STM (Either String [Event])
exec state cmd = case cmd of
    Rename x        -> pure $ Right [Renamed x]

    Deposit x
        | x < 1     -> pure $ Left "Deposits of less than 1 are not allowed"
        | otherwise -> pure $ Right [Deposited x]

    Withdraw x -> do
        (Balance bal) <- readTVar $ balance state -- here we express a fine-grained dependency
        pure $ if | x > bal   -> Left "Insufficient balance"
                  | x < 1     -> Left "Withdrawals of less than 1 are not allowed"
                  | otherwise -> Right [Withdrew x]

This function executes a command against a state, validating it in the context of the current state. At the same time, the use of readTVar tells STM that we depend on the balance of the account: if another transaction changes that balance before ours commits, ours is aborted and the logic is re-run.

Note especially that no writes occur to the state; we’re just producing events to write. Now we need to apply them to the state:

{-# LANGUAGE LambdaCase #-}  -- at the top of the module, for \case
import Data.Foldable (for_)

apply :: State -> [Event] -> STM ()
apply state evs = for_ evs $ \case
    Renamed x   -> writeTVar (holder state) $ AcctHolder x  -- touches only the holder

    Deposited x -> do
        (Balance bal) <- readTVar $ balance state
        writeTVar (balance state) $ Balance $ bal + x

    Withdrew  x -> do
        (Balance bal) <- readTVar $ balance state
        writeTVar (balance state) $ Balance $ bal - x

It’s worth noting that no checks are made here: when applying events to a state we are dealing with facts that have happened in the past, so we can’t reject them now.

If it seems a bit disjoint to have both exec and apply, that’s because it is: we split them up in order to re-use apply in another context, namely when we replay all events to construct a new state.

Now that we’ve got the events we can write them to the state, again touching only the related data. Take the Renamed event as an example: it writes only to the holder, so changes to the balance won’t trigger the abort/retry behaviour. This general capability allows us to express the narrowest possible set of dependencies, thereby improving the potential concurrency of our system. To put it another way: if we depended on every bit of our state, then any alteration to the state would invalidate our current operation.

We wouldn’t have much of an event-sourced system if we didn’t write events to the audit log in order to source them later. Using STM’s atomically function, we can encode a transaction that calls exec and apply, then writes the resulting events:

transact :: Connection -> State -> Command -> IO ()
transact conn state cmd = atomically $ do
    result <- exec state cmd
    case result of
        Left _    -> pure () -- in this case we're just ignoring errors
        Right evs -> do
            apply state evs
            writeEventsAsync (fmap (C.pack . show) evs) conn

In the Left case we just throw away the invalid command; in a real program we might do something useful here, but the example is complex enough already! In the Right case we apply the events to the state, then use the Show instance of Event to encode them as simple strings and write them to the database in a single transaction.

Now, to put it all together we can create a state:

state <- State <$> newTVarIO (AcctHolder "John Smith") <*> newTVarIO (Balance 0)

and execute a bunch of commands against that state, concurrently:

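conn <- openConnection "/tmp/eventdb-bank-acct-demo"
-- mapConcurrently_ comes from Control.Concurrent.Async (the async package);
-- each element is an IO action producing a Command, which is then transacted
mapConcurrently_
    (>>= transact conn state)
    $ replicate 10 randomCommand <> [pure $ Rename "Jemima Schmidt"]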

We’ll skip the implementation of randomCommand in the interests of brevity, but the full source code of a working program is available for inspection.

The demo program linked above does a little bit more than we have here, specifically it replays the event log into a new state in order to compare against the original to verify that the resulting program state is indeed exactly the same.
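The replay-and-compare step can be sketched as below. This is not the demo’s actual code: it restates the types from the worked example (adding Eq so that states can be compared), writes apply with modifyTVar' instead of explicit readTVar/writeTVar, and models the store as a list of strings produced by show. mkState, snapshot and replayMatches are hypothetical names.

```haskell
{-# LANGUAGE LambdaCase #-}
import Control.Concurrent.STM
import Data.Foldable (for_)

newtype AcctHolder = AcctHolder String deriving (Show, Eq)
newtype Balance    = Balance Integer   deriving (Show, Eq)

data State = State
    { holder  :: TVar AcctHolder
    , balance :: TVar Balance
    }

data Event = Renamed String | Deposited Integer | Withdrew Integer
    deriving (Show, Read)

-- Same logic as apply in the worked example, written with modifyTVar'.
apply :: State -> [Event] -> STM ()
apply state evs = for_ evs $ \case
    Renamed x   -> writeTVar (holder state) (AcctHolder x)
    Deposited x -> modifyTVar' (balance state) (\(Balance b) -> Balance (b + x))
    Withdrew x  -> modifyTVar' (balance state) (\(Balance b) -> Balance (b - x))

mkState :: String -> Integer -> IO State
mkState name bal = State <$> newTVarIO (AcctHolder name) <*> newTVarIO (Balance bal)

-- TVars can't be compared directly, so read them into plain values first.
snapshot :: State -> STM (AcctHolder, Balance)
snapshot s = (,) <$> readTVar (holder s) <*> readTVar (balance s)

-- Replay the stored (shown) events into a fresh state built from the same
-- initial values, then compare it with the original. read is partial: a
-- corrupt line would throw, which is acceptable for a sketch.
replayMatches :: State -> (String, Integer) -> [String] -> IO Bool
replayMatches original (name, bal) stored = do
    fresh <- mkState name bal
    atomically $ apply fresh (map read stored)
    (==) <$> atomically (snapshot original) <*> atomically (snapshot fresh)
```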

Now, to run our program (Windows is not currently supported, sorry):

$ git clone https://github.com/ahri/eventdb.git && cd eventdb && git checkout d4472ec && stack build && echo && stack exec -- bank-acct-demo

Initialising State 'John Smith' 0 and creating DB in "/tmp/eventdb-bank-acct-demo"
Executing random commands concurrently...
Resulting state: State 'Jemima Schmidt' 5

Replaying events against State 'John Smith' 0
Deposited 2
Deposited 5
Withdrew 5
Withdrew 1
Renamed "Jemima Schmidt"
Deposited 4

Comparing states...
State 'Jemima Schmidt' 5 == State 'Jemima Schmidt' 5 ~ True

We can observe a few things from this output:

  1. The 10 commands are fired seemingly at random
  2. Not all of the commands succeed (only 6 events were recorded for 11 commands); recall that we throw away invalid commands
  3. The audit log (or event store) is replayed
  4. The resulting replayed state is identical to the original

Running stack exec -- bank-acct-demo repeatedly, we can observe that in every execution, regardless of which random commands are selected, the bank account balance never goes below zero, despite the clear race condition inherent in executing multiple Withdraw commands concurrently.

Altering the program to increase the number of concurrent commands attempted may help to convince us of the soundness of the logic.
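One way to do that is a stress test along the lines of the sketch below. To stay within base and stm it uses forkIO and MVars instead of mapConcurrently_, models only the balance, and leaves out the store; withdraw, transactWithdraw and stress are hypothetical names, with withdraw mirroring the Withdraw branch of exec. Because each withdrawal validates and applies in a single atomic transaction, starting from 100 and attempting 1000 withdrawals of 7 should always give exactly 14 successes and a final balance of 2, whatever the interleaving.

```haskell
{-# LANGUAGE MultiWayIf #-}
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Concurrent.STM
import Control.Monad (forM_, replicateM)

newtype Balance = Balance Integer deriving (Show, Eq)

-- Reduced state: only the balance matters for this check.
newtype State = State { balance :: TVar Balance }

data Event = Deposited Integer | Withdrew Integer deriving Show

-- Mirrors the Withdraw branch of exec: reading the balance means STM re-runs
-- the transaction if another thread commits a change to it first.
withdraw :: State -> Integer -> STM (Either String [Event])
withdraw state x = do
    Balance bal <- readTVar (balance state)
    pure $ if | x > bal   -> Left "Insufficient balance"
              | x < 1     -> Left "Withdrawals of less than 1 are not allowed"
              | otherwise -> Right [Withdrew x]

apply :: State -> [Event] -> STM ()
apply state = mapM_ $ \ev -> modifyTVar' (balance state) $ \(Balance b) ->
    case ev of
        Deposited x -> Balance (b + x)
        Withdrew x  -> Balance (b - x)

-- Validate and apply in one atomic transaction; True if the command succeeded.
transactWithdraw :: State -> Integer -> IO Bool
transactWithdraw state x = atomically $ do
    result <- withdraw state x
    case result of
        Left _    -> pure False
        Right evs -> apply state evs >> pure True

-- Fire n concurrent withdrawals of the same amount and wait for them all;
-- returns how many succeeded and the final balance.
stress :: Integer -> Int -> Integer -> IO (Int, Integer)
stress start n amount = do
    state <- State <$> newTVarIO (Balance start)
    dones <- replicateM n newEmptyMVar
    forM_ dones $ \done -> forkIO (transactWithdraw state amount >>= putMVar done)
    oks <- mapM takeMVar dones
    Balance final <- readTVarIO (balance state)
    pure (length (filter id oks), final)
```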

Closing

I feel that event-driven systems are quite interesting, and that event sourcing as a tool is quite compelling in its simplicity. Characterising a system in this manner allows us to embrace any disjoint system’s inherent eventual consistency in an honest way, and once we’re used to developing in this style, we can take some comfort in being forced to deal with the unfortunate situations that arise in complex, loosely connected systems.

To me the benefit of this paradigm feels similar to the one I get from Haskell’s compiler forcing me to (if I were looking at it cynically) jump through hoops: it ultimately saves me from swathes of bugs that I’d otherwise introduce and, in the best case, discover immediately at runtime or, in worse cases, leave to fester for someone else to find. It’s abrasive in some ways to be forced to write a program in this style, but on balance I feel that the cost is worthwhile versus the benefits when applied to suitable contexts.