Class: LocalBus::Bus
- Inherits:
-
Object
- Object
- LocalBus::Bus
- Includes:
- MonitorMixin
- Defined in:
- lib/local_bus/bus.rb
Overview
While the Bus uses asynchronous operations to optimize performance, the actual processing of a message may still experience slight delays due to I/O wait times from prior messages. This means that while the Bus aims for immediate processing, the nature of asynchronous operations can introduce some latency.
The Bus acts as a direct transport mechanism for messages, akin to placing a passenger directly onto a bus. When a message is published to the Bus, it is immediately delivered to all subscribers, ensuring prompt execution of tasks. This is achieved through non-blocking I/O operations, which allow the Bus to handle multiple tasks efficiently without blocking the main thread.
Instance Method Summary collapse
-
#concurrency ⇒ Object
Maximum number of concurrent tasks that can run in “parallel”.
-
#concurrency=(value) ⇒ Object
Sets the max concurrency.
-
#initialize(concurrency: Etc.nprocessors) ⇒ Bus
constructor
Constructor.
-
#publish(topic, timeout: 60, **payload) ⇒ Object
Publishes a message.
-
#publish_message(message) ⇒ Object
Publishes a pre-built message.
-
#subscribe(topic, callable: nil, &block) ⇒ Object
Subscribes a callable to a topic.
-
#subscriptions ⇒ Object
Registered subscriptions.
-
#topics ⇒ Object
Registered topics that have subscribers.
-
#unsubscribe(topic, callable:) ⇒ Object
Unsubscribes a callable from a topic.
-
#unsubscribe_all(topic) ⇒ Object
Unsubscribes all subscribers from a topic and removes the topic.
-
#with_topic(topic, &block) ⇒ Object
Executes a block and unsubscribes all subscribers from the topic afterwards.
Constructor Details
#initialize(concurrency: Etc.nprocessors) ⇒ Bus
Creates a new Bus instance with specified max concurrency (i.e. number of tasks that can run in parallel)
Constructor
17 18 19 20 21 22 23 |
# File 'lib/local_bus/bus.rb', line 17 def initialize(concurrency: Etc.nprocessors) super() @concurrency = concurrency.to_i @subscriptions = Hash.new do |hash, key| hash[key] = Set.new end end |
Instance Method Details
#concurrency ⇒ Object
Maximum number of concurrent tasks that can run in “parallel”
27 28 29 |
# File 'lib/local_bus/bus.rb', line 27 def concurrency synchronize { @concurrency } end |
#concurrency=(value) ⇒ Object
Sets the max concurrency
34 35 36 |
# File 'lib/local_bus/bus.rb', line 34 def concurrency=(value) synchronize { @concurrency = value.to_i } end |
#publish(topic, timeout: 60, **payload) ⇒ Object
If subscribers are rapidly created/destroyed mid-publish, there’s a theoretical possibility of object_id reuse. However, this is extremely unlikely in practice.
-
If subscribers are added mid-publish, they will not receive the message
-
If subscribers are removed mid-publish, they will still receive the message
If the timeout is exceeded, the task will be cancelled before all subscribers have completed.
Publishes a message
Check individual Subscribers for possible errors.
117 118 119 |
# File 'lib/local_bus/bus.rb', line 117 def publish(topic, timeout: 60, **payload) Message.new(topic, timeout: timeout.to_f, **payload) end |
#publish_message(message) ⇒ Object
Publishes a pre-built message
124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 |
# File 'lib/local_bus/bus.rb', line 124 def () = Async::Barrier.new subscribers = subscriptions.fetch(.topic, []).map { Subscriber.new _1, } if subscribers.any? Sync do |task| task.with_timeout .timeout do semaphore = Async::Semaphore.new(concurrency, parent: ) subscribers.each do |subscriber| semaphore.async do subscriber.perform end end rescue Async::TimeoutError => cause .stop subscribers.select(&:pending?).each do |subscriber| subscriber.timeout cause end end end end .publication = Publication.new(, *subscribers) end |
#subscribe(topic, callable: nil, &block) ⇒ Object
Subscribes a callable to a topic
60 61 62 63 64 65 |
# File 'lib/local_bus/bus.rb', line 60 def subscribe(topic, callable: nil, &block) callable ||= block raise ArgumentError, "Subscriber must respond to #call" unless callable.respond_to?(:call, false) synchronize { @subscriptions[topic.to_s].add callable } self end |
#subscriptions ⇒ Object
Registered subscriptions
46 47 48 49 50 51 52 |
# File 'lib/local_bus/bus.rb', line 46 def subscriptions synchronize do @subscriptions.each_with_object({}) do |(topic, callables), memo| memo[topic] = callables.to_a end end end |
#topics ⇒ Object
Registered topics that have subscribers
40 41 42 |
# File 'lib/local_bus/bus.rb', line 40 def topics synchronize { @subscriptions.keys } end |
#unsubscribe(topic, callable:) ⇒ Object
Unsubscribes a callable from a topic
71 72 73 74 75 76 77 78 |
# File 'lib/local_bus/bus.rb', line 71 def unsubscribe(topic, callable:) topic = topic.to_s synchronize do @subscriptions[topic].delete callable @subscriptions.delete(topic) if @subscriptions[topic].empty? end self end |
#unsubscribe_all(topic) ⇒ Object
Unsubscribes all subscribers from a topic and removes the topic
83 84 85 86 87 88 89 90 |
# File 'lib/local_bus/bus.rb', line 83 def unsubscribe_all(topic) topic = topic.to_s synchronize do @subscriptions[topic].clear @subscriptions.delete topic end self end |
#with_topic(topic, &block) ⇒ Object
Executes a block and unsubscribes all subscribers from the topic afterwards
95 96 97 98 99 |
# File 'lib/local_bus/bus.rb', line 95 def with_topic(topic, &block) block.call topic.to_s ensure unsubscribe_all topic end |