Integration Patterns for Distributed Architecture - Intro to dionysus-rb

Karol
December 18, 2023
10 min to read

In the previous blog post, I promised we would introduce something special this time. So here we go - meet the almighty Dionysus, who knows how to make the most of Kafka.

Change Data Capture

Change Data Capture is a popular pattern for establishing communication between microservices - it turns every insert/update/delete on any row in any table into an individual event that other services can consume, which not only notifies the other service about the change but also transfers the data.


Thanks to tools like Debezium, this is relatively straightforward to implement if you use Kafka. However, this approach has one serious problem - coupling to the database schema of the upstream service.


Individual tables and their columns often don't reflect the domain correctly in the upstream service, especially with relational databases. And for downstream microservices, it gets even worse. Not only might your domain model be composed of multiple entities (think of Domain-Driven Design Aggregates), but some attribute values might be the result of a computation involving more than a single entity, or you might want to publish an entity/aggregate change whenever one of its dependencies changes. For example, you might want to publish an event that some Account got updated when a new Rental is created, to propagate the change of a potential `rentals_count` attribute.


Such an approach is quite natural when building HTTP APIs, as it's simple to expose resources that don't directly map to the database schema. Yet, with CDC, this might be challenging. A potential workaround would be creating dedicated database tables that store the data in the expected format and refreshing them based on dependencies in the domain (so, continuing the example above, updating `rentals_count` in the appropriate row for a given Account after a new Rental is created), which would be pretty similar to materialized views - see the sketch below. Nevertheless, it's still more of a workaround to comply with a constraint - in this case, CDC operating on database rows.
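

To illustrate that workaround, here is a minimal, purely hypothetical sketch - the AccountReadModel table and the refresh callback below are made up for this example and are not part of any library:


class AccountReadModel < ApplicationRecord
  # hypothetical denormalized table: account_read_models(account_id, rentals_count)
end

class Rental < ApplicationRecord
  belongs_to :account

  # refresh the denormalized row so that row-level CDC (e.g. Debezium)
  # emits an "account changed" event whenever a rental changes
  after_commit :refresh_account_read_model

  private

  def refresh_account_read_model
    AccountReadModel
      .find_or_create_by!(account_id: account_id)
      .update!(rentals_count: account.rentals.count)
  end
end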


A more natural approach would be CDC on the domain-model level. Something that would be close to defining serializers for REST APIs.


Meet the almighty Dionysus, who knows how to make the most of karafka to achieve exactly that.


Dionysus-rb


Dionysus is quite a complex gem with multiple features, and some of them deserve a separate blog post, which we are likely to publish in the near future. For now, the gem's documentation will be your best friend. Keep in mind, though, that this was a private gem for a long time, so at the time of writing this article, some parts of the documentation might not be super clear.


Let's now implement a simple producer and consumer to demonstrate the gem's capabilities. Before releasing anything to production, read all the docs first - the following example shows only the simplest possible scenario, which is far from production-grade.


Example App


Let's start with the producer.


Producer


First, generate a new application:


rails new dionysus_producer


and add dionysus-rb to the Gemfile:


gem "dionysus-rb"


Let's create the database as well:


rails db:create


And now, we can create a karafka.rb file with the following content:


Dionysus.initialize_application!(
  environment: ENV["RAILS_ENV"],
  seed_brokers: ["127.0.0.1:9092"],  # assuming that this is where Kafka is running
  client_id: "dionysus_producer",
  logger: Rails.logger
)


For a simple demo, let's assume that we will have a User model on both the producer and consumer side with a name attribute to keep things simple.


Let's generate the model:


rails generate model User name:string
rails db:migrate


And let's make this model publishable:


class User < ApplicationRecord
  include Dionysus::Producer::Outbox::ActiveRecordPublishable
end


We will also use the transactional outbox pattern to ensure maximum durability so that we don't lose messages. As an optimization, we will also publish messages right after the transaction commits.


In a production setup, you should also run an outbox worker as a separate process so that it can pick up any messages that failed for some reason. But again, to keep things simple, we are not going to do that in this demonstration.


Let's generate the outbox model:


rails generate model DionysusOutbox


And use the following migration:


class CreateDionysusOutboxes < ActiveRecord::Migration[7.0]
  def change
    create_table(:dionysus_outboxes) do |t|
      t.string "resource_class", null: false
      t.string "resource_id", null: false
      t.string "event_name", null: false
      t.string "topic", null: false
      t.string "partition_key"
      t.datetime "published_at"
      t.datetime "failed_at"
      t.datetime "retry_at"
      t.string "error_class"
      t.string "error_message"
      t.integer "attempts", null: false, default: 0
      t.datetime "created_at", precision: 6, null: false
      t.datetime "updated_at", precision: 6, null: false

      # some of these indexes are not needed, but they are here for convenience when checking things in the console or when using Tartarus for archiving
      t.index ["topic", "created_at"], name: "index_dionysus_outboxes_publishing_idx", where: "published_at IS NULL"
      t.index ["resource_class", "event_name"], name: "index_dionysus_outboxes_on_resource_class_and_event"
      t.index ["resource_class", "resource_id"], name: "index_dionysus_outboxes_on_resource_class_and_resource_id"
      t.index ["topic"], name: "index_dionysus_outboxes_on_topic"
      t.index ["created_at"], name: "index_dionysus_outboxes_on_created_at"
      t.index ["resource_class", "created_at"], name: "index_dionysus_outboxes_on_resource_class_and_created_at"
      t.index ["resource_class", "published_at"], name: "index_dionysus_outboxes_on_resource_class_and_published_at"
      t.index ["published_at"], name: "index_dionysus_outboxes_on_published_at"
    end
  end
end


And run the migration:


rails db:migrate


And include the outbox module in the model:


class DionysusOutbox < ApplicationRecord
  include Dionysus::Producer::Outbox::Model
end


We can now move on to more Kafka-related things - topics. Or rather a single topic - for publishing users. Let's wrap it in the dionysus_demo namespace so the actual Kafka topic name will be dionysus_demo_users.


We will also need to define two serializers:

  • the primary one that infers other serializers based on the model class (DionysusDemoSerializer)
  • the actual serializer for the model (UserSerializer)


Knowing all these things, let's create the dionysus.rb initializer:


Rails.application.config.to_prepare do
  Karafka::App.setup do |config|
    config.producer = ::WaterDrop::Producer.new do |producer_config|
      producer_config.kafka = {
        'bootstrap.servers': 'localhost:9092', # this needs to be a comma-separated list of brokers
        'request.required.acks': 1,
        "client.id": "dionysus_producer"
      }
      producer_config.id = "dionysus_producer"
      producer_config.deliver = true
    end
  end

  Dionysus::Producer.configure do |config|
    config.database_connection_provider = ActiveRecord::Base
    config.transaction_provider = ActiveRecord::Base 
    config.outbox_model = DionysusOutbox 
    config.default_partition_key = :id # we don't care about the partition key at this time, but we need to provide something
    config.transactional_outbox_enabled = true
    config.publish_after_commit = true
  end

  Dionysus::Producer.declare do
    namespace :dionysus_demo do
      serializer DionysusDemoSerializer

      topic :users do
        publish User
      end
    end
  end
end


And let's create the serializers mentioned above:


class DionysusDemoSerializer < Dionysus::Producer::Serializer
  def infer_serializer
    "#{model_klass}Serializer".constantize
  end
end


The only method we care about at this stage is infer_serializer. The implementation is pretty simple - it infers the UserSerializer class from the User model.
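

You can sanity-check the inference in the console once both classes are loaded - it boils down to plain string interpolation plus constantize:


model_klass = User
"#{model_klass}Serializer".constantize
# => UserSerializer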


And the second serializer:


class UserSerializer < Dionysus::Producer::ModelSerializer
  attributes :name, :id, :created_at, :updated_at
end


Now, let's run the Rails console and see how everything is working:


User.create!(name: "Dionysus")

DionysusOutbox.last

The outbox record should look like this:


#<DionysusOutbox:0x0000000112e2b400
 id: 1,
 resource_class: "User",
 resource_id: "1",
 event_name: "user_created",
 topic: "dionysus_demo_users",
 partition_key: "[FILTERED]",
 published_at: Fri, 08 Dec 2023 13:59:45.541653000 UTC +00:00,
 failed_at: nil,
 retry_at: nil,
 error_class: nil,
 error_message: nil,
 attempts: 0,
 created_at: Fri, 08 Dec 2023 13:59:45.481140000 UTC +00:00,
 updated_at: Fri, 08 Dec 2023 13:59:45.481140000 UTC +00:00>


Having a timestamp in published_at means the record was published successfully to Kafka. So we are done as far as the producer goes!
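

Conversely, if publishing fails, published_at stays nil - and, given the columns in the outbox table above, the failure details presumably end up in failed_at/error_class/error_message. A quick way to spot such records in the console:


DionysusOutbox.where(published_at: nil).count
# => 0 - nothing is waiting to be published

DionysusOutbox.where.not(failed_at: nil).pluck(:error_class, :error_message)
# => [] - no failures recorded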


Let's add a consumer that will be able to consume these messages.


Consumer


First, generate a new application:


rails new dionysus_consumer


and add dionysus-rb to the Gemfile:


gem "dionysus-rb"


Let's create the database as well:


rails db:create


And now, we can create a karafka.rb file with the following content:


Dionysus.initialize_application!(
  environment: ENV["RAILS_ENV"],
  seed_brokers: ["127.0.0.1:9092"],  # assuming that this is where Kafka is running
  client_id: "dionysus_consumer",
  logger: Rails.logger
)


As the consumer is going to consume events related to the User, let's create a model for it:


rails generate model User name:string synced_id:bigint synced_created_at:datetime synced_updated_at:datetime synced_data:jsonb


synced_id is a reference to the primary key on the producer side, synced_created_at/synced_updated_at are the timestamps from the producer, and synced_data is a JSON blob containing all the attributes that were published.
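

The generated migration should look roughly like this (the exact ActiveRecord::Migration version depends on your Rails version; note that the jsonb column type requires PostgreSQL):


class CreateUsers < ActiveRecord::Migration[7.0]
  def change
    create_table :users do |t|
      t.string :name
      t.bigint :synced_id
      t.datetime :synced_created_at
      t.datetime :synced_updated_at
      t.jsonb :synced_data

      t.timestamps
    end
  end
end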


Let's run the migration:


rails db:migrate


We will need to do two more things:

  • declare which topic we want to consume from - we need the `users` topic under the `dionysus_demo` namespace
  • map incoming resources to models (so that User-related messages end up in the User model) - we will do this via `model_factory`


Let's create the dionysus.rb initializer:


Rails.application.config.to_prepare do
  Dionysus::Consumer.declare do
    namespace :dionysus_demo do
      topic :users
    end
  end

  Dionysus::Consumer.configure do |config|
    config.transaction_provider = ActiveRecord::Base
    config.model_factory = DionysusModelFactory
  end

  Dionysus.initialize_application!(
    environment: ENV["RAILS_ENV"],
    seed_brokers: ["127.0.0.1:9092"],
    client_id: "dionysus_consumer",
    logger: Rails.logger
  )
end


And define the DionysusModelFactory:


class DionysusModelFactory
  def self.for_model(model_name) 
    model_name.classify.constantize rescue nil 
  end
end


So, from the "User" string, we will infer the User class.
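

A quick sanity check in the console (the exact string the gem passes depends on the resource naming, but classify handles both singular and plural forms):


DionysusModelFactory.for_model("users")   # => User
DionysusModelFactory.for_model("User")    # => User
DionysusModelFactory.for_model("unknown") # => nil, thanks to the rescue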


We can now run the karafka server:


bundle exec karafka server


And let's check the end result in the console:


User.last


That should give us a result similar to this:


#<User:0x0000000110a420e8
 id: 1,
 name: "Dionysus",
 synced_id: 1,
 synced_created_at: Fri, 08 Dec 2023 14:02:36.280000000 UTC +00:00,
 synced_updated_at: Fri, 08 Dec 2023 14:02:36.280000000 UTC +00:00,
 synced_data: {"name"=>"Dionysus", "synced_id"=>8, "synced_created_at"=>"2023-12-08T14:02:36.280Z", "synced_updated_at"=>"2023-12-08T14:02:36.280Z"},
 created_at: Fri, 08 Dec 2023 14:02:42.171312000 UTC +00:00,
 updated_at: Fri, 08 Dec 2023 14:02:42.171312000 UTC +00:00>



It's that simple to use Dionysus and implement CDC on the domain model level!


Conclusions


This blog post introduced dionysus-rb - a robust framework built on top of karafka that enables CDC (Change Data Capture)/logical replication on the domain-model level. It covered only a tiny portion of what Dionysus is capable of, so stay tuned for the upcoming blog posts.
