Class: LocalBus::Station
- Inherits:
-
Object
- Object
- LocalBus::Station
- Includes:
- MonitorMixin
- Defined in:
- lib/local_bus/station.rb
Overview
The Station serves as a queuing system for messages, similar to a bus station where passengers wait for their bus.
When a message is published to the Station, it is queued and processed at a later time, allowing for deferred execution. This is particularly useful for tasks that can be handled later.
The Station employs a thread pool to manage message processing, enabling high concurrency and efficient resource utilization. Messages can also be prioritized, ensuring that higher-priority tasks are processed first.
@note: While the Station provides a robust mechanism for background processing,
it's important to understand that the exact timing of message processing is not controlled by the publisher,
and messages will be processed as resources become available.
Defined Under Namespace
Classes: CapacityError
Instance Attribute Summary collapse
-
#bus ⇒ Object
readonly
Bus instance.
-
#interval ⇒ Object
readonly
Queue polling interval in seconds.
-
#limit ⇒ Object
readonly
Max queue size.
-
#threads ⇒ Object
readonly
Number of threads to use.
-
#timeout ⇒ Object
readonly
Default timeout for message processing (in seconds).
Instance Method Summary collapse
-
#count ⇒ Object
Number of unprocessed messages in the queue.
-
#empty? ⇒ Boolean
Indicates if the queue is empty.
-
#initialize(bus: Bus.new, interval: 0.1, limit: 10_000, threads: Etc.nprocessors, timeout: 60, wait: 5) ⇒ Station
constructor
Constructor.
-
#publish(topic, priority: 1, timeout: self.timeout, **payload) ⇒ Object
Publishes a message.
-
#publish_message(message, priority: 1) ⇒ Object
Publishes a pre-built message.
-
#running? ⇒ Boolean
Indicates if the station is running.
-
#start(interval: self.interval, threads: self.threads) ⇒ Object
Starts the station.
-
#stop(timeout: nil) ⇒ Object
Stops the station.
- #stopping? ⇒ Boolean
-
#subscribe ⇒ Object
Subscribe to a topic.
-
#unsubscribe ⇒ Object
Unsubscribes a callable from a topic.
-
#unsubscribe_all ⇒ Object
Unsubscribes all subscribers from a topic and removes the topic.
Constructor Details
#initialize(bus: Bus.new, interval: 0.1, limit: 10_000, threads: Etc.nprocessors, timeout: 60, wait: 5) ⇒ Station
Delays process exit in an attempt to flush the queue to avoid dropping messages. Exit flushing makes a “best effort” to process all messages, but it’s not guaranteed. Will not delay process exit when the queue is empty.
Constructor
33 34 35 36 37 38 39 40 41 42 43 44 |
# File 'lib/local_bus/station.rb', line 33 def initialize(bus: Bus.new, interval: 0.1, limit: 10_000, threads: Etc.nprocessors, timeout: 60, wait: 5) super() @bus = bus @interval = interval.to_f @interval = 0.1 unless @interval.positive? @limit = limit.to_i.positive? ? limit.to_i : 10_000 @threads = [threads.to_i, 1].max @timeout = timeout.to_f @queue = Containers::PriorityQueue.new at_exit { stop timeout: [wait.to_f, 1].max } start end |
Instance Attribute Details
#bus ⇒ Object (readonly)
Bus instance
48 49 50 |
# File 'lib/local_bus/station.rb', line 48 def bus @bus end |
#interval ⇒ Object (readonly)
Queue polling interval in seconds
52 53 54 |
# File 'lib/local_bus/station.rb', line 52 def interval @interval end |
#limit ⇒ Object (readonly)
Max queue size
56 57 58 |
# File 'lib/local_bus/station.rb', line 56 def limit @limit end |
#threads ⇒ Object (readonly)
Number of threads to use
60 61 62 |
# File 'lib/local_bus/station.rb', line 60 def threads @threads end |
#timeout ⇒ Object (readonly)
Default timeout for message processing (in seconds)
64 65 66 |
# File 'lib/local_bus/station.rb', line 64 def timeout @timeout end |
Instance Method Details
#count ⇒ Object
Number of unprocessed messages in the queue
134 135 136 |
# File 'lib/local_bus/station.rb', line 134 def count synchronize { @queue.size } end |
#empty? ⇒ Boolean
Indicates if the queue is empty
128 129 130 |
# File 'lib/local_bus/station.rb', line 128 def empty? synchronize { @queue.empty? } end |
#publish(topic, priority: 1, timeout: self.timeout, **payload) ⇒ Object
Publishes a message
172 173 174 |
# File 'lib/local_bus/station.rb', line 172 def publish(topic, priority: 1, timeout: self.timeout, **payload) Message.new(topic, timeout: timeout, **payload), priority: priority end |
#publish_message(message, priority: 1) ⇒ Object
Publishes a pre-built message
179 180 181 182 183 184 |
# File 'lib/local_bus/station.rb', line 179 def (, priority: 1) synchronize do raise CapacityError, "Station is at capacity! (limit: #{limit})" if @queue.size >= limit @queue.push , priority end end |
#running? ⇒ Boolean
Indicates if the station is running
122 123 124 |
# File 'lib/local_bus/station.rb', line 122 def running? synchronize { !!@pool } end |
#start(interval: self.interval, threads: self.threads) ⇒ Object
Starts the station
70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 |
# File 'lib/local_bus/station.rb', line 70 def start(interval: self.interval, threads: self.threads) interval = 0.1 unless interval.positive? threads = [threads.to_i, 1].max synchronize do return if running? || stopping? timers = Timers::Group.new @pool = [] threads.times do @pool << Thread.new do Thread.current.report_on_exception = true timers.every interval do = synchronize { @queue.pop unless @queue.empty? || stopping? } bus.send :publish_message, if end loop do timers.wait break if stopping? end ensure timers.cancel end end end end |
#stop(timeout: nil) ⇒ Object
Stops the station
101 102 103 104 105 106 107 108 109 110 111 112 113 114 |
# File 'lib/local_bus/station.rb', line 101 def stop(timeout: nil) synchronize do return unless running? return if stopping? @stopping = true end @pool&.each do |thread| timeout.is_a?(Numeric) ? thread.join(timeout) : thread.join end ensure @stopping = false @pool = nil end |
#stopping? ⇒ Boolean
116 117 118 |
# File 'lib/local_bus/station.rb', line 116 def stopping? synchronize { !!@stopping } end |
#subscribe ⇒ Object
Subscribe to a topic
143 144 145 146 |
# File 'lib/local_bus/station.rb', line 143 def subscribe(...) bus.subscribe(...) self end |
#unsubscribe ⇒ Object
Unsubscribes a callable from a topic
152 153 154 155 |
# File 'lib/local_bus/station.rb', line 152 def unsubscribe(...) bus.unsubscribe(...) self end |
#unsubscribe_all ⇒ Object
Unsubscribes all subscribers from a topic and removes the topic
160 161 162 163 |
# File 'lib/local_bus/station.rb', line 160 def unsubscribe_all(...) bus.unsubscribe_all(...) self end |