Coil

Transactional inbox/outbox message queuing.

This Rails engine can be mounted in any PostgreSQL-backed Rails app.

Motivation

The motivating use-case involves event-streaming with Kafka.

Kafka guarantees that messages on the same topic and partition will be read in the same order as written, but we need to ensure we're writing them in the correct order, regardless of any concurrent processes we may be running.

Similarly, once we've read a message from the stream, we'd like to hand off the message processing to an asynchronous job while ensuring we process messages of a given type and key in the same order as read.

Implementation

The inbox/outbox pattern (see also article) ensures message delivery.

Message ordering is preserved using advisory locks as a synchronization mechanism.
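
As a rough illustration of how advisory locks can serialize work per message key, the sketch below derives a 64-bit lock key from a queue type, message type and message key, then builds `pg_advisory_xact_lock` statements in a fixed order. The helper names and the hashing scheme are assumptions made for illustration; Coil's actual key derivation may differ.

```ruby
require "digest"

# Hypothetical helper (not part of Coil's API): derive a signed 64-bit
# advisory-lock key from a queue type, message type and message key.
def advisory_lock_key(queue_type, message_type, message_key)
  digest = Digest::SHA256.digest([queue_type, message_type, message_key].join("\u0000"))
  # Read the first 8 bytes as a big-endian signed 64-bit integer, which fits
  # the bigint argument accepted by PostgreSQL's pg_advisory_xact_lock.
  digest.unpack1("q>")
end

# Hypothetical helper: build one lock statement per distinct key. Sorting the
# lock keys gives every process the same acquisition order, which avoids
# deadlocks when two processes lock overlapping sets of keys.
def advisory_lock_statements(queue_type, message_type, message_keys)
  message_keys
    .map { |key| advisory_lock_key(queue_type, message_type, key) }
    .uniq
    .sort
    .map { |lock_key| "SELECT pg_advisory_xact_lock(#{lock_key})" }
end
```

Transaction-level advisory locks are released automatically when the surrounding transaction ends, so two processes handling the same key run one after the other rather than interleaving.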

Installation

Add to the application's Gemfile:

gem "coil"
gem "schema_version_cache"

Install engine and migrations:

$ bundle
$ bundle exec rails coil:install:migrations
$ bundle exec rails db:migrate

Register periodic jobs:

# config/initializers/sidekiq.rb
Sidekiq.configure_server do |config|
  # ...
  config.periodic do |mgr|
    mgr.register("*/10 * * * *", "Coil::Inbox::MessagesPeriodicJob")
    mgr.register("5-59/10 * * * *", "Coil::Outbox::MessagesPeriodicJob")
  end
end
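
The two cron expressions above differ only in the minute field. Assuming standard cron semantics for `*/10` and `5-59/10`, expanding those fields in plain Ruby (no Sidekiq involved) shows that the inbox and outbox jobs are staggered by five minutes:

```ruby
# Expand the minute field of the two cron expressions registered above.
inbox_minutes = (0..59).step(10).to_a   # "*/10": every 10th minute starting at 0
outbox_minutes = (5..59).step(10).to_a  # "5-59/10": every 10th minute starting at 5

puts "inbox:  #{inbox_minutes.inspect}"
puts "outbox: #{outbox_minutes.inspect}"
```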

Filter retryable errors out of alerting, e.g. with Airbrake:

# config/initializers/airbrake.rb
Airbrake.add_filter do |notice|
  exception = notice.stash[:exception]
  notice.ignore! if exception.is_a?(Coil::TransactionalMessagesJob::RetryableError)
end

Set up the schema version cache as described in the schema_version_cache gem's documentation.

Usage: inbox

Define a message type and corresponding job:

# app/models/inbox/foo_message.rb
class Inbox::FooMessage < Coil::Inbox::Message
  def job_class
    Inbox::FooMessagesJob
  end
end

# app/jobs/inbox/foo_messages_job.rb
class Inbox::FooMessagesJob < Coil::TransactionalMessagesJob
  private

  # Put message processing logic in this method.
  def process(message)
    # For example...
    uuid = message.key
    val = message.value.deep_symbolize_keys
    Foo.do_stuff(uuid:, potato: val[:potato])
  end

  def message_class
    Inbox::FooMessage
  end
end

(The test-suite contains a working example with type-annotations: message, job)

(For advanced use-cases, you can also define an around_process job method. See example)

Receive messages from Kafka:

# app/consumers/my_consumer.rb
class MyConsumer < Racecar::Consumer
  FOO = "com.example.service.foo".freeze
  subscribes_to FOO

  def process(message)
    key = AvroMessaging.decode(message.key)
    decoded = AvroMessaging.decode_message(message.value)
    value = decoded.message.deep_symbolize_keys
    schema_id = decoded.schema_id

    case message.topic
    when FOO
      Receivers::FooReceiver.receive(key:, value:, schema_id:)
    end
  end
end

# app/lib/receivers/foo_receiver.rb
module Receivers::FooReceiver
  VALUE_SCHEMA_SUBJECT = "com.example.service.Foo_value"

  def self.receive(key:, value:, schema_id:)
    schema_version = AvroVersionCache.get_version_number(
      subject: VALUE_SCHEMA_SUBJECT,
      schema_id:
    )
    Inbox::FooMessage.create!(
      key:,
      value:,
      metadata: {
        value_schema_subject: VALUE_SCHEMA_SUBJECT,
        value_schema_version: schema_version,
        value_schema_id: schema_id
      }
    )
  end
end

Usage: outbox

Define a message type and corresponding job:

# app/models/outbox/bar_message.rb
class Outbox::BarMessage < Coil::Outbox::Message
  VALUE_SCHEMA_SUBJECT = "com.example.Bar_value"

  def job_class
    Outbox::BarMessagesJob
  end
end

# app/jobs/outbox/bar_messages_job.rb
class Outbox::BarMessagesJob < Coil::TransactionalMessagesJob
  private

  # Attach schema metadata to message
  def pre_process(message)
    value_schema_subject = Outbox::BarMessage::VALUE_SCHEMA_SUBJECT
    value_schema_id = AvroVersionCache.get_current_id(subject: value_schema_subject)
    value_schema_version = AvroVersionCache.get_version_number(
      subject: value_schema_subject,
      schema_id: value_schema_id
    )
    metadata = {
      value_schema_subject:,
      value_schema_id:,
      value_schema_version:
    }.merge(message.metadata)

    message.update!(metadata:)
  end

  # Write message to Kafka
  def process(message)
    BarEvent.new(message).produce_async
  end

  def message_class
    Outbox::BarMessage
  end
end

(The test-suite contains a working example with type-annotations: message, job)

Write to the outbox:

bar = Bar.first
turnips = bar.count_turnips
Outbox::BarMessage.create!(key: bar.uuid, value: {turnips:})

Usage: queue locking

The inbox and outbox operations described above automatically preserve message ordering by serializing the creation and processing of messages with a given type and key.

You can access these synchronization mechanisms directly if necessary:

# If we want to treat turnip-harvesting and message creation as one operation
# and ensure that concurrent attempts to run that operation on the same Bar will
# be run sequentially:
Outbox::BarMessage.locking_persistence_queue(keys: [bar.uuid]) do
  bar.harvest_turnips
  bar.replant
  turnips = bar.count_turnips
  Outbox::BarMessage.create!(key: bar.uuid, value: {turnips:})
end

# More generally, we can run an action while holding advisory locks on a list of
# keys in some arbitrary keyspace:
queue_type = "FOOD_PREP"
message_type = "SOUP"
ingredients = ["lentils", "tomato"]
Coil::QueueLocking.locking(queue_type:, message_type:, message_keys: ingredients) do
  Chef.make_soup(ingredients)
end

# The `locking` call above will wait until it's able to obtain the requested
# lock. If we'd rather abort the operation than wait for the lock:
begin
  Coil::QueueLocking.locking(queue_type:, message_type:, message_keys: ingredients, wait: false) do
    Chef.make_soup(ingredients)
  end
rescue Coil::QueueLocking::LockWaitTimeout
  puts("Looks like someone else is already on it")
end
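
To make the difference between waiting and non-waiting acquisition concrete, here is a minimal in-process sketch. It uses Ruby's `Mutex` rather than PostgreSQL advisory locks, and `KeyedLocks` and `try_make_soup` are hypothetical names, so it only illustrates the semantics described above and is not how Coil is implemented.

```ruby
# Hypothetical in-process analogue of keyed locking; not Coil's implementation.
class KeyedLocks
  class LockWaitTimeout < StandardError; end

  def initialize
    @registry_guard = Mutex.new
    @mutexes = {}
  end

  # Hold one lock per key while the block runs. With wait: false, raise
  # instead of blocking when any of the locks is already held.
  def locking(keys:, wait: true)
    acquired = []
    # Sort keys so that every caller acquires locks in the same order.
    mutexes = @registry_guard.synchronize do
      keys.uniq.sort.map { |key| @mutexes[key] ||= Mutex.new }
    end
    mutexes.each do |mutex|
      if wait
        mutex.lock
      elsif !mutex.try_lock
        raise LockWaitTimeout, "lock is held elsewhere"
      end
      acquired << mutex
    end
    yield
  ensure
    acquired.reverse_each(&:unlock)
  end
end

# Attempt the work without waiting; record whether it ran or was skipped.
def try_make_soup(locks, events)
  locks.locking(keys: ["tomato", "lentils"], wait: false) { events << :made_soup }
rescue KeyedLocks::LockWaitTimeout
  events << :skipped
end

locks = KeyedLocks.new
events = []

# While "lentils" is held, a non-waiting attempt fails fast.
locks.locking(keys: ["lentils"]) { try_make_soup(locks, events) }

# Once the lock has been released, the same attempt succeeds.
try_make_soup(locks, events)
```

In this sketch the timeout error is raised before the block is ever yielded to, which is why the `rescue` sits around the `locking` call rather than inside the block.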

Configuration

To adjust the configurable settings used within your application, create an initializer at config/initializers/coil.rb with the following content, then uncomment and adjust the settings you wish to change:

# Coil.sidekiq_queue = "default"

Development

Install development dependencies:

$ bundle

Install pre-commit hook:

$ bin/install-pre-commit

Set up database:

$ bin/rails db:setup
$ bin/rails db:migrate

Run test-suite:

$ bin/ci-test

Run linter:

$ bin/lint

Regenerate type info for DSLs (e.g. after adding a db migration):

$ bin/tapioca dsl --app-root=spec/dummy

Regenerate type info for gems (e.g. after adding a gem):

$ bin/tapioca gem

Coil's type annotations are declared in rbi/coil.rbi to facilitate typechecking by Rails apps that use this engine along with Sorbet. Keeping these annotations in a separate file avoids foisting a Sorbet runtime dependency on any app that uses our engine.