Class: Omnes::Bus

Inherits:
Object
Defined in:
lib/omnes/bus.rb

Overview

An event bus for the publish/subscribe pattern

An instance of this class acts as an event bus middleware for publishers of events and their subscriptions.

bus = Omnes::Bus.new

Before being able to work with a given event, its name (a Symbol) needs to be registered:

bus.register(:foo)

An event can be anything responding to a method #omnes_event_name, whose return value must match a registered name.

Typically, there are two main ways to generate events.

An event can be generated at publication time, where you provide its name and a payload to be consumed by its subscribers.

bus.publish(:foo, bar: :baz)

In that case, an instance of UnstructuredEvent is generated under the hood.

Unstructured events are straightforward to create and use, but they're harder to debug as they're defined at publication time. On top of that, other features, such as event persistence, can't be reliably built on top of them.

You can also publish an instance of a class including Event. The main thing it provides is an out-of-the-box event name derived from the class name. See Event for details.

class Foo
  include Omnes::Event

  attr_reader :bar

  def initialize
    @bar = :baz
  end
end

bus.publish(Foo.new)

Instance-backed events provide a well-defined structure, and other features, like event persistence, can be added on top of them.

Regardless of the type of published event, it's yielded to its subscriptions so that they can do their job:

bus.subscribe(:foo) do |event|
  # event.payload[:bar] or event[:bar] for unstructured events
  # event.bar for the event instance example
end

The subscription code can be given as a block (previous example) or as anything responding to a method #call.

class MySubscription
  def call(event)
    # ...
  end
end

bus.subscribe(:foo, MySubscription.new)

See also Subscriber for a more powerful way to define standalone event handlers.

You can also create a subscription that will run for all events:

bus.subscribe_to_all(MySubscription.new)

Custom matchers can be defined. A matcher is something responding to #call and taking the event as an argument. It needs to return true or false to decide whether the subscription needs to be run for that event.

matcher = ->(event) { event.omnes_event_name.to_s.start_with?("foo") }

bus.subscribe_with_matcher(matcher, MySubscription.new)
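To make the matcher contract concrete, here is a minimal sketch in plain Ruby that does not load Omnes at all. MatcherSketchEvent, FOO_PREFIX_MATCHER and matching_names are hypothetical names introduced only for illustration; the sketch assumes events respond to #omnes_event_name, as described above.

```ruby
# Hypothetical stand-in for an event: anything responding to #omnes_event_name.
MatcherSketchEvent = Struct.new(:omnes_event_name, :payload)

# A matcher is any callable that takes the event and returns true or false.
FOO_PREFIX_MATCHER = ->(event) { event.omnes_event_name.to_s.start_with?("foo") }

# Hypothetical helper: names of the events a subscription using the given
# matcher would run for.
def matching_names(events, matcher)
  events.select { |event| matcher.call(event) }.map(&:omnes_event_name)
end

events = [
  MatcherSketchEvent.new(:foo_created, {}),
  MatcherSketchEvent.new(:bar_created, {}),
  MatcherSketchEvent.new(:foo_deleted, {})
]

matching_names(events, FOO_PREFIX_MATCHER) # => [:foo_created, :foo_deleted]
```

Because a matcher is just a callable, a lambda, a method object, or any instance defining #call should work equally well.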

All the previous subscription methods return a subscription object. You can pass an id when subscribing to be able to fetch the subscription from the bus later on:

subscription = bus.subscribe(:foo, MySubscription.new, id: :foo_sub)
bus.subscription(:foo_sub) == subscription #=> true

A subscription can be referenced when you want to unsubscribe:

bus.unsubscribe(subscription)

Constructor Details

#initialize(cal_loc_start: 1, registry: Registry.new, subscriptions: []) ⇒ Bus

Returns a new instance of Bus.



# File 'lib/omnes/bus.rb', line 142

def initialize(cal_loc_start: 1, registry: Registry.new, subscriptions: [])
  @cal_loc_start = cal_loc_start
  @registry = registry
  @subscriptions = subscriptions
end

Instance Attribute Details

#cal_loc_start ⇒ Object (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/omnes/bus.rb', line 135

def cal_loc_start
  @cal_loc_start
end

#registry ⇒ Object (readonly)

Returns the value of attribute registry.



# File 'lib/omnes/bus.rb', line 140

def registry
  @registry
end

#subscriptions ⇒ Object (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/omnes/bus.rb', line 135

def subscriptions
  @subscriptions
end

Class Method Details

.EventType(value, **payload) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/omnes/bus.rb', line 125

def self.EventType(value, **payload)
  case value
  when Symbol
    UnstructuredEvent.new(omnes_event_name: value, payload: payload)
  else
    value
  end
end
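The coercion above can be tried in isolation. The following is a sketch only: SketchUnstructuredEvent, SketchOrderPlaced and coerce_event are hypothetical stand-ins, not part of Omnes, and the only thing assumed about UnstructuredEvent is the pair of keywords visible in the source above.

```ruby
# Hypothetical stand-in for Omnes::UnstructuredEvent, accepting the same two
# keywords that the source above passes to it.
SketchUnstructuredEvent = Struct.new(:omnes_event_name, :payload, keyword_init: true)

# Hypothetical instance-backed event: it only needs to respond to
# #omnes_event_name.
class SketchOrderPlaced
  def omnes_event_name
    :order_placed
  end
end

# Mirrors the case/when above: a Symbol is wrapped together with its payload,
# anything else is returned untouched.
def coerce_event(value, **payload)
  case value
  when Symbol
    SketchUnstructuredEvent.new(omnes_event_name: value, payload: payload)
  else
    value
  end
end

wrapped = coerce_event(:foo, bar: :baz)
wrapped.omnes_event_name # => :foo
wrapped.payload          # => {bar: :baz}
```

Note that when an event instance is given, any extra keyword arguments are simply ignored by this logic.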

Instance Method Details

#clear ⇒ Omnes::Bus

Clears all registered events and subscriptions

Useful for code reloading.

Returns:

  • (Omnes::Bus)



# File 'lib/omnes/bus.rb', line 295

def clear
  tap do
    @subscriptions = []
    @registry = Registry.new
  end
end

#performing_nothing(&block) ⇒ Object

Specialized version of #performing_only with no subscriptions

See Also:

  • #performing_only



# File 'lib/omnes/bus.rb', line 277

def performing_nothing(&block)
  performing_only(&block)
end

#performing_only(*selection) { ... } ⇒ Object

Runs given block performing only a selection of subscriptions

That's useful for testing purposes, as it allows silencing subscriptions that are not part of the system under test.

After the block is over, original subscriptions are restored.

Parameters:

  • selection (Array<Omnes::Subscription>)

    Subscriptions to run while the block executes

Yields:

  • Block to run

Raises:

  • (UnknownSubscriptionError)

    When a selected subscription is not registered on the bus



# File 'lib/omnes/bus.rb', line 260

def performing_only(*selection)
  selection.each do |subscription|
    unless subscriptions.include?(subscription)
      raise UnknownSubscriptionError.new(subscription: subscription,
                                         bus: self)
    end
  end
  all_subscriptions = subscriptions
  @subscriptions = selection
  yield
ensure
  @subscriptions = all_subscriptions
end
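The swap-and-restore behaviour can be observed without Omnes. The sketch below uses a hypothetical SketchBus class that keeps only the part of #performing_only that replaces and restores the subscription list; the validation step is left out, and plain symbols stand in for subscription objects.

```ruby
# Hypothetical, minimal stand-in for the bus; it is not Omnes::Bus.
class SketchBus
  attr_reader :subscriptions

  def initialize(subscriptions)
    @subscriptions = subscriptions
  end

  # Same swap-and-restore shape as #performing_only, minus the validation.
  def performing_only(*selection)
    all_subscriptions = subscriptions
    @subscriptions = selection
    yield
  ensure
    # Runs whether the block returns normally or raises.
    @subscriptions = all_subscriptions
  end
end

bus = SketchBus.new(%i[audit mailer])

active_inside = nil
bus.performing_only(:mailer) { active_inside = bus.subscriptions }

active_inside     # => [:mailer]
bus.subscriptions # => [:audit, :mailer]
```

Calling performing_only with no arguments leaves an empty selection inside the block, which is what #performing_nothing relies on.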

#publish(event_name, caller_location: caller_locations(cal_loc_start)[0], **payload) ⇒ Omnes::Publication
#publish(event, caller_location: caller_locations(cal_loc_start)[0]) ⇒ Omnes::Publication

Publishes an event, running all matching subscriptions

Overloads:

  • #publish(event_name, caller_location: caller_locations(cal_loc_start)[0], **payload) ⇒ Omnes::Publication

    Parameters:

    • event_name (Symbol)

      Name for the generated UnstructuredEvent event.

    • **payload (Hash)

      Payload for the generated UnstructuredEvent

  • #publish(event, caller_location: caller_locations(cal_loc_start)[0]) ⇒ Omnes::Publication

    Parameters:

    • event (#omnes_event_name)

      An event instance

Parameters:

  • caller_location (Thread::Backtrace::Location) (defaults to: caller_locations(cal_loc_start)[0])

    Caller location associated with the publication. Useful for debugging (shown in error messages). It defaults to this method's caller.

Returns:

  • (Omnes::Publication)

    A publication object encapsulating metadata for the event and the originated subscription executions

Raises:



# File 'lib/omnes/bus.rb', line 183

def publish(event, caller_location: caller_locations(cal_loc_start)[0], **payload)
  publication_time = Time.now.utc
  event = self.class.EventType(event, **payload)
  registry.check_event_name(event.omnes_event_name)
  publication_context = PublicationContext.new(caller_location: caller_location, time: publication_time)
  executions = execute_subscriptions_for_event(event, publication_context)

  Publication.new(
    event: event,
    executions: executions,
    context: publication_context
  )
end
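The caller_location default relies on Kernel#caller_locations being evaluated inside the called method, so that index 1 points at whoever called it. A small sketch of that mechanism, using the hypothetical helpers capture_location and sketch_publisher and a fixed start of 1 (the default value of cal_loc_start in the constructor):

```ruby
# Hypothetical helper: the keyword default is evaluated in this method's own
# frame, so caller_locations(1)[0] is the location of the code that called it.
def capture_location(caller_location: caller_locations(1)[0])
  caller_location
end

# Hypothetical publisher-like method; its call to capture_location is the
# location that gets recorded.
def sketch_publisher
  capture_location
end

location = sketch_publisher
location.class  # => Thread::Backtrace::Location
location.lineno # line number of the capture_location call inside sketch_publisher
```

Passing caller_location explicitly overrides the default, which can be handy when the bus is called through a wrapper and the wrapper's own caller is the more useful location to report.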

#register(event_name, caller_location: caller_locations(cal_loc_start)[0]) ⇒ Omnes::Registry::Registration

Registers an event name

Parameters:

  • event_name (Symbol)
  • caller_location (Thread::Backtrace::Location) (defaults to: caller_locations(cal_loc_start)[0])

    Caller location associated with the registration. Useful for debugging (shown in error messages). It defaults to this method's caller.

Returns:

  • (Omnes::Registry::Registration)

Raises:



# File 'lib/omnes/bus.rb', line 160

def register(event_name, caller_location: caller_locations(cal_loc_start)[0])
  registry.register(event_name, caller_location: caller_location)
end

#subscribe(event_name, callable = nil, id: Subscription.random_id) {|event| ... } ⇒ Omnes::Subscription

Adds a subscription for a single event

Parameters:

  • event_name (Symbol)

    Name of the event

  • callable (#call) (defaults to: nil)

    Subscription callback taking the event

  • id (Symbol) (defaults to: Subscription.random_id)

    Unique identifier for the subscription

Yields:

  • (event)

    Subscription callback if callable is not given

Returns:

  • (Omnes::Subscription)

Raises:



# File 'lib/omnes/bus.rb', line 207

def subscribe(event_name, callable = nil, id: Subscription.random_id, &block)
  registry.check_event_name(event_name)

  subscribe_with_matcher(Subscription::SINGLE_EVENT_MATCHER.curry[event_name], callable, id: id, &block)
end
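The curry call above turns a two-argument matcher into the one-argument matcher that #subscribe_with_matcher expects. The definition of Subscription::SINGLE_EVENT_MATCHER is not shown on this page, so the lambda below is an assumed shape for illustration only; CurrySketchEvent and the constant names are hypothetical.

```ruby
# Hypothetical stand-in for an event.
CurrySketchEvent = Struct.new(:omnes_event_name)

# Assumed shape of a single-event matcher: it takes the expected name first
# and the event second.
SKETCH_SINGLE_EVENT_MATCHER = ->(name, event) { event.omnes_event_name == name }

# Currying fixes the first argument and leaves a callable taking only the event.
FOO_ONLY_MATCHER = SKETCH_SINGLE_EVENT_MATCHER.curry[:foo]

FOO_ONLY_MATCHER.call(CurrySketchEvent.new(:foo)) # => true
FOO_ONLY_MATCHER.call(CurrySketchEvent.new(:bar)) # => false
```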

#subscribe_to_all(callable = nil, id: Subscription.random_id) {|event| ... } ⇒ Omnes::Subscription

Adds a subscription for all events

Parameters:

  • callable (#call) (defaults to: nil)

    Subscription callback taking the event

  • id (Symbol) (defaults to: Subscription.random_id)

    Unique identifier for the subscription

Yields:

  • (event)

    Subscription callback if callable is not given

Returns:

  • (Omnes::Subscription)



# File 'lib/omnes/bus.rb', line 220

def subscribe_to_all(callable = nil, id: Subscription.random_id, &block)
  subscribe_with_matcher(Subscription::ALL_EVENTS_MATCHER, callable, id: id, &block)
end

#subscribe_with_matcher(matcher, callable = nil, id: Subscription.random_id) {|event| ... } ⇒ Omnes::Subscription

Adds a subscription with given matcher

Parameters:

  • matcher (#call)

    Callable taking the event and returning a boolean

  • callable (#call) (defaults to: nil)

    Subscription callback taking the event

  • id (Symbol) (defaults to: Subscription.random_id)

    Unique identifier for the subscription

Yields:

  • (event)

    Subscription callback if callable is not given

Returns:

  • (Omnes::Subscription)

Raises:

  • (DuplicateSubscriptionIdError)

    When a subscription with the given id already exists on the bus



# File 'lib/omnes/bus.rb', line 232

def subscribe_with_matcher(matcher, callable = nil, id: Subscription.random_id, &block)
  raise DuplicateSubscriptionIdError.new(id: id, bus: self) if subscription(id)

  callback = callable || block
  Subscription.new(matcher: matcher, callback: callback, id: id).tap do |subscription|
    @subscriptions << subscription
  end
end

#subscription(id) ⇒ Omnes::Subscription

Fetch a subscription by its identifier

Parameters:

  • id (Symbol)

    Subscription identifier

Returns:

  • (Omnes::Subscription)

    The subscription with the given id, or nil when none is found



# File 'lib/omnes/bus.rb', line 286

def subscription(id)
  subscriptions.find { |subscription| subscription.id == id }
end

#unsubscribe(subscription) ⇒ Object

Removes a subscription

Parameters:

  • subscription (Omnes::Subscription)



# File 'lib/omnes/bus.rb', line 244

def unsubscribe(subscription)
  @subscriptions.delete(subscription)
end