Hivent
An event stream implementation that aggregates facts about your application.
Configuration
Redis Backend
Hivent.configure do |config|
  config.backend = :redis
  config.endpoint = "redis://localhost:6379/0"
  config.partition_count = 4
  config.life_cycle_event_handler = MyHandler.new
  config.client_id = "my_app_name"
end
Usage
Receive
Receiving works on an instance of a Signal. For each event received, the given block will be executed once.
You may either specify a version to receive or decide to receive all events for that signal regardless of their version.
signal = Hivent::Signal.new("model_name:created")
# Handle all events for this signal
signal.receive do |event|
  # Do something with the event
  # event['payload'] contains the payload
  # event['meta'] contains information about the event
end
# Handle version 2 events for this signal
signal.receive(version: 2) do |event|
  # Do something with the event
  # event['payload'] contains the payload
  # event['meta'] contains information about the event
end
Wildcard signals
You can also receive all events by using the * wildcard. Partial wildcards (such as my_event:*) are not supported at this time.
signal = Hivent::Signal.new("*")
# Handle all events
signal.receive do |event|
  # Do something with the event
  # event['payload'] contains the payload
  # event['meta'] contains information about the event
end
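Because a wildcard handler sees every signal, it usually has to branch on the event name. A minimal sketch, assuming the name is available under event['meta']['name'] (the meta attribute "name" mentioned in the Emit section). The route_event helper, the signal names and the hand-built sample event are hypothetical and only serve as illustration:

```ruby
# Hypothetical helper: decides what to do with an event received via "*".
# Assumes the signal name is stored under event['meta']['name'].
def route_event(event)
  name = event.fetch('meta', {})['name']

  case name
  when 'user:created' then :send_welcome_mail
  when 'user:deleted' then :purge_user_data
  else :ignore
  end
end

# Hand-built sample event so the sketch runs without a Redis backend.
sample_event = {
  'payload' => { 'id' => 42 },
  'meta'    => { 'name' => 'user:created', 'version' => 1 }
}

route_event(sample_event) # => :send_welcome_mail
```

In a real consumer, a helper like this would be called from inside the signal.receive block.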
Worker process
To receive events, a consumer process needs to be started using the provided CLI.
Start the consumer:
bundle exec hivent start -r app/events.rb
For more details on the available options see:
bundle exec hivent --help
bundle exec hivent start --help
Daemonization
The library does not offer any options to daemonize or parallelize your consumers. You are encouraged to use other tools such as Foreman and Upstart to achieve this.
With these two tools, you can set up a Procfile for the consumer:
consumer: bundle exec hivent start -r app/events.rb
And then use Foreman's export feature to convert it to an Upstart job:
foreman export upstart -a myapp -m consumer=4 -u myuser /etc/init
service myapp start
This will start 4 consumer processes running under the myuser user. The processes will be daemonized and monitored by Upstart.
If your consumers need environment variables, Foreman can pick them up from a .env file placed next to your Procfile:
APP_ENV=production
REDIS_URL=redis://something:6379
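Inside the application, those variables can then be read when configuring Hivent. A minimal sketch: hivent_settings is a hypothetical helper, HIVENT_PARTITIONS is an invented variable name, and the fallback values are taken from the configuration example at the top of this document:

```ruby
# Hypothetical helper: collects Hivent settings from environment variables.
# REDIS_URL comes from the .env example; HIVENT_PARTITIONS is an assumed name.
def hivent_settings(env = ENV)
  {
    endpoint:        env.fetch('REDIS_URL', 'redis://localhost:6379/0'),
    partition_count: Integer(env.fetch('HIVENT_PARTITIONS', '4'))
  }
end

# A plain Hash stands in for ENV here so the sketch is reproducible.
settings = hivent_settings({ 'REDIS_URL' => 'redis://something:6379' })

# In the application these values would feed the configuration block:
#
#   Hivent.configure do |config|
#     config.backend         = :redis
#     config.endpoint        = settings[:endpoint]
#     config.partition_count = settings[:partition_count]
#   end
```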
Callbacks for Life Cycle Events
To add error reporting or logging of consumed events, you can configure a handler that is invoked by the consumer when certain life cycle events occur.
To implement this handler, create a class that inherits from Hivent::LifeCycleEventHandler and override one or more of its methods.
class MyHandler < Hivent::LifeCycleEventHandler
  def application_registered(client_id, events, partition_count)
    # log info to logging service
  end

  def event_processing_succeeded(event_name, event_version, payload)
    # log event processing
  end

  def event_processing_failed(exception, payload, raw_payload, dead_letter_queue_name)
    # report to some exception notification service
  end
end
The handler needs to be configured in the gem's configuration block. The default handler ignores all life cycle events.
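As a slightly more concrete illustration, the following sketch writes two of the life cycle events to a standard Ruby Logger. LoggingHandler and the log message formats are hypothetical. The stand-in base class exists only so the snippet runs without the gem loaded; in a real application the base class comes from Hivent itself:

```ruby
require 'logger'
require 'stringio'

# Stand-in so the sketch runs without the gem; skipped when Hivent is loaded.
unless defined?(Hivent::LifeCycleEventHandler)
  module Hivent
    class LifeCycleEventHandler
      def application_registered(client_id, events, partition_count); end
      def event_processing_succeeded(event_name, event_version, payload); end
      def event_processing_failed(exception, payload, raw_payload, dead_letter_queue_name); end
    end
  end
end

# Hypothetical handler that writes life cycle events to a Logger.
class LoggingHandler < Hivent::LifeCycleEventHandler
  def initialize(logger)
    @logger = logger
  end

  def event_processing_succeeded(event_name, event_version, _payload)
    @logger.info("processed #{event_name} v#{event_version}")
  end

  def event_processing_failed(exception, _payload, _raw_payload, dead_letter_queue_name)
    @logger.error("#{exception.class}: #{exception.message} (dead letter queue: #{dead_letter_queue_name})")
  end
end

# Exercise the handler by hand, logging into an in-memory buffer.
log = StringIO.new
handler = LoggingHandler.new(Logger.new(log))
handler.event_processing_succeeded('model_name:created', 1, { 'key' => 'value' })
handler.event_processing_failed(StandardError.new('boom'), {}, '{}', 'example_dead_letter_queue')
```

Such a handler would be registered with config.life_cycle_event_handler = LoggingHandler.new(Logger.new($stdout)) in the configuration block.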
Emit
You can use any name to identify your signals.
All signals are versioned. The version has to be specified as the second parameter of emit and will be part of the event's meta data.
Hivent::Signal.new("model_name:created").emit({ key: "value" }, version: 1)
# => Signal name is added as meta attribute "name"
Meta Data
Each emitted event will automatically be enriched with meta data containing the correlation ID (cid), the producer of the event (the client_id provided in the configuration block) and the created_at timestamp.
The event name and version will also be added to the event's meta data.
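To make the shape concrete, here is a hand-built example of what a consumed event might look like. The keys cid, producer, created_at and name are the ones named in this document. The version key, the value formats and the build_sample_event helper are assumptions for illustration, not Hivent's implementation:

```ruby
require 'securerandom'
require 'time'

# Hypothetical helper that mimics the documented event shape.
def build_sample_event(name, payload, version:, producer:, cid: SecureRandom.hex)
  {
    'payload' => payload,
    'meta' => {
      'name'       => name,
      'version'    => version,              # assumed key name
      'cid'        => cid,
      'producer'   => producer,
      'created_at' => Time.now.utc.iso8601  # assumed timestamp format
    }
  }
end

event = build_sample_event('model_name:created', { 'key' => 'value' },
                           version: 1, producer: 'my_app_name')

event['meta']['producer'] # => "my_app_name"
```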
Correlation ID
To pass in a correlation ID (e.g. from a previously consumed message) use:
cid = event['meta']['cid']
Hivent::Signal.new("model_name:created").emit({ key: "value" }, version: 1, cid: cid)
Keyed Messages
Sometimes it's required to pass a key alongside the message that is used to assign the message to a specific partition (which ensures order of events within this partition).
signal = Hivent::Signal.new("model_name:created")
signal.emit({ key: "value" }, key: "my_custom_key")
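Why a key preserves ordering can be illustrated with a simple hash-based assignment. This sketch is an assumption about how key-based partitioning generally works, not a description of Hivent's internals. partition_for is a hypothetical helper, and CRC32 is used only because it ships with Ruby's standard library:

```ruby
require 'zlib'

# Hypothetical helper: maps a message key to one of partition_count partitions.
# The same key always lands on the same partition, so events sharing a key
# stay in the order in which they were emitted.
def partition_for(key, partition_count)
  Zlib.crc32(key.to_s) % partition_count
end

first  = partition_for('my_custom_key', 4)
second = partition_for('my_custom_key', 4)

first == second # => true
```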
Run the Tests
The test suite requires a running Redis server (default: redis://localhost:6379/15). To point to a different Redis instance, pass in an environment variable when starting the tests.
REDIS_URL=redis://path_to_redis:port/database bundle exec rspec
Test Helpers
To help you write awesome tests, an RSpec helper is provided. To use it, require 'hivent/rspec' before running your test suite:
# in spec_helper.rb
require 'hivent/rspec'
Matchers
Emit
Test whether a signal has been emitted. Optionally, you can define a version.
expect { a_method }.to emit('a:signal')
expect { another_method }.not_to emit('another:signal')
expect { another_method }.to emit('a:signal', version: 2)
You may also assert whether a signal was emitted with a given payload. This matcher asserts that the signal's payload contains the given hash.
expect { subject }.to emit(:event_name).with({ foo: 'bar' })
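The containment semantics can be pictured with plain Ruby hashes. This is an illustrative sketch, not the matcher's implementation. It assumes a top-level subset comparison, and payload_contains? is a hypothetical helper:

```ruby
# Hypothetical helper: true when every key/value pair of expected
# also appears in payload (Hash#>= is a superset check, Ruby 2.3+).
def payload_contains?(payload, expected)
  payload >= expected
end

payload = { foo: 'bar', id: 42 }

payload_contains?(payload, { foo: 'bar' }) # => true
payload_contains?(payload, { foo: 'baz' }) # => false
```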