Class: Omnes::Bus
- Inherits: Object
  - Object
  - Omnes::Bus
- Defined in: lib/omnes/bus.rb
Overview
An event bus for the publish/subscribe pattern
An instance of this class acts as an event bus middleware for publishers of events and their subscriptions.
bus = Omnes::Bus.new
Before being able to work with a given event, its name (a Symbol) needs to be registered:
bus.register(:foo)
An event can be anything responding to a method #omnes_event_name which, needless to say, must match a registered name.
Typically, there are two main ways to generate events.
An event can be generated at publication time, where you provide its name and a payload to be consumed by its subscribers.
bus.publish(:foo, bar: :baz)
In that case, an instance of UnstructuredEvent is generated under the hood.
Unstructured events are straightforward to create and use, but they're harder to debug as they're defined at publication time. On top of that, other features, such as event persistence, can't be reliably built on top of them.
You can also publish an instance of a class including Event. The only thing the module adds is an out-of-the-box event name derived from the class name. See Event for details.
class Foo
  include Omnes::Event

  attr_reader :bar

  def initialize
    @bar = :baz
  end
end
bus.publish(Foo.new)
Instance-backed events provide a well-defined structure, and other features, like event persistence, can be added on top of them.
Regardless of the type of published event, it's yielded to its subscriptions so that they can do their job:
bus.subscribe(:foo) do |event|
  # event.payload[:bar] or event[:bar] for unstructured events
  # event.bar for the event instance example
end
The subscription code can be given as a block (previous example) or as anything responding to a method #call.
class MySubscription
  def call(event)
    # ...
  end
end
bus.subscribe(:foo, MySubscription.new)
See also Subscriber for a more powerful way to define standalone event handlers.
You can also create a subscription that will run for all events:
bus.subscribe_to_all(MySubscription.new)
Custom matchers can be defined. A matcher is something responding to #call and taking the event as an argument. It needs to return true or false to decide whether the subscription needs to be run for that event.
matcher = ->(event) { event.omnes_event_name.start_with?("foo") }
bus.subscribe_with_matcher(matcher, MySubscription.new)
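The matcher contract described above can be sketched without loading Omnes at all. In this minimal sketch, `FakeEvent`, `FOO_PREFIX_MATCHER` and `matching_names` are hypothetical names introduced for illustration; the only assumption taken from this page is that an event responds to #omnes_event_name.

```ruby
# Hypothetical stand-in for an event; only #omnes_event_name matters here.
FakeEvent = Struct.new(:omnes_event_name)

# A matcher is any callable that takes the event and returns true or false.
FOO_PREFIX_MATCHER = ->(event) { event.omnes_event_name.to_s.start_with?("foo") }

# Returns the names of the events the given matcher accepts.
def matching_names(events, matcher)
  events.select { |event| matcher.call(event) }.map(&:omnes_event_name)
end

events = [FakeEvent.new(:foo_created), FakeEvent.new(:bar_created)]
p matching_names(events, FOO_PREFIX_MATCHER)
```

Because the matcher is just a callable, it can be exercised on its own before being handed to a bus.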
All the previous subscription methods return a subscription object. You can pass an id when subscribing to be able to fetch the subscription from the bus later on:
subscription = bus.subscribe(:foo, MySubscription.new, id: :foo_sub)
bus.subscription(:foo_sub) == subscription #=> true
A subscription can be referenced when you want to unsubscribe:
bus.unsubscribe(subscription)
Instance Attribute Summary
- #cal_loc_start ⇒ Object readonly private
- #registry ⇒ Object readonly
  Returns the value of attribute registry.
- #subscriptions ⇒ Object readonly private
Class Method Summary
Instance Method Summary
- #clear ⇒ Omnes::Bus
  Clears all registered events and subscriptions.
- #initialize(cal_loc_start: 1, registry: Registry.new, subscriptions: []) ⇒ Bus constructor
  A new instance of Bus.
- #performing_nothing(&block) ⇒ Object
  Specialized version of #performing_only with no subscriptions.
- #performing_only(*selection) { ... } ⇒ Object
  Runs given block performing only a selection of subscriptions.
- #publish(event, caller_location: caller_locations(cal_loc_start)[0], **payload) ⇒ Omnes::Publication
  Publishes an event, running all matching subscriptions.
- #register(event_name, caller_location: caller_locations(cal_loc_start)[0]) ⇒ Omnes::Registry::Registration
  Registers an event name.
- #subscribe(event_name, callable = nil, id: Subscription.random_id) {|event| ... } ⇒ Omnes::Subscription
  Adds a subscription for a single event.
- #subscribe_to_all(callable = nil, id: Subscription.random_id) {|event| ... } ⇒ Omnes::Subscription
  Adds a subscription for all events.
- #subscribe_with_matcher(matcher, callable = nil, id: Subscription.random_id) {|event| ... } ⇒ Omnes::Subscription
  Adds a subscription with given matcher.
- #subscription(id) ⇒ Omnes::Subscription
  Fetch a subscription by its identifier.
- #unsubscribe(subscription) ⇒ Object
  Removes a subscription.
Constructor Details
#initialize(cal_loc_start: 1, registry: Registry.new, subscriptions: []) ⇒ Bus
Returns a new instance of Bus.
# File 'lib/omnes/bus.rb', line 142
def initialize(cal_loc_start: 1, registry: Registry.new, subscriptions: [])
  @cal_loc_start = cal_loc_start
  @registry = registry
  @subscriptions = subscriptions
end
Instance Attribute Details
#cal_loc_start ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/omnes/bus.rb', line 135
def cal_loc_start
  @cal_loc_start
end
#registry ⇒ Object (readonly)
Returns the value of attribute registry.
# File 'lib/omnes/bus.rb', line 140
def registry
  @registry
end
#subscriptions ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/omnes/bus.rb', line 135
def subscriptions
  @subscriptions
end
Class Method Details
.EventType(value, **payload) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/omnes/bus.rb', line 125
def self.EventType(value, **payload)
  case value
  when Symbol
    UnstructuredEvent.new(omnes_event_name: value, payload: payload)
  else
    value
  end
end
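The coercion above can be tried in isolation with a stand-in event class. This is a minimal sketch, not the library's code: `SketchUnstructuredEvent` and `coerce_event` are hypothetical names, and the struct only assumes the two keyword arguments visible in the listing (`omnes_event_name:` and `payload:`).

```ruby
# Hypothetical stand-in for Omnes::UnstructuredEvent.
SketchUnstructuredEvent = Struct.new(:omnes_event_name, :payload, keyword_init: true)

# Mirrors the case/when above: symbols are wrapped together with the payload,
# anything else is assumed to already be an event and passes through untouched.
def coerce_event(value, **payload)
  case value
  when Symbol
    SketchUnstructuredEvent.new(omnes_event_name: value, payload: payload)
  else
    value
  end
end

wrapped = coerce_event(:foo, bar: :baz)
puts wrapped.omnes_event_name.inspect
puts wrapped.payload.inspect

custom = Object.new
puts coerce_event(custom).equal?(custom)
```

This is why #publish can accept either a name plus payload or a ready-made event instance through the same parameter.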
Instance Method Details
#clear ⇒ Omnes::Bus
Clears all registered events and subscriptions
Useful for code reloading.
# File 'lib/omnes/bus.rb', line 295
def clear
  tap do
    @subscriptions = []
    @registry = Registry.new
  end
end
#performing_nothing(&block) ⇒ Object
Specialized version of #performing_only with no subscriptions
# File 'lib/omnes/bus.rb', line 277
def performing_nothing(&block)
  performing_only(&block)
end
#performing_only(*selection) { ... } ⇒ Object
Runs given block performing only a selection of subscriptions
This is useful for testing, as it lets you silence subscriptions that are not part of the system under test.
After the block is over, original subscriptions are restored.
# File 'lib/omnes/bus.rb', line 260
def performing_only(*selection)
  selection.each do |subscription|
    unless subscriptions.include?(subscription)
      raise UnknownSubscriptionError.new(subscription: subscription,
                                         bus: self)
    end
  end
  all_subscriptions = subscriptions
  @subscriptions = selection
  yield
ensure
  @subscriptions = all_subscriptions
end
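The swap-and-restore idea behind #performing_only can be illustrated with a small stand-in class. This is a minimal sketch under stated assumptions: `PerformingOnlySketch` is a hypothetical class, plain symbols stand in for subscription objects, and the unknown-subscription check from the listing is left out to keep the focus on the `ensure` clause.

```ruby
# Hypothetical stand-in holding a list of "subscriptions" (plain symbols here).
class PerformingOnlySketch
  attr_reader :subscriptions

  def initialize(subscriptions)
    @subscriptions = subscriptions
  end

  # Temporarily narrows the active subscriptions to the selection and
  # restores the original list afterwards, even if the block raises.
  def performing_only(*selection)
    all_subscriptions = @subscriptions
    @subscriptions = selection
    yield
  ensure
    @subscriptions = all_subscriptions
  end
end

bus = PerformingOnlySketch.new(%i[audit email metrics])
during = bus.performing_only(:email) { bus.subscriptions.dup }
p during
p bus.subscriptions
```

In a test suite this pattern keeps unrelated handlers from firing while the code under test publishes events.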
#publish(event_name, caller_location: caller_locations(cal_loc_start)[0], **payload) ⇒ Omnes::Publication
#publish(event, caller_location: caller_locations(cal_loc_start)[0]) ⇒ Omnes::Publication
Publishes an event, running all matching subscriptions
# File 'lib/omnes/bus.rb', line 183
def publish(event, caller_location: caller_locations(cal_loc_start)[0], **payload)
  publication_time = Time.now.utc
  event = self.class.EventType(event, **payload)
  registry.check_event_name(event.omnes_event_name)
  publication_context = PublicationContext.new(caller_location: caller_location, time: publication_time)
  executions = execute_subscriptions_for_event(event, publication_context)

  Publication.new(
    event: event,
    executions: executions,
    context: publication_context
  )
end
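The overall flow of the method above (check the name, build the event, run the matching subscriptions) can be approximated with a toy bus. Everything in this minimal sketch is hypothetical: `PublishSketchBus`, `PublishSketchEvent` and `PublishSketchSubscription` are stand-ins, `ArgumentError` is an assumed placeholder for whatever the real registry raises, and the return value is a plain array of callback results rather than an Omnes::Publication.

```ruby
require "set"

# Hypothetical stand-ins for an event and a subscription.
PublishSketchEvent = Struct.new(:omnes_event_name, :payload)
PublishSketchSubscription = Struct.new(:matcher, :callback)

class PublishSketchBus
  def initialize
    @registered = Set.new
    @subscriptions = []
  end

  def register(event_name)
    @registered << event_name
  end

  def subscribe(event_name, &block)
    matcher = ->(event) { event.omnes_event_name == event_name }
    @subscriptions << PublishSketchSubscription.new(matcher, block)
  end

  # Checks the name is registered, then runs every subscription whose
  # matcher accepts the event and collects the callback results.
  def publish(event_name, **payload)
    unless @registered.include?(event_name)
      raise ArgumentError, "unregistered event: #{event_name.inspect}"
    end

    event = PublishSketchEvent.new(event_name, payload)
    @subscriptions
      .select { |subscription| subscription.matcher.call(event) }
      .map { |subscription| subscription.callback.call(event) }
  end
end

bus = PublishSketchBus.new
bus.register(:foo)
bus.subscribe(:foo) { |event| event.payload[:bar] }
p bus.publish(:foo, bar: :baz)
```

The sketch leaves out the publication context (caller location and time) that the real method records alongside the executions.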
#register(event_name, caller_location: caller_locations(cal_loc_start)[0]) ⇒ Omnes::Registry::Registration
Registers an event name
# File 'lib/omnes/bus.rb', line 160
def register(event_name, caller_location: caller_locations(cal_loc_start)[0])
  registry.register(event_name, caller_location: caller_location)
end
#subscribe(event_name, callable = nil, id: Subscription.random_id) {|event| ... } ⇒ Omnes::Subscription
Adds a subscription for a single event
# File 'lib/omnes/bus.rb', line 207
def subscribe(event_name, callable = nil, id: Subscription.random_id, &block)
  registry.check_event_name(event_name)

  subscribe_with_matcher(Subscription::SINGLE_EVENT_MATCHER.curry[event_name], callable, id: id, &block)
end
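The `.curry[event_name]` call above turns a two-argument callable into a one-argument matcher. The definition of Subscription::SINGLE_EVENT_MATCHER is not shown on this page, so the following minimal sketch assumes it is a lambda taking the subscribed name and the event; `SKETCH_SINGLE_EVENT_MATCHER`, `CurrySketchEvent` and `matcher_for` are hypothetical names.

```ruby
# Hypothetical stand-in for an event.
CurrySketchEvent = Struct.new(:omnes_event_name)

# Assumed shape: a two-argument lambda comparing a subscribed name with an event.
SKETCH_SINGLE_EVENT_MATCHER = lambda do |subscribed_name, event|
  subscribed_name == event.omnes_event_name
end

# Currying and applying the first argument leaves a callable that only
# needs the event, which is the shape a matcher must have.
def matcher_for(event_name)
  SKETCH_SINGLE_EVENT_MATCHER.curry[event_name]
end

foo_matcher = matcher_for(:foo)
p foo_matcher.call(CurrySketchEvent.new(:foo))
p foo_matcher.call(CurrySketchEvent.new(:bar))
```

Building single-event subscriptions on top of the generic matcher path keeps all subscription creation in #subscribe_with_matcher.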
#subscribe_to_all(callable = nil, id: Subscription.random_id) {|event| ... } ⇒ Omnes::Subscription
Adds a subscription for all events
# File 'lib/omnes/bus.rb', line 220
def subscribe_to_all(callable = nil, id: Subscription.random_id, &block)
  subscribe_with_matcher(Subscription::ALL_EVENTS_MATCHER, callable, id: id, &block)
end
#subscribe_with_matcher(matcher, callable = nil, id: Subscription.random_id) {|event| ... } ⇒ Omnes::Subscription
Adds a subscription with given matcher
# File 'lib/omnes/bus.rb', line 232
def subscribe_with_matcher(matcher, callable = nil, id: Subscription.random_id, &block)
  raise DuplicateSubscriptionIdError.new(id: id, bus: self) if subscription(id)

  callback = callable || block
  Subscription.new(matcher: matcher, callback: callback, id: id).tap do |subscription|
    @subscriptions << subscription
  end
end
#subscription(id) ⇒ Omnes::Subscription
Fetch a subscription by its identifier
# File 'lib/omnes/bus.rb', line 286
def subscription(id)
  subscriptions.find { |subscription| subscription.id == id }
end
#unsubscribe(subscription) ⇒ Object
Removes a subscription
# File 'lib/omnes/bus.rb', line 244
def unsubscribe(subscription)
  @subscriptions.delete(subscription)
end