EventStoreSubscriptions
Extends the functionality of the EventStoreDB ruby client with a catch-up subscriptions manager.
By default event_store_client
implements thread-blocking methods to subscribe to a stream. Those are #subscribe_to_stream
and #subscribe_to_all
. In order to subscribe to many streams/events, you need to implement asynchronous subscriptions on your own. This gem solves this task by putting each subscription into its own thread.
The thread-based implementation has a downside: any IO operation in your subscription's handlers will block all other threads. So it is important to consider how many subscriptions you put into a single process. There is a plan to integrate Ractors instead/alongside threads to provide the option to eliminate the IO-blocking issue.
Installation
Add this line to your application's Gemfile:
gem 'event_store_subscriptions'
And then execute:
$ bundle install
Or install it yourself as:
$ gem install event_store_subscriptions
Usage
Use the #create
and #create_for_all
methods to subscribe to a stream. For the full list of available arguments see the documentation of the EventStoreClient::GRPC::Client#subscribe_to_stream
method in the event_store_client gem docs. You may also want to check the Catch-up subscriptions section as well.
Subscribing to a specific stream
Use the #create
method in order to subscribe to specific stream:
subscriptions = EventStoreSubscriptions::Subscriptions.new(EventStoreClient.client)
handler = proc do |event|
# process event
end
subscriptions.create('some-stream', handler: handler)
subscriptions.listen_all
You may provide any object which responds to #call
as a handler:
class SomeStreamHandler
def call(event)
# process event
end
end
subscriptions = EventStoreSubscriptions::Subscriptions.new(EventStoreClient.client)
subscriptions.create('some-stream', handler: SomeStreamHandler.new)
subscriptions.listen_all
Subscribing to the $all stream
Use the #create_for_all
method to subscribe to the all stream:
subscriptions = EventStoreSubscriptions::Subscriptions.new(EventStoreClient.client)
handler = proc do |event|
# process event
end
subscriptions.create_for_all(handler: handler)
subscriptions.listen_all
You may also explicitly pass "$all"
stream name to the #create
method:
subscriptions = EventStoreSubscriptions::Subscriptions.new(EventStoreClient.client)
handler = proc do |event|
# process event
end
subscriptions.create('$all', handler: handler)
subscriptions.listen_all
Handling Subscription position updates
You may want to add a handler that will be executed each time a subscription gets position updates. Such updates happen when new events are added to the stream or when EventStore DB produces a checkpoint response.
Listening for position updates of a specific stream
A handler registered to receive position updates of a specific stream is called with the EventStoreSubscriptions::SubscriptionRevision
class instance. It holds the current revision of the stream.
subscriptions = EventStoreSubscriptions::Subscriptions.new(EventStoreClient.client)
subscription = subscriptions.create('some-stream', handler: proc { |event| p event })
subscription.position.register_update_hook do |position|
puts "Current revision is #{position.revision}"
end
subscription.listen
Listening for position updates of the $all stream
A handler registered to receive position updates of the $all
stream is called with the EventStoreSubscriptions::SubscriptionPosition
class instance. It holds the current commit_position
and prepare_position
of the $all
stream.
subscriptions = EventStoreSubscriptions::Subscriptions.new(EventStoreClient.client)
subscription = subscriptions.create_for_all(handler: proc { |event| p event })
subscription.position.register_update_hook do |position|
puts "Current commit/prepare positions are #{position.commit_position}/#{position.prepare_position}"
end
subscription.listen
Automatic restart of failed Subscriptions
This gem provides a possibility to watch over your subscription collections and restart a subscription in case it failed. Subscriptions may fail because an exception was raised in the handler or in the position update hook. A new subscription will be started, listening from the position the failed subscription has stopped.
Start watching over your subscriptions' collection:
subscriptions = EventStoreSubscriptions::Subscriptions.new(EventStoreClient.client)
subscriptions.create_for_all(handler: proc { |event| p event })
EventStoreSubscriptions::WatchDog.watch(subscriptions)
subscriptions.listen_all
Async nature of this gem
EventStoreSubscriptions::Subscriptions#listen_all
, EventStoreSubscriptions::Subscriptions#stop_all
, EventStoreSubscriptions::Subscription#listen
, EventStoreSubscriptions::Subscription#stop_listening
, EventStoreSubscriptions::WatchDog#watch
, EventStoreSubscriptions::WatchDog#unwatch
methods are asynchronous. This means that they spawn thread that performs proper task in the background.
EventStoreSubscriptions::Subscriptions#stop_all
, EventStoreSubscriptions::Subscription#stop_listening
and EventStoreSubscriptions::WatchDog#unwatch
methods has ending run time, meaning that they runners won't run forever.
EventStoreSubscriptions::Subscriptions#listen_all
, EventStoreSubscriptions::Subscription#listen
and EventStoreSubscriptions::WatchDog#watch
methods will run forever.
In order to stop running Subscription
or WatchDog
you should initiate stop process and wait for finish.
Stopping Subscription
For single subscription:
subscriptions = EventStoreSubscriptions::Subscriptions.new(EventStoreClient.client)
subscription = subscriptions.create_for_all(handler: proc { |event| p event })
subscription.listen
# Initiate Subscription shutdown
subscription.stop_listening
# Wait for Subscription to finish. This will block current Thread.
subscription.wait_for_finish
For the entire collection:
subscriptions = EventStoreSubscriptions::Subscriptions.new(EventStoreClient.client)
subscriptions.create_for_all(handler: proc { |event| p event })
subscriptions.listen_all
# Initiate shutdown for each Subscription in the collection
subscriptions.stop_all
# Wait for all Subscriptions to finish. This will block current Thread.
subscriptions.subscriptions.each(&:wait_for_finish)
Stopping WatchDog
subscriptions = EventStoreSubscriptions::Subscriptions.new(EventStoreClient.client)
watcher = EventStoreSubscriptions::WatchDog.watch(subscriptions)
# Initiate WatchDog shutdown
watcher.unwatch
# Wait for WatchDog to finish. This will block current Thread.
watcher.wait_for_finish
Graceful shutdown
You may want to gracefully shut down the process that handles the subscriptions. In order to do so, you should define a Kernel.trap
handler to handle your kill signal:
subscriptions = EventStoreSubscriptions::Subscriptions.new(EventStoreClient.client)
subscriptions.create_for_all(handler: proc { |event| p event })
watcher = EventStoreSubscriptions::WatchDog.watch(subscriptions)
subscriptions.listen_all
Kernel.trap('TERM') do
# Because the implementation uses Mutex - wrap it into Thread to bypass the limitations of
# Kernel#trap
Thread.new do
# Initiate graceful shutdown. Need to shutdown watcher first, and then - subscriptions
watcher.unwatch.wait_for_finish
subscriptions.stop_all.each(&:wait_for_finish)
end.join
exit
end
# Wait while Subscriptions are working
subscriptions.each(&:wait_for_finish)
Now just send the TERM
signal if you want to gracefully shut down your process:
kill -TERM <pid of your process>
Monitoring Subscriptions
After you started listening your Subscriptions, you may want to monitor status of them. There is various built-in statistics which you can get.
subscriptions = EventStoreSubscriptions::Subscriptions.new(EventStoreClient.client)
subscriptions.create_for_all(handler: proc { |event| p event })
watcher = EventStoreSubscriptions::WatchDog.watch(subscriptions)
subscriptions.listen_all
loop do
sleep 1
subscriptions.subscriptions.each do |subscription|
puts "Current state is: #{subscription.state}"
puts "Current position: #{subscription.position.to_h}"
puts "Last error: #{subscription.statistic.last_error.inspect}"
puts "Last restart was at: #{subscription.statistic.last_restart_at || 'Never'}"
puts "Total errors/restarts: #{subscription.statistic.errors_count}"
puts "Events processed: #{subscription.statistic.events_processed}"
puts "Current watcher state is: #{watcher.state}"
end
end
WatchDog and control of restart condition of Subscriptions
You may want to decide yourself whether WhatchDog
should restart a Subscription
. You can do so by providing a proc which, if thruthy result is returned, skips the restart of Subscription
.
subscriptions = EventStoreSubscriptions::Subscriptions.new(EventStoreClient.client)
subscriptions.create_for_all(handler: proc { |event| p event })
# Do not restart Subscription if its id is even
restart_terminator = proc { |sub| sub.__id__ % 2 == 0 }
EventStoreSubscriptions::WatchDog.watch(subscriptions, restart_terminator: restart_terminator)
subscriptions.listen_all
Development
You will have to install Docker first. It is needed to run EventStore DB. You can run EventStore DB with this command:
docker-compose -f docker-compose-cluster.yml up
Now you can enter a dev console by running bin/console
or run tests by running the rspec
command.
Contributing
Bug reports and pull requests are welcome on GitHub at https://github.com/yousty/event_store_subscriptions. This project is intended to be a safe, welcoming space for collaboration, and contributors are expected to adhere to the code of conduct.
Publishing new version
- Push commit with updated
version.rb
file to therelease
branch. The new version will be automatically pushed to rubygems. - Create release on GitHub including change log.
License
The gem is available as open source under the terms of the MIT License.
Code of Conduct
Everyone interacting in the EventStoreSubscriptions project's codebases, issue trackers, chat rooms and mailing lists is expected to follow the code of conduct.