nagare - A publish/subscribe library backed by Redis Streams

Nagare (flow in japanese) makes it easy to work with Redis Streams events in Ruby and Rails. This enables publish/subscribe patterns in your applications and can be handy to enable event-driven architectures. It may also assist in the decomposition and decoupling of Rails monoliths into microservices.

Guarantees & Behaviour

Nagare guarantees through the use of Redis Streams Groups exactly-once delivery of messages to listeners. Nagare is infinitely horizontally scalable, adding new servers running Nagare will add more consumers to the listener group in redis.

By hooking into ActiveRecord transactions, Nagare automatically ACK's messages only on successful transactions, and automatically retries failed ones according to configuration.

Nagare ensures that if a listener is removed or dies, messages are redistributed to other listeners as soon as they become available, based on a timeout. For more information on how this works see Recovering from permanent failures

Configuration

Add this line to your application's Gemfile:

gem 'nagare-redis'

And then execute:

$ bundle install

Or install it yourself as:

$ gem install nagare

To use with rails, add nagare to the initializers:

config/initializers/nagare.rb

Nagare.configure do |config|
  # After x milisseconds a pending message is considered failed and
  # gets retried by a different consumer in the group. Configuring
  # it too low might cause double processing of messages as a consumer
  # "steals" the load of another while the first one is still processing
  # it and hasn't had the chance to ACK, configuring it too high will 
  # introduce latency in your processing.
  # Default: 600.000 (10 minutes)
  config.min_idle_time = 600_0000

  # This is the consumer group name that will be used or created in
  # Redis. Use a different group for every microservice / application
  # Default: Rails.env
  config.group_name = :monolith

  # A suffix is supported in order to separate different environments
  # within the same redis database. Useful for development/test. This
  # gets added automatically to stream names and consumer group names.
  config.suffix = ''

  # URL to connect to redis. Defaults to redis://localhost:6379 uses 
  # ENV['REDIS_URL'] if present.
  config.redis_url = 'redis://10.1.1.1:6379'

  # Nagare uses ruby's threading model to run listeners in parallel 
  # and in the background
  # Default: 3 threads
  config.threads = 3

  # Nagare can execute a proc for error handling. This enables you to
  # use APM tools like New Relic or Appsignal with it.
  # The proc takes 2 parameters, message and error.
  # By default, nagare logs the error to stderr.
  config.error_handler = proc do |message, error|
    Appsignal.set_error(error);
  end

  # After exceeding the maximum number of retries, nagare moves the
  # failing messages to a Dead Letter Queue stream. 
  # By default this stream is named 'dlq', but you can customize it.
  config.dlq_stream = 'its_dead_jim'
end

Usage

Concepts

Publishers

Publisher is a mixin you can add into controllers, models and services to produce events to be consumed by other parts of your application or other microservices in your landscape.

Usage
class User < ApplicationModel
  include Nagare::Publisher
  stream 'users'

  after_commit :publish_event

  def publish_event
    publish(user_updated: self.id)
  end
end

Listeners

Listener is a new first class citizen in the Rails world like models and controllers. They receive events from Redis Stream Groups and process them.

Usage
class UserListener < Nagare::Listener
  stream 'users'

  def user_updated(event)
    user = User.find(event.data)
    Mailchimp.update_user(user)
  end
end

Development

After checking out the repo, run bin/setup to install dependencies. Then, run rake spec to run the tests. You can also run bin/console for an interactive prompt that will allow you to experiment.

To install this gem onto your local machine, run bundle exec rake install. To release a new version, update the version number in version.rb, and then run bundle exec rake release, which will create a git tag for the version, push git commits and tags, and push the .gem file to rubygems.org. A merge to the main branch will automatically open a PR to release a new version of nagare. Merging that PR will release to rubygems.

Contributing

Bug reports and pull requests are welcome on GitHub at https://github.com/vavato-be/nagare. This project is intended to be a safe, welcoming space for collaboration, and contributors are expected to adhere to the code of conduct.

License

The gem is available as open source under the terms of the MIT License.

Code of Conduct

Everyone interacting in the Nagare project's codebases, issue trackers, chat rooms and mailing lists is expected to follow the code of conduct.