Event Sourcing with Rails from scratch

Oleg
March 22, 2024
10 min to read

In the previous article, Introduction to Event Sourcing and CQRS, we got familiar with the main concepts of Event Sourcing and reviewed the pros and cons of this approach.

Implementing Event Sourcing in Rails can be a powerful way to handle complex business logic and maintain a reliable audit trail of changes to your application's state. Event Sourcing involves storing a sequence of events that represent changes to the state of your application over time.

Now, before we dive into using Event Sourcing in Rails with the help of battle-tested solutions, let's go over the basics and learn how to implement things from scratch. However, be aware: you should think twice before using a custom implementation (chances are high that you'll miss some small detail, and the consequences may be huge). For this purpose, let's create a simple application for listing rental advertisements on a website.

The real process may be quite complicated, but for the sake of simplicity, let's assume it goes as follows:

  • the ad listing is created
  • the content is updated
  • the listing is published on the website
  • the ad listing is removed
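
To get a feeling for what "state derived from events" means before touching Rails, here is a minimal plain-Ruby sketch of the flow above. The event names and hash shapes are assumptions made up for illustration; they are not the classes we build later.

```ruby
# Each event is a plain hash; names and payloads are illustrative assumptions.
events = [
  {type: :ad_created,   data: {title: "Flat for rent", body: "2 rooms"}},
  {type: :ad_modified,  data: {body: "2 rooms, city centre"}},
  {type: :ad_published, data: {}},
  {type: :ad_removed,   data: {}}
]

# Fold a single event into the current state and return the new state.
def apply_event(state, event)
  case event[:type]
  when :ad_created   then event[:data].merge(status: :draft)
  when :ad_modified  then state.merge(event[:data])
  when :ad_published then state.merge(status: :published)
  when :ad_removed   then state.merge(status: :removed)
  else state
  end
end

# The current state is never stored directly; it is derived by replaying events.
def replay(events)
  events.reduce({status: :new}) { |state, event| apply_event(state, event) }
end

p replay(events.first(2)) # state after the first two steps
p replay(events)          # state after the whole lifecycle
```

Replaying only a prefix of the list gives the state at that point in time, which is exactly the audit-trail property mentioned earlier.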

Preparations

where we start building a path in our journey and getting familiar with events

So, let's create a new project for our implementation purposes first:


rails new event_sourced_ads --database=postgresql --skip-test

We skip the default test setup, since I’m a fan of RSpec, so we'll use that:


gem "rspec-rails", "~> 6.1.0"

Then run bundle install and rails generate rspec:install.

Now, once the initial setup is done we are going to use events. Let’s implement that part. And guess what we’re starting with:


class CreateEvents < ActiveRecord::Migration[7.1]
  def change
    create_table :events, id: :uuid do |t|
      t.string :stream_name
      t.string :event_type
      t.jsonb :data

      t.timestamps
    end
  end
end


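We use UUIDs here for primary keys. When dealing with distributed systems and the need for globally unique identifiers, opting for a UUID could be the optimal decision. Note that on PostgreSQL versions before 13, the default gen_random_uuid() function requires the pgcrypto extension to be enabled.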

We need streams to group events that relate to a certain entity. In our case, I’m going to group the events related to one single ad, so they can be easily fetched. Frankly speaking, it’s not a good idea to store the stream name on the event record like that, since one event may belong to several streams. For the sake of simplicity, let’s consider doing some evil (we’ll do more before the end).
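
As a rough illustration of what a stream gives us, here is a plain-Ruby sketch of an append-only log that can be filtered by stream name. StoredEvent and InMemoryEventLog are hypothetical stand-ins for the events table, not part of the project.

```ruby
require "securerandom"

# A stand-in for one row of the events table (hypothetical, for illustration).
StoredEvent = Struct.new(:id, :stream_name, :event_type, :data, keyword_init: true)

class InMemoryEventLog
  def initialize
    @records = []
  end

  # Append only: existing records are never updated or deleted.
  def append(stream_name:, event_type:, data:)
    record = StoredEvent.new(
      id: SecureRandom.uuid, stream_name: stream_name, event_type: event_type, data: data
    )
    @records << record
    record
  end

  # All events of one stream, in the order they were appended.
  def stream(stream_name)
    @records.select { |record| record.stream_name == stream_name }
  end
end

log = InMemoryEventLog.new
log.append(stream_name: "ad-1", event_type: "AdCreated", data: {title: "Flat"})
log.append(stream_name: "ad-2", event_type: "AdCreated", data: {title: "House"})
log.append(stream_name: "ad-1", event_type: "AdPublished", data: {})

puts log.stream("ad-1").map(&:event_type).inspect
```

Because each record carries exactly one stream name, an event cannot sit in two streams at once, which is the limitation mentioned above.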

We need a basic event class that we can publish and that verifies its input, and we also need some way to do pub-sub (quite a crucial part). I’m excited about the dry-rb stack. I can’t say it’s perfect, but it usually suits all my needs, so I'm turning on the imagination and seeing what it brings…

ImaginationCompleted.publish(data: {idea: "Create BaseEvent", pub_sub: "KISS rails has one built-in"})

Who am I to argue with that 😇 Let’s start with a spec:


it "persists an event record" do
  expect { publish }.to change { Event.count }.by(1)
  expect(Event.last).to have_attributes(
    event_type: "FakeEvent",
    data: {"name" => "whatever"},
    stream_name: "123123"
  )
end

it "sends a notification" do
  allow(ActiveSupport::Notifications).to receive(:instrument)
  publish
  expect(ActiveSupport::Notifications).to have_received(:instrument).with(
    "FakeEvent", data: {name: "whatever"}, stream_name: "123123"
  )
end


and after some struggle, we come up with:


# lib/events/base_event.rb
module Events
  class BaseEvent
    class InvalidAttributes < StandardError; end

    class MissingContract < StandardError; end

    attr_reader :data

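    # Class-level macro: nests the event-specific schema under a required :data key.
    def self.schema(&block)
      inner_schema = block.call
      define_method(:params_schema) do
        Dry::Schema.Params do
          required(:data).hash(inner_schema)
        end
      end
    end

    # Convenience entry point: validate, persist and broadcast in one call.
    def self.publish(**args)
      new(**args.slice(:data)).publish(stream_name: args[:stream_name])
    end

    # Raises InvalidAttributes unless args match the schema declared by the subclass.
    def initialize(**args)
      validate_input(args)
      @data = args[:data]
    end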

    def publish(stream_name: nil)
      Event.create!(
        event_type: self.class.name, data:, stream_name:
      )
      ActiveSupport::Notifications.instrument(self.class.name, data:, stream_name:)
      self
    end

    def params_schema
      ->(_) { raise MissingContract, "Contract needs to be implemented" }
    end

    def validate_input(args)
      data_validation = params_schema.call(args)
      raise InvalidAttributes.new(data_validation.errors.to_h) if data_validation.errors.any?
    end
  end
end
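
If dry-schema and ActiveSupport::Notifications feel like too much magic at once, the same flow (declare required input, validate on initialization, persist, then notify) can be sketched in plain Ruby. Everything below is a hypothetical stand-in: TinyNotifications replaces the Rails pub-sub, SketchEvent::STORE replaces the events table, and required_keys is a much cruder check than a real schema.

```ruby
# Minimal pub-sub, standing in for ActiveSupport::Notifications.
module TinyNotifications
  @subscribers = Hash.new { |hash, key| hash[key] = [] }

  def self.subscribe(name, &handler)
    @subscribers[name] << handler
  end

  def self.instrument(name, payload)
    @subscribers[name].each { |handler| handler.call(name, payload) }
  end
end

class SketchEvent
  class InvalidAttributes < StandardError; end

  STORE = [] # stand-in for the events table

  # Class macro: declares which keys must be present in data.
  def self.required_keys(*keys)
    @required = keys
  end

  def self.required
    @required || []
  end

  def self.publish(data:, stream_name:)
    new(data: data).publish(stream_name: stream_name)
  end

  attr_reader :data

  def initialize(data:)
    missing = self.class.required - data.keys
    raise InvalidAttributes, "missing: #{missing.join(', ')}" unless missing.empty?
    @data = data
  end

  # Persist first, then notify subscribers.
  def publish(stream_name:)
    STORE << {event_type: self.class.name, data: data, stream_name: stream_name}
    TinyNotifications.instrument(self.class.name, {data: data, stream_name: stream_name})
    self
  end
end

class NoteAdded < SketchEvent
  required_keys :text
end

received = []
TinyNotifications.subscribe("NoteAdded") { |name, payload| received << [name, payload] }
NoteAdded.publish(data: {text: "hello"}, stream_name: "s-1")
```

An invalid event never reaches the store or the subscribers, because the check runs in the initializer, before publish is ever called.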


And let’s try using our new pet. You know where to start…


RSpec.describe Events::AdCreated do
  describe ".publish" do
    subject(:publish) do
      described_class.publish(
        data: {title: "Some title", body: "Some description"},
        stream_name: "123456789",
      )
    end

    it "persists the event in database" do
      expect { publish }.to change { Event.count }.by(1)
      expect(Event.last).to have_attributes(
        event_type: "Events::AdCreated",
        data: {
          "title" => "Some title",
          "body" => "Some description"
        },
        stream_name: "123456789"
      )
    end
  end
end 

and the event itself:


# lib/events/ad_created.rb
class Events::AdCreated < Events::BaseEvent
  schema do
    Dry::Schema.Params do
      required(:title).filled(:string)
      required(:body).filled(:string)
    end
  end
end


Aggregate part

the one where we learn to manipulate our ads

So, as a user, we should be able to create a new ad, possibly modify it, and publish it. However, we shouldn’t be able to edit an ad that is already published. So we need some consistency in actions, and a corresponding event has to be published after each action is executed. That’s where the aggregate comes into play.

So, we create an AdAggregate class and start with a test for a new instance:


# spec/services/ad_aggregate_spec.rb
RSpec.describe AdAggregate do
  let(:aggregate) { described_class.new }

  it "has valid attributes on initialization" do
    expect(aggregate).to have_attributes(
      id: kind_of(String),
      state: :new
    )
  end
end

# app/services/ad_aggregate.rb
class AdAggregate
  attr_reader :id, :attributes, :state

  def initialize(id = nil)
    @id = id || SecureRandom.uuid
    @state = :new
  end
end

Next, we need a way to actually create a new draft and keep its attributes in the aggregate. We also need to publish an event saying that the draft was created.


describe "#create_draft" do
  subject(:create_draft) { aggregate.create_draft(**attributes) }

  context "with valid attributes" do
    let(:attributes) { valid_attributes }

    it "updates attributes and state" do
      create_draft
      expect(aggregate).to have_attributes(
        attributes: {
          title: "Test title",
          body: "Test description"
        },
        state: :draft
      )
    end
  end
end


This one is easy to implement, but we face a problem here. We should be able to restore the state of the aggregate later, when we want to apply further actions to it. The aggregate is supposed to be event sourced, so we need a way to apply events to it, and all the command should actually do is apply an event:


def create_draft(title:, body:)
  apply Events::AdCreated.new(data: {ad_id: id, title:, body:})
end


We need handlers in the aggregate that describe how the attributes are modified, how the state changes, and how the state of an aggregate can be restored from its history of events (for this purpose we’ll create another class in a while 😉). Also, the events should be published when we store the aggregate. So let’s add handler methods that describe how each event modifies the aggregate’s state, plus a common method that also puts the event into a queue of unpublished events:


def unpublished_events
  @unpublished_events ||= []
end

def apply_event(event)
  send("apply_#{event.class.name.demodulize.underscore}", event)
end

private 

def apply(event)
  unpublished_events << event
  apply_event(event)
end

def apply_ad_created(event)
  @state = :draft
  @attributes = event.data.slice(:title, :body)
end
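
The difference between apply (new events) and apply_event (replayed history) is easy to miss, so here is a standalone plain-Ruby sketch of the same dispatch. SketchAggregate and the Struct-based AdCreated are hypothetical stand-ins, and handler_suffix is a hand-rolled replacement for demodulize.underscore that only covers simple CamelCase names.

```ruby
require "securerandom"

# Plain-Ruby event with a data payload (stand-in for Events::AdCreated).
AdCreated = Struct.new(:data)

class SketchAggregate
  attr_reader :id, :state, :attributes

  def initialize(id = nil)
    @id = id || SecureRandom.uuid
    @state = :new
  end

  def create_draft(title:, body:)
    apply AdCreated.new({ad_id: id, title: title, body: body})
  end

  def unpublished_events
    @unpublished_events ||= []
  end

  # Used when replaying history: changes state without queueing the event.
  def apply_event(event)
    send("apply_#{handler_suffix(event)}", event)
  end

  private

  # Used for new events: queue for publishing, then change state.
  def apply(event)
    unpublished_events << event
    apply_event(event)
  end

  # "Events::AdCreated" or "AdCreated" -> "ad_created"
  def handler_suffix(event)
    event.class.name.split("::").last.gsub(/([a-z\d])([A-Z])/, '\1_\2').downcase
  end

  def apply_ad_created(event)
    @state = :draft
    @attributes = event.data.slice(:title, :body)
  end
end

fresh = SketchAggregate.new
fresh.create_draft(title: "Flat", body: "2 rooms")

# Rebuild a second instance purely from the recorded events.
replayed = SketchAggregate.new(fresh.id)
fresh.unpublished_events.each { |event| replayed.apply_event(event) }
```

The replayed instance ends up in the same state as the fresh one, but its queue stays empty, so loading an aggregate never causes old events to be published again.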


So, when a new event is applied, we save it in a queue of unpublished events and call the corresponding handler. But what’s the point without having the events stored? And how do we fetch a previously created aggregate? We could implement that right here in this class, but according to the Single Responsibility Principle, it’s definitely work that someone else should do. That’s where we need a repository:


# frozen_string_literal: true

require "rails_helper"

RSpec.describe Repository do
  describe '.load' do
    subject(:load) { described_class.load(aggregate_class, stream_name) }

    let(:aggregate_class) { AdAggregate }
    let(:stream_name) { SecureRandom.uuid }

    context "without events" do
      it "loads new aggregate" do
        expect(load).to be_instance_of(aggregate_class).and have_attributes(
          id: stream_name,
          state: :new
        )
      end
    end

    context "with existing events" do
      context "when applying AdCreated event" do
        before do
          Event.create(
            event_type: "Events::AdCreated", stream_name:,
            data: {ad_id: stream_name, title: "title", body: "body"}
          )
        end

        it "applies event to aggregate" do
          expect(load).to be_a(AdAggregate).and have_attributes(
            id: stream_name,
            state: :draft,
            attributes: {
              title: "title",
              body: "body"
            }
          )
        end

        context "when applying AdPublished" do
          before do
            Event.create(
              event_type: "Events::AdPublished", stream_name:,
              data: {ad_id: stream_name, remote_id: "xosfjoj"}
            )
          end

          it "applies event to aggregate" do
            expect(load).to be_a(AdAggregate).and have_attributes(
              id: stream_name,
              state: :published,
              attributes: {
                title: "title",
                body: "body"
              }
            )
          end
        end
      end
    end
  end

  describe '.store' do
    subject(:store) { described_class.store(aggregate) }

    context "with unpublished events" do
      let(:aggregate) do
        instance_double(AdAggregate, id: stream_name, unpublished_events: [event])
      end
      let(:stream_name) { SecureRandom.uuid }
      let(:event) do
        Events::AdCreated.new(data: {ad_id: stream_name, title: "title", body: "body"})
      end

      it "publishes pending events" do
        expect { store }.to change { Event.count }.by(1)
        expect(Event.last).to have_attributes(
          stream_name:,
          event_type: "Events::AdCreated",
          data: {
            "ad_id" => stream_name,
            "title" => "title",
            "body" => "body"
          }
        )
      end
    end
  end
end


The implementation is easy enough, so I’ll omit a detailed description to save some precious space and time:


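module Repository
  extend self

  def load(aggregate_class, stream_name)
    # Replay in insertion order; UUID primary keys carry no ordering of their own.
    events = Event.where(stream_name:).order(:created_at).map do |event|
      # JSONB columns deserialize with string keys by default,
      # while the aggregate handlers slice the data by symbols.
      event.event_type.constantize.new(data: event.data.deep_symbolize_keys)
    end
    aggregate_class.new(stream_name).tap do |aggregate|
      events.each do |event|
        aggregate.apply_event(event)
      end
    end
  end

  def store(aggregate)
    aggregate.unpublished_events.each do |event|
      event.publish(stream_name: aggregate.id)
    end
  end
end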
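
To see the load/store round trip without a database, here is a self-contained plain-Ruby sketch. TABLE, SketchAd and SketchRepository are hypothetical stand-ins, and two details are my own additions rather than something the code above does: an explicit position column for ordering, and clearing the queue after storing so that a second store is a no-op.

```ruby
TABLE = [] # stand-in for the events table

class SketchAd
  attr_reader :id, :state, :title

  def initialize(id)
    @id = id
    @state = :new
  end

  def create_draft(title:)
    record({type: "ad_created", title: title})
  end

  def publish
    record({type: "ad_published"})
  end

  def unpublished_events
    @unpublished_events ||= []
  end

  def apply_event(event)
    case event[:type]
    when "ad_created"
      @state = :draft
      @title = event[:title]
    when "ad_published"
      @state = :published
    end
  end

  private

  def record(event)
    unpublished_events << event
    apply_event(event)
  end
end

module SketchRepository
  extend self

  # Rebuild an aggregate by replaying its stream in insertion order.
  def load(aggregate_class, stream_name)
    rows = TABLE.select { |row| row[:stream_name] == stream_name }
    aggregate_class.new(stream_name).tap do |aggregate|
      rows.sort_by { |row| row[:position] }.each { |row| aggregate.apply_event(row[:event]) }
    end
  end

  # Persist pending events, then clear the queue (an assumption of this sketch).
  def store(aggregate)
    aggregate.unpublished_events.each do |event|
      TABLE << {stream_name: aggregate.id, position: TABLE.size, event: event}
    end
    aggregate.unpublished_events.clear
  end
end

ad = SketchAd.new("ad-1")
ad.create_draft(title: "Flat")
ad.publish
SketchRepository.store(ad)

loaded = SketchRepository.load(SketchAd, "ad-1")
puts loaded.state
```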

We can now store the aggregate and load it back from existing events. However, we are missing one of the main purposes of the aggregate. We should disallow editing already published ads, and we definitely can’t publish the same ad twice (well, technically we can, but for sure that’s wrong). So, as usual:


describe "#update_content" do
  subject(:update_content) { aggregate.update_content(**new_attributes) }

  let(:aggregate) { described_class.new }
  let(:new_attributes) do
    {title: "Updated title", body: "Updated description"}
  end

  context "when ad is in draft state" do
    before { aggregate.create_draft(**valid_attributes) }

    it "updates ad attributes" do
      update_content
      expect(aggregate).to have_attributes(
        attributes: {
          title: "Updated title",
          body: "Updated description"
        },
        state: :draft
      )
    end
  end

  context "when ad is in published state" do
    before do
      aggregate.create_draft(**valid_attributes)
      aggregate.publish
    end

    it "raises an error" do
      expect { update_content }.to raise_error(described_class::AlreadyPublished)
    end
  end
end

describe "#publish" do
  subject(:publish) { aggregate.publish }

  let(:aggregate) { described_class.new }

  context "when ad is in draft state" do
    before { aggregate.create_draft(**valid_attributes) }

    it "updates state to published" do
      publish
      expect(aggregate.state).to eq(:published)
    end
  end

  context "when ad is in published state" do
    before do
      aggregate.create_draft(**valid_attributes)
      aggregate.publish
    end

    it "raises an error" do
      expect { publish }.to raise_error(described_class::AlreadyPublished)
    end
  end
end 


You can check the implementation of these methods in the project repository, and it’d be a good idea to try implementing them yourself 😉
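
If you want a hint first, here is one possible shape of such guards as a standalone plain-Ruby sketch. It is not necessarily what the project does: GuardedAd, the hash-based events and the :ad_modified event type are assumptions made for illustration.

```ruby
class GuardedAd
  class AlreadyPublished < StandardError; end

  attr_reader :state, :attributes, :unpublished_events

  def initialize
    @state = :new
    @attributes = {}
    @unpublished_events = []
  end

  def create_draft(title:, body:)
    apply({type: :ad_created, data: {title: title, body: body}})
  end

  # Commands check the invariant first; only then is an event recorded.
  def update_content(title:, body:)
    raise AlreadyPublished if state == :published
    apply({type: :ad_modified, data: {title: title, body: body}})
  end

  def publish
    raise AlreadyPublished if state == :published
    apply({type: :ad_published, data: {}})
  end

  private

  def apply(event)
    unpublished_events << event
    case event[:type]
    when :ad_created
      @state = :draft
      @attributes = event[:data]
    when :ad_modified
      @attributes = @attributes.merge(event[:data])
    when :ad_published
      @state = :published
    end
  end
end

ad = GuardedAd.new
ad.create_draft(title: "Test title", body: "Test description")
ad.update_content(title: "Updated title", body: "Updated description")
ad.publish
```

The important bit is the order: the guard runs before any event is created, so a rejected command leaves no trace in the queue of unpublished events.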

CQRS part

the one where we get familiar with read models and presentation to users

Ok, the pub part is ready, now it’s time for the sub part:


# config/initializers/event_listeners.rb
Rails.application.config.after_initialize do
  {
    AdEventListener: [
      Events::AdCreated,
    ]
  }.each do |listener, events|
    events.each { |event| ActiveSupport::Notifications.subscribe(event.to_s, listener.to_s.constantize) }
  end
end


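Here we define which listener receives which events. In the example, AdEventListener subscribes to the ActiveSupport notification that BaseEvent broadcasts, so every published Events::AdCreated results in a call to our listener. Perfect… but not exactly what we need: the listener still has to respond to call and route each event to the right handler, so let’s add a base class for that: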


class ApplicationEventListener
  def self.call(event)
    public_send(
      "apply_#{event.name.demodulize.underscore}", **event.payload
    )
  end
end


and now we should be able to create listeners in a very convenient form:


class AdEventListener < ApplicationEventListener
  class << self
    def apply_ad_created(data:, stream_name:)
      Ad.create!(id: stream_name, **data)
    end
  end
end
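
To make the routing visible without booting Rails, here is a plain-Ruby sketch of the same dispatch. FakeNotification is a hypothetical stand-in for the event object that ActiveSupport::Notifications passes to a one-argument subscriber (it exposes name and payload), and READ_MODEL is a hash playing the role of the Ad table.

```ruby
# Stand-in for the notification event object, which exposes #name and #payload.
FakeNotification = Struct.new(:name, :payload)

class SketchListener
  def self.call(event)
    # "Events::AdCreated" -> "apply_ad_created"
    suffix = event.name.split("::").last.gsub(/([a-z\d])([A-Z])/, '\1_\2').downcase
    public_send("apply_#{suffix}", **event.payload)
  end
end

class SketchAdListener < SketchListener
  READ_MODEL = {} # stand-in for the Ad table, keyed by stream name

  def self.apply_ad_created(data:, stream_name:)
    READ_MODEL[stream_name] = data
  end
end

SketchAdListener.call(
  FakeNotification.new(
    "Events::AdCreated",
    {data: {title: "Flat", body: "2 rooms"}, stream_name: "ad-1"}
  )
)

p SketchAdListener::READ_MODEL
```

Note that an event without a matching apply_ method raises NoMethodError here, so a listener should only be subscribed to events it actually handles.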


🤔 …but stop, something’s wrong here. What’s Ad.create!? We don’t have that implemented… The part is omitted for a reason.

What we implemented above is a CQRS system, and Ad is a read model. Its structure is not important and should suit your needs. In the example project I’ve also implemented Events::AdModified, Events::AdPublished, and Events::AdRemoved. You can get familiar with the project.
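
For a rough idea of how such a read model could react to the remaining events, here is a plain-Ruby projection sketch. The payload shapes for AdModified, AdPublished and AdRemoved are my assumptions, and the status field is invented for the example; the real project may model this differently.

```ruby
# Hypothetical payload shapes; only AdCreated is shown in detail in the article.
events = [
  {type: "AdCreated",   stream_name: "ad-1", data: {title: "Flat", body: "2 rooms"}},
  {type: "AdCreated",   stream_name: "ad-2", data: {title: "House", body: "Garden"}},
  {type: "AdModified",  stream_name: "ad-1", data: {title: "Sunny flat", body: "2 rooms"}},
  {type: "AdPublished", stream_name: "ad-1", data: {}},
  {type: "AdRemoved",   stream_name: "ad-2", data: {}}
]

# The read model is a disposable view: it can be dropped and rebuilt from events.
def project(events)
  events.each_with_object({}) do |event, ads|
    id = event[:stream_name]
    case event[:type]
    when "AdCreated"   then ads[id] = event[:data].merge(status: "draft")
    when "AdModified"  then ads[id] = ads[id].merge(event[:data])
    when "AdPublished" then ads[id] = ads[id].merge(status: "published")
    when "AdRemoved"   then ads.delete(id)
    end
  end
end

ads = project(events)
p ads
```

Unlike the aggregate, this view spans all streams at once and is shaped purely for reading, which is the "Q" side of CQRS.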

Retrospective part

the one where we look over what we did

We’ve just implemented an application using Event Sourcing from scratch. I would definitely recommend staying away from self-made solutions in production. Several simplifications were made along the way, and you may need the omitted parts once your project grows. Anyway, it’s good to know what’s inside the black box (gem) you use.

In the upcoming articles, we’re going to play with some recognized tools for implementing event-sourced applications.
