Event Sourcing with PureLogic
I recently wrote about PureLogic, a Scala library for writing direct-style, pure domain logic. If you haven’t read that post yet, I recommend starting there first. In this post, we’ll use PureLogic to model an event-sourced domain in a way that guarantees that state changes and emitted events always stay in sync.
What is Event Sourcing?
Event sourcing is a pattern where, instead of storing only the current state of your data, you store the sequence of events that led to that state. For a bank account, instead of storing “balance = 220”, you store “deposited 100, deposited 150, withdrew 30”. The current state can always be rebuilt by replaying those events from the beginning, or from a more recent snapshot. This gives you a complete audit trail, the ability to time-travel to past states, and a natural fit with event-driven architectures.
When using event sourcing, it’s generally a good idea to isolate the pure domain logic from the rest of the application. When replaying events to rebuild your state, you obviously do not want to re-run side effects. But the same argument applies when generating new events: if part of the workflow fails halfway through, you don’t want to compensate for already-triggered effects. The goal is to use pure logic to decide which events should happen, and only then perform side effects at the boundary to persist or react to them.
Scala gives us great tools to make side effects explicit, whether you use ZIO, Cats Effect, Kyo, or Yaes. Here we’ll focus on a narrower question: how can we write useful domain logic without introducing those effects into the core at all?
Constraints & domain model
There are a few constraints we want the model to enforce:
- We should not be able to modify the state without generating an event.
- We should not be able to generate an event without modifying the state.
- The way a given event modifies the state should be deterministic, defined in a single place and used both when generating and replaying events.
We’ll use a bank account example to illustrate the idea. Let’s start with the domain model and events. We will use Neotype to define an Amount type that can not be negative.
import neotype.*
import scala.math.Ordering.Implicits._
type Amount = Amount.Type
object Amount extends Newtype[Int] {
override inline def validate(value: Int) = value >= 0
extension (amount: Amount) {
def +(other: Amount): Amount =
Amount.unsafeMake(amount.unwrap + other.unwrap)
def -(other: Amount): Option[Amount] =
Amount.make(amount.unwrap - other.unwrap).toOption
}
given Ordering[Amount] = Ordering.by[Amount, Int](_.unwrap)
}
case class Account(balance: Amount)
enum AccountEvent {
case Deposit(amount: Amount)
case Withdraw(amount: Amount)
}
case class Config(maxDeposit: Amount, maxWithdrawal: Amount)
Transitions
Now let’s model how events modify the state. A Transition is a function that, given a starting state and an event, returns a new state. It can also fail with an error if the event is not valid for the given state.
Since we don’t want side effects in this part of the code, we won’t use any of the effect libraries mentioned earlier. Instead, we will use PureLogic, which restricts what we can do to only a handful of capabilities. For Transition, we can make use of the State and Abort capabilities: State lets us access the current state and modify it, and Abort lets us fail the computation with an error.
import purelogic.*
trait Transition[Ev, S, Err] {
def run(ev: Ev): (State[S], Abort[Err]) ?=> Unit
}
This means that when implementing run, we are only allowed to use operations that require the State and Abort capabilities, such as get, set, ensure, or fail. We do not have access to anything else.
Let’s look at the transition for AccountEvent. When processing Withdraw, we might not be able to create a new Amount if the result would be negative. Ideally this is prevented before the event is generated, but the transition still needs to handle that case.
import purelogic.syntax.* // enables .orFail on an Option
given Transition[AccountEvent, Account, String] with {
def run(ev: AccountEvent): (State[Account], Abort[String]) ?=> Unit =
ev match {
case AccountEvent.Deposit(amount) =>
val newBalance = get.balance + amount
set(Account(newBalance))
case AccountEvent.Withdraw(amount) =>
val newBalance = (get.balance - amount).orFail("Insufficient balance")
set(Account(newBalance))
}
}
This transition will be reused in two places: when generating a new event and when replaying historical ones. That guarantees the two code paths stay consistent.
You might wonder: why not ignore PureLogic entirely and use:
trait Transition[Ev, S, Err] {
def run(ev: Ev, state: S): Either[Err, S]
}
It would definitely make our example above simpler. However, once the logic gets a little more complex and consists of multiple functions to call, you’ll realize that combining Either requires chaining flatMap calls, directly or via for-comprehensions. It also requires passing the current state around as an explicit parameter, which creates more room for mistakes. The syntax quickly becomes more verbose and nested, while also causing more allocations. The benefit of PureLogic’s approach becomes even more obvious when adding Reader and Writer capabilities.
A new capability
Now let’s think about the rest of the domain logic. We will use the Reader capability to access the configuration when we need it, and the Writer capability to accumulate the events. We’ll also need State and Abort when calling a Transition#run function.
However, if you remember our constraints, we should not expose the raw State and Writer capabilities directly. If we did, our domain logic code could mutate the state or append events independently, which would break the guarantee that the two always move together.
Instead, we will create a new capability EventSourcing[Ev, S, Err] that exposes the two functions we need: writeEvent and replayEvents. It should look something like this:
trait EventSourcing[Ev, S, Err] {
def writeEvent(event: Ev)(using Transition[Ev, S, Err]): Unit
def replayEvents(events: Iterable[Ev])(using Transition[Ev, S, Err]): Unit
}
writeEvent takes an event and a Transition. It runs the transition, which updates the state, and then records the event.
replayEvents takes a sequence of already-persisted events and applies the same transition to rebuild the state, without recording them again.
To modify the state and write the event, we need the State and Writer capabilities. But since we don’t want the end-user to have access to them directly, we will make them protected inside EventSourcing. The real implementation will be as follows:
trait EventSourcing[Ev, S, Err] {
protected given state: State[S]
protected given writer: Writer[Ev]
def writeEvent(event: Ev)(
using transition: Transition[Ev, S, Err], abort: Abort[Err]
): Unit = {
transition.run(event)
write(event)
}
def replayEvents(events: Iterable[Ev])(
using transition: Transition[Ev, S, Err], abort: Abort[Err]
): Unit =
events.foreach(transition.run)
}
transition.run uses the given State in scope, while write uses the given Writer. There is no Abort available inside EventSourcing, so we add it as a parameter and let failures bubble up to the caller.
We also need a way to provide this capability when we are ready to run a program that needs it.
object EventSourcing {
def apply[Ev, S, Err, A](body: EventSourcing[Ev, S, Err] ?=> A)(
using s: State[S], w: Writer[Ev]
): A = {
val eventSourcing = new EventSourcing[Ev, S, Err] {
protected given state: State[S] = s
protected given writer: Writer[Ev] = w
}
body(using eventSourcing)
}
}
This will transform a program that needs the EventSourcing capability into a program that needs the State and Writer capabilities. This is intended to be used at the top level of the application, so it’s okay to pass the State and Writer capabilities as parameters.
Since writeEvent and replayEvents are methods on the EventSourcing trait, we also define top-level helpers so they can be called as free functions in our domain logic:
inline def writeEvent[Ev, S, Err](event: Ev)(using
transition: Transition[Ev, S, Err],
es: EventSourcing[Ev, S, Err],
abort: Abort[Err]
) = es.writeEvent(event)
inline def replayEvents[Ev, S, Err](events: Iterable[Ev])(using
transition: Transition[Ev, S, Err],
es: EventSourcing[Ev, S, Err],
abort: Abort[Err]
): Unit = es.replayEvents(events)
Domain logic
Now, with all this machinery in place, we can write domain logic using this new capability. To keep signatures compact, let’s define a type alias for the capabilities our domain code is allowed to use.
type Program[A] =
(
Reader[Config],
StateReader[Account],
Abort[String],
EventSourcing[AccountEvent, Account, String]
) ?=> A
Any Program can only do the following:
- read the configuration
Config - read the current state
Account(but not modify it) - fail the computation with an error of type
String - write an
AccountEvent, which will also modify the state via a givenTransition
Anything else is forbidden by the compiler. In particular, we can not call update, set, or write directly.
Now let’s write functions to deposit and withdraw.
def deposit(amount: Amount): Program[Unit] = {
ensure(amount <= read.maxDeposit, "Amount exceeds maximum deposit")
writeEvent(AccountEvent.Deposit(amount))
}
def withdraw(amount: Amount): Program[Unit] = {
ensure(amount <= read.maxWithdrawal, "Amount exceeds maximum withdrawal")
ensure(amount <= get.balance, "Insufficient balance")
writeEvent(AccountEvent.Withdraw(amount))
}
The code stays small and direct. We use ensure for validation (a function that calls fail if the predicate is false), then writeEvent to emit the event and apply the transition in one step.
Now how do we run a Program? We can use the EventSourcing.apply method we just created.
def runProgram[A](account: Account, config: Config)(
program: Program[A]
): Either[String, (Vector[AccountEvent], Account, A)] =
Abort { // eliminates Abort
val (events, (newState, a)) =
Reader(config) { // eliminates Reader
Writer { // eliminates Writer
State(account) { // eliminates State
EventSourcing { // eliminates EventSourcing, adds State and Writer
program // requires EventSourcing, StateReader, Reader, Abort
}
}
}
}
(events, newState, a)
}
We peel off each capability layer one by one: Abort handles errors, Reader provides configuration, Writer collects new events, State manages the account, and EventSourcing ties state and writer together behind a safe interface. The nesting order determines the shape of the final return type.
Let’s test it.
runProgram(Account(Amount(100)), Config(Amount(1000), Amount(100))) {
deposit(Amount(50))
withdraw(Amount(30))
deposit(Amount(100))
}
// Right((Vector(Deposit(50), Withdraw(30), Deposit(100)), Account(220), ()))
It works! We get back the generated events and the resulting state, and the two are always consistent: the state is exactly what you get by applying those events. We can now persist the events to our event store and optionally save a snapshot.
Now let’s try to deposit more than the maximum deposit amount.
runProgram(Account(Amount(100)), Config(Amount(1000), Amount(100))) {
deposit(Amount(2000))
}
// Left("Amount exceeds maximum deposit")
We get a failure as expected. In that case, we persist nothing and return the error to the caller.
How about side effects?
So far, everything we’ve written is pure. At some point, though, we need to interact with the real world: load state from a database, persist new events, and maybe publish something to Kafka. This is where the imperative shell comes in.
First, we need a way to obtain the current account state. If it is not already in memory, we can rebuild it from the database by loading the latest snapshot and replaying the events that happened since then. The following code uses an imaginary MyEffect object that can be replaced with any real effect system.
def loadAccount(userId: UserId) =
for {
snapshot <- Database.getLatestSnapshot(userId)
journals <- Database.getJournals(userId, since = snapshot.version)
account <- MyEffect.fromEither(
runProgram(snapshot.state, config) {
replayEvents(journals)
}.map(_._2)
)
} yield account
Then processing a command becomes straightforward. We run the corresponding program on the current state, persist the new events, and optionally publish them:
def process(userId: UserId, account: Account, program: Program[Unit]) =
for {
(events, _, _) <- MyEffect.fromEither(
runProgram(account, config)(program)
)
_ <- Database.appendJournals(userId, events)
_ <- Kafka.publish("account-events", userId, events)
} yield ()
runProgram is just a regular function call that returns an Either. It does not need a special runtime or interpreter. The pure domain logic executes immediately, gives us back the new events and the updated state, and then we use our effect system at the boundary to persist or publish them. Snapshots can be saved periodically as an optimization, but they are not required on every command.
Conclusion
By leveraging PureLogic’s capability system, we built an event sourcing abstraction that enforces the important constraints at the type level. The compiler ensures that state can only change through events, that emitted events always correspond to state transitions, and that the same transition logic is shared between command handling and replay.
The EventSourcing capability acts as a gatekeeper: it hides State and Writer behind a controlled interface, exposing only writeEvent and replayEvents. Combined with StateReader for read-only access to the current state, the domain logic has exactly the power it needs and nothing more.
And because everything runs as plain Scala functions with no monadic runtime in the middle, you get fast execution, meaningful stack traces, and easy integration with any effect system at the boundary. The full code for this example is available in this Gist and in Scastie.
Note that I plan to add Transition and EventSourcing to PureLogic itself in the near future. That means this whole part will come for free, and you will be able to write the domain logic directly without adding any additional machinery. But this post serves as an example of how you can build custom capabilities on top of PureLogic to enforce your own constraints.