Class: LocalBus::Station

Inherits:
Object
  • Object
show all
Includes:
MonitorMixin
Defined in:
lib/local_bus/station.rb

Overview

The Station serves as a queuing system for messages, similar to a bus station where passengers wait for their bus.

When a message is published to the Station, it is queued and processed at a later time, allowing for deferred execution. This is particularly useful for tasks that can be handled later.

The Station employs a thread pool to manage message processing, enabling high concurrency and efficient resource utilization. Messages can also be prioritized, ensuring that higher-priority tasks are processed first.

Note: While the Station provides a robust mechanism for background processing, it's important to understand that the exact timing of message processing is not controlled by the publisher, and messages will be processed as resources become available.

Defined Under Namespace

Classes: CapacityError

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(bus: Bus.new, interval: 0.1, limit: 10_000, threads: Etc.nprocessors, timeout: 60, wait: 5) ⇒ Station

Note:

Delays process exit in an attempt to flush the queue to avoid dropping messages. Exit flushing makes a “best effort” to process all messages, but it’s not guaranteed. Will not delay process exit when the queue is empty.

Constructor



33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/local_bus/station.rb', line 33

# Builds a station, registers an exit hook that stops it, and starts the workers.
#
# @param bus [Object] bus that queued messages are eventually delivered to (a new Bus by default)
# @param interval [Numeric] queue polling interval in seconds; coerced with to_f, 0.1 is used when not positive
# @param limit [Integer] max queue size; coerced with to_i, 10_000 is used when not positive
# @param threads [Integer] number of worker threads; coerced with to_i, never less than 1
# @param timeout [Numeric] default timeout for message processing (in seconds); coerced with to_f
# @param wait [Numeric] seconds passed to #stop as its timeout at process exit (never less than 1)
def initialize(bus: Bus.new, interval: 0.1, limit: 10_000, threads: Etc.nprocessors, timeout: 60, wait: 5)
  # No arguments are forwarded; this lets the included MonitorMixin run its own initializer.
  super()
  @bus = bus

  polling_seconds = interval.to_f
  @interval = polling_seconds.positive? ? polling_seconds : 0.1

  max_messages = limit.to_i
  @limit = max_messages.positive? ? max_messages : 10_000

  worker_count = threads.to_i
  @threads = worker_count < 1 ? 1 : worker_count

  @timeout = timeout.to_f
  @queue = Containers::PriorityQueue.new

  # `wait` is only coerced when the hook actually runs at process exit.
  at_exit do
    stop(timeout: [wait.to_f, 1].max)
  end

  start
end

Instance Attribute Details

#bus ⇒ Object (readonly)

Bus instance



48
49
50
# File 'lib/local_bus/station.rb', line 48

# Bus instance that worker threads hand queued messages to.
#
# @return [Object] the bus given to the constructor (a new Bus by default)
def bus
  @bus
end

#interval ⇒ Object (readonly)

Queue polling interval in seconds



52
53
54
# File 'lib/local_bus/station.rb', line 52

# Queue polling interval in seconds.
#
# @return [Float] the value normalized by the constructor (0.1 when a non-positive interval was given)
def interval
  @interval
end

#limit ⇒ Object (readonly)

Max queue size



56
57
58
# File 'lib/local_bus/station.rb', line 56

# Max queue size; publishing raises CapacityError once the queue holds this many messages.
#
# @return [Integer] the value normalized by the constructor (10_000 when a non-positive limit was given)
def limit
  @limit
end

#threads ⇒ Object (readonly)

Number of threads to use



60
61
62
# File 'lib/local_bus/station.rb', line 60

# Number of worker threads to use when the station starts.
#
# @return [Integer] the value normalized by the constructor (never less than 1)
def threads
  @threads
end

#timeout ⇒ Object (readonly)

Default timeout for message processing (in seconds)



64
65
66
# File 'lib/local_bus/station.rb', line 64

# Default timeout for message processing (in seconds); used by #publish when no timeout is given.
#
# @return [Float] the constructor's timeout coerced with to_f
def timeout
  @timeout
end

Instance Method Details

#count ⇒ Object

Number of unprocessed messages in the queue



134
135
136
# File 'lib/local_bus/station.rb', line 134

# Number of unprocessed messages in the queue.
#
# The size is read inside `synchronize`, the same lock used when pushing and popping messages.
#
# @return [Object] whatever the queue reports as its size (presumably an Integer)
def count
  synchronize { @queue.size }
end

#empty? ⇒ Boolean

Indicates if the queue is empty

Returns:

  • (Boolean)


128
129
130
# File 'lib/local_bus/station.rb', line 128

# Indicates if the queue is empty.
#
# The check runs inside `synchronize`, the same lock used when pushing and popping messages.
#
# @return [Boolean]
def empty?
  synchronize { @queue.empty? }
end

#publish(topic, priority: 1, timeout: self.timeout, **payload) ⇒ Object

Publishes a message



172
173
174
# File 'lib/local_bus/station.rb', line 172

# Publishes a message by wrapping the arguments in a Message and queueing it.
#
# @param topic [Object] topic passed to Message.new
# @param priority [Object] queue priority passed to #publish_message (default: 1)
# @param timeout [Object] message timeout passed to Message.new (default: the station's timeout)
# @param payload [Hash] remaining keyword arguments, passed to Message.new
# @return [Object] whatever #publish_message returns
# @raise [CapacityError] from #publish_message when the queue is full
def publish(topic, priority: 1, timeout: self.timeout, **payload)
  message = Message.new(topic, timeout: timeout, **payload)
  publish_message(message, priority: priority)
end

#publish_message(message, priority: 1) ⇒ Object

Publishes a pre-built message



179
180
181
182
183
184
# File 'lib/local_bus/station.rb', line 179

# Publishes a pre-built message by pushing it onto the priority queue.
#
# The capacity check and the push happen inside one `synchronize` block.
#
# @param message [Object] the message to enqueue
# @param priority [Object] priority handed to the queue (default: 1)
# @return [Object] whatever the queue's push returns
# @raise [CapacityError] when the queue already holds `limit` or more messages
def publish_message(message, priority: 1)
  synchronize do
    if @queue.size >= limit
      raise CapacityError, "Station is at capacity! (limit: #{limit})"
    end

    @queue.push(message, priority)
  end
end

#running? ⇒ Boolean

Indicates if the station is running

Returns:

  • (Boolean)


122
123
124
# File 'lib/local_bus/station.rb', line 122

# Indicates if the station is running, i.e. whether a worker pool currently exists.
#
# @return [Boolean]
def running?
  synchronize { @pool ? true : false }
end

#start(interval: self.interval, threads: self.threads) ⇒ Object

Starts the station



70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# File 'lib/local_bus/station.rb', line 70

# Starts the station by spawning the worker threads that drain the queue.
#
# Does nothing when the station is already running or is in the middle of stopping.
#
# @param interval [Numeric] queue polling interval in seconds; coerced with to_f, 0.1 is used when not positive
# @param threads [Integer] number of worker threads; coerced with to_i, never less than 1
# @return [Integer, nil] the number of threads started, or nil when nothing was started
def start(interval: self.interval, threads: self.threads)
  # Same normalization the constructor applies to its interval.
  interval = interval.to_f
  interval = 0.1 unless interval.positive?
  threads = [threads.to_i, 1].max

  synchronize do
    return if running? || stopping?

    @pool = []
    threads.times do
      @pool << Thread.new do
        Thread.current.report_on_exception = true

        # Each worker owns its timer group. With a single shared group, several threads would
        # register and wait on the same timers concurrently, and the `cancel` below in any one
        # exiting worker would cancel the timers of every other worker.
        timers = Timers::Group.new
        timers.every interval do
          # Pop under the lock; nothing is popped once the station is stopping.
          message = synchronize { @queue.pop unless @queue.empty? || stopping? }
          bus.send :publish_message, message if message
        end

        loop do
          timers.wait
          break if stopping?
        end
      ensure
        # `timers` is nil if the thread failed before the group was created.
        timers&.cancel
      end
    end
  end
end

#stop(timeout: nil) ⇒ Object

Stops the station



101
102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/local_bus/station.rb', line 101

# Stops the station.
#
# Flags the station as stopping, then joins every worker thread.
# Does nothing when the station is not running or when another call to stop is already in progress.
#
# @param timeout [Numeric, nil] max seconds to wait for EACH worker thread; any non-Numeric value waits indefinitely
# @return [Array<Thread>, nil] the worker threads that were joined, or nil when nothing was stopped
def stop(timeout: nil)
  workers = synchronize do
    return unless running?
    return if stopping?
    @stopping = true
    @pool
  end

  begin
    workers.each do |thread|
      timeout.is_a?(Numeric) ? thread.join(timeout) : thread.join
    end
  ensure
    # Only the call that set the stopping flag may clear it. The early returns above leave the
    # state untouched, so a redundant stop cannot reset the flag and drop the pool while another
    # caller is still joining the workers.
    # The state is reset even when a join times out; threads still running then are no longer tracked.
    synchronize do
      @stopping = false
      @pool = nil
    end
  end
end

#stopping? ⇒ Boolean

Returns:

  • (Boolean)


116
117
118
# File 'lib/local_bus/station.rb', line 116

# Indicates if the station is in the process of stopping (the flag is set while #stop joins the workers).
#
# @return [Boolean]
def stopping?
  synchronize { @stopping ? true : false }
end

#subscribe ⇒ Object

Subscribe to a topic



143
144
145
146
# File 'lib/local_bus/station.rb', line 143

# Subscribe to a topic.
#
# All arguments (positional, keyword and block) are forwarded unchanged to the bus.
#
# @return [self] the station, so calls can be chained
def subscribe(...)
  bus.subscribe(...)
  self
end

#unsubscribe ⇒ Object

Unsubscribes a callable from a topic



152
153
154
155
# File 'lib/local_bus/station.rb', line 152

# Unsubscribes a callable from a topic.
#
# All arguments (positional, keyword and block) are forwarded unchanged to the bus.
#
# @return [self] the station, so calls can be chained
def unsubscribe(...)
  bus.unsubscribe(...)
  self
end

#unsubscribe_all ⇒ Object

Unsubscribes all subscribers from a topic and removes the topic



160
161
162
163
# File 'lib/local_bus/station.rb', line 160

# Unsubscribes all subscribers from a topic and removes the topic.
#
# All arguments (positional, keyword and block) are forwarded unchanged to the bus.
#
# @return [self] the station, so calls can be chained
def unsubscribe_all(...)
  bus.unsubscribe_all(...)
  self
end