Class: LocalBus::Bus

Inherits:
Object
Includes:
MonitorMixin
Defined in:
lib/local_bus/bus.rb

Overview

Note:

Although the Bus uses asynchronous operations to optimize performance, processing of a message may still be delayed slightly by I/O wait times from prior messages. The Bus aims for immediate processing, but delivery is prompt rather than strictly instantaneous.

The Bus acts as a direct transport mechanism for messages, akin to placing a passenger directly onto a bus. When a message is published to the Bus, it is immediately delivered to all subscribers, ensuring prompt execution of tasks. This is achieved through non-blocking I/O operations, which allow the Bus to handle multiple tasks efficiently without blocking the main thread.

Constructor Details

#initialize(concurrency: Etc.nprocessors) ⇒ Bus

Note:

Creates a new Bus instance with the specified maximum concurrency (i.e. the number of tasks that can run in parallel)

Constructor



# File 'lib/local_bus/bus.rb', line 17

def initialize(concurrency: Etc.nprocessors)
  super()
  @concurrency = concurrency.to_i
  @subscriptions = Hash.new do |hash, key|
    hash[key] = Set.new
  end
end
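
The subscription store built in the constructor can be sketched in isolation. This is a minimal illustration using only the standard library; build_subscriptions is a hypothetical helper name, not part of LocalBus. It shows the two properties the rest of the class relies on: a topic gets an empty Set on first access, and registering the same callable twice has no effect.

```ruby
require "set"

# Hypothetical helper (not part of LocalBus) mirroring the Hash that
# #initialize builds: each unknown key is assigned a fresh, empty Set.
def build_subscriptions
  Hash.new do |hash, key|
    hash[key] = Set.new
  end
end

subscriptions = build_subscriptions
handler = ->(message) { message }

# First access creates the Set for the topic, then the callable is added.
subscriptions["user.created"].add handler
# Adding the same callable again is ignored because Set deduplicates.
subscriptions["user.created"].add handler

puts subscriptions["user.created"].size # prints 1
```

Because the default block stores the new Set under the key, merely reading an unknown topic also creates an entry for it.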

Instance Method Details

#concurrency ⇒ Object

Maximum number of concurrent tasks that can run in “parallel”



# File 'lib/local_bus/bus.rb', line 27

def concurrency
  synchronize { @concurrency }
end

#concurrency=(value) ⇒ Object

Sets the max concurrency



# File 'lib/local_bus/bus.rb', line 34

def concurrency=(value)
  synchronize { @concurrency = value.to_i }
end
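
The reader and writer above both go through MonitorMixin#synchronize. The pattern can be sketched with a standalone class; Limiter and its limit attribute are hypothetical names chosen for illustration and are not part of LocalBus.

```ruby
require "monitor"

# Hypothetical class (not part of LocalBus) using the same locking pattern
# as Bus#concurrency and Bus#concurrency=.
class Limiter
  include MonitorMixin

  def initialize(limit)
    super() # lets MonitorMixin set up its internal lock
    @limit = limit.to_i
  end

  # Reads the value while holding the monitor.
  def limit
    synchronize { @limit }
  end

  # Coerces with to_i, as the Bus setter does, while holding the monitor.
  def limit=(value)
    synchronize { @limit = value.to_i }
  end
end

limiter = Limiter.new("4")
threads = 8.times.map { |i| Thread.new { limiter.limit = i + 1 } }
threads.each(&:join)
puts limiter.limit.between?(1, 8) # prints true; the final value depends on thread scheduling
```

Note the call to super() with explicit empty parentheses: a bare super would forward the constructor arguments to MonitorMixin.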

#publish(topic, timeout: 60, **payload) ⇒ Object

Note:

If subscribers are rapidly created/destroyed mid-publish, there’s a theoretical possibility of object_id reuse. However, this is extremely unlikely in practice.

  • If subscribers are added mid-publish, they will not receive the message

  • If subscribers are removed mid-publish, they will still receive the message

Note:

If the timeout is exceeded, the publish task is cancelled, so subscribers that are still pending will not complete.

Publishes a message

Check individual Subscribers for possible errors.



# File 'lib/local_bus/bus.rb', line 117

def publish(topic, timeout: 60, **payload)
  publish_message Message.new(topic, timeout: timeout.to_f, **payload)
end
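
The signature of #publish determines how its arguments are split: timeout is a named keyword and every other keyword lands in the payload. A minimal sketch of that split follows; split_publish_args is a hypothetical helper that returns a plain Hash instead of building a Message.

```ruby
# Hypothetical helper (not part of LocalBus) with the same parameter list
# as #publish. It returns a Hash so the argument split is easy to inspect.
def split_publish_args(topic, timeout: 60, **payload)
  { topic: topic, timeout: timeout.to_f, payload: payload }
end

args = split_publish_args("user.created", id: 1, timeout: 5)
puts args[:timeout]         # prints 5.0
puts args[:payload].inspect # the :timeout keyword is consumed, only :id remains
```

One consequence of this signature is that a payload cannot carry its own key named timeout, because that keyword is always bound to the timeout parameter.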

#publish_message(message) ⇒ Object

Publishes a pre-built message



# File 'lib/local_bus/bus.rb', line 124

def publish_message(message)
  barrier = Async::Barrier.new
  subscribers = subscriptions.fetch(message.topic, []).map { Subscriber.new _1, message }

  if subscribers.any?
    Sync do |task|
      task.with_timeout message.timeout do
        semaphore = Async::Semaphore.new(concurrency, parent: barrier)

        subscribers.each do |subscriber|
          semaphore.async do
            subscriber.perform
          end
        end
      rescue Async::TimeoutError => cause
        barrier.stop

        subscribers.select(&:pending?).each do |subscriber|
          subscriber.timeout cause
        end
      end
    end
  end

  message.publication = Publication.new(barrier, *subscribers)
  message
end
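
The method above copies the subscriber list before any callable runs, which is why subscribers added mid-publish do not receive the in-flight message. The sketch below isolates that behavior. SnapshotBus is a hypothetical, synchronous stand-in that deliberately omits Async, timeouts, and locking, and it does not reflect how the library schedules work.

```ruby
require "set"

# Hypothetical, synchronous stand-in for Bus (not part of LocalBus).
class SnapshotBus
  def initialize
    @subscriptions = Hash.new { |hash, key| hash[key] = Set.new }
  end

  def subscribe(topic, &block)
    @subscriptions[topic.to_s].add block
    self
  end

  def publish(topic, **payload)
    # Copy the callables up front; later changes do not affect this publish.
    snapshot = @subscriptions.fetch(topic.to_s, []).to_a
    snapshot.map { |callable| callable.call(payload) }
  end
end

bus = SnapshotBus.new
late_calls = []

bus.subscribe("greet") do |payload|
  # Subscribing during a publish only takes effect for later publishes.
  bus.subscribe("greet") { |inner| late_calls << inner[:name] }
  payload[:name]
end

bus.publish("greet", name: "Ada") # only the original subscriber runs
bus.publish("greet", name: "Bob") # the subscriber added during the first publish runs too
puts late_calls.inspect           # prints ["Bob"]
```

Iterating over the copy also avoids mutating a collection while it is being traversed.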

#subscribe(topic, callable: nil, &block) ⇒ Object

Subscribes a callable to a topic

Raises:

  • (ArgumentError)

    if neither a callable nor a block is provided, or if the given callable does not respond to #call



# File 'lib/local_bus/bus.rb', line 60

def subscribe(topic, callable: nil, &block)
  callable ||= block
  raise ArgumentError, "Subscriber must respond to #call" unless callable.respond_to?(:call, false)
  synchronize { @subscriptions[topic.to_s].add callable }
  self
end
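
The callable check in #subscribe accepts a block, a lambda, or any object with a public #call method. The sketch below reproduces only that resolution step; resolve_callable and AuditLogger are hypothetical names used for illustration.

```ruby
# Hypothetical helper (not part of LocalBus) reproducing the first two
# lines of #subscribe: prefer the explicit callable, fall back to the block.
def resolve_callable(callable: nil, &block)
  callable ||= block
  raise ArgumentError, "Subscriber must respond to #call" unless callable.respond_to?(:call, false)
  callable
end

# Hypothetical subscriber object; any object with a public #call qualifies.
class AuditLogger
  def call(message)
    "audited #{message}"
  end
end

from_block  = resolve_callable { |message| message.to_s.upcase }
from_lambda = resolve_callable(callable: ->(message) { message })
from_object = resolve_callable(callable: AuditLogger.new)

begin
  resolve_callable # neither a callable nor a block
rescue ArgumentError => error
  warn error.message
end
```

Passing false as the second argument to respond_to? restricts the check to public methods, so an object with only a private #call is rejected.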

#subscriptions ⇒ Object

Registered subscriptions



# File 'lib/local_bus/bus.rb', line 46

def subscriptions
  synchronize do
    @subscriptions.each_with_object({}) do |(topic, callables), memo|
      memo[topic] = callables.to_a
    end
  end
end
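
Because #subscriptions builds a new Hash of Arrays, callers receive a copy rather than the internal store. A minimal sketch of that copy step follows; snapshot_subscriptions is a hypothetical helper that takes the internal Hash as an argument instead of reading an instance variable.

```ruby
require "set"

# Hypothetical helper (not part of LocalBus) performing the same copy as
# #subscriptions, without the surrounding synchronize call.
def snapshot_subscriptions(internal)
  internal.each_with_object({}) do |(topic, callables), memo|
    memo[topic] = callables.to_a
  end
end

internal = Hash.new { |hash, key| hash[key] = Set.new }
internal["orders"].add ->(message) { message }

copy = snapshot_subscriptions(internal)
copy["orders"].clear         # empties only the copied Array
puts internal["orders"].size # prints 1
puts copy["missing"].inspect # prints nil; the copy is a plain Hash with no default block
```

The copy is shallow: the Arrays are new, but the callables inside them are the same objects held by the internal store.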

#topics ⇒ Object

Registered topics that have subscribers



# File 'lib/local_bus/bus.rb', line 40

def topics
  synchronize { @subscriptions.keys }
end

#unsubscribe(topic, callable:) ⇒ Object

Unsubscribes a callable from a topic



# File 'lib/local_bus/bus.rb', line 71

def unsubscribe(topic, callable:)
  topic = topic.to_s
  synchronize do
    @subscriptions[topic].delete callable
    @subscriptions.delete(topic) if @subscriptions[topic].empty?
  end
  self
end
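
The second statement inside the synchronize block matters because the store creates an empty Set whenever an unknown topic is read. The sketch below applies the same two steps to a bare Hash; remove_subscriber is a hypothetical helper name and the locking is omitted.

```ruby
require "set"

# Hypothetical helper (not part of LocalBus) applying the same steps as
# #unsubscribe to a Hash passed in by the caller.
def remove_subscriber(subscriptions, topic, callable)
  topic = topic.to_s
  # Reading the topic may create an empty Set through the default block.
  subscriptions[topic].delete callable
  # The empty entry is dropped again so the key list stays accurate.
  subscriptions.delete(topic) if subscriptions[topic].empty?
  subscriptions
end

subscriptions = Hash.new { |hash, key| hash[key] = Set.new }
handler = ->(message) { message }
subscriptions["orders"].add handler

remove_subscriber(subscriptions, "never.used", handler) # no stray key remains
remove_subscriber(subscriptions, :orders, handler)      # last subscriber removed
puts subscriptions.keys.inspect # prints []
```

Without the cleanup step, unsubscribing from a topic that was never used would leave that topic listed with no subscribers.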

#unsubscribe_all(topic) ⇒ Object

Unsubscribes all subscribers from a topic and removes the topic



# File 'lib/local_bus/bus.rb', line 83

def unsubscribe_all(topic)
  topic = topic.to_s
  synchronize do
    @subscriptions[topic].clear
    @subscriptions.delete topic
  end
  self
end

#with_topic(topic, &block) ⇒ Object

Executes a block and unsubscribes all subscribers from the topic afterwards



# File 'lib/local_bus/bus.rb', line 95

def with_topic(topic, &block)
  block.call topic.to_s
ensure
  unsubscribe_all topic
end