Class: FiniteMachine::MessageQueue Private
- Inherits: Object
  - Object
  - FiniteMachine::MessageQueue
- Defined in: lib/finite_machine/message_queue.rb
Overview
This class is part of a private API. Avoid using it if possible, as it may be removed or changed in the future.
Responsible for storage of asynchronous messages such as events and callbacks.
Used internally by Observer.
Instance Method Summary
- #<<(event) ⇒ void
  Add an asynchronous event to the message queue to process.
- #alive? ⇒ Boolean
  Check whether or not the message queue is alive.
- #empty? ⇒ Boolean
  Check whether or not there are any messages to handle.
- #initialize ⇒ MessageQueue (constructor)
  Initialize a MessageQueue.
- #inspect ⇒ String
  Inspect this message queue.
- #join(timeout = nil) ⇒ Thread?
  Join the message queue from the current thread.
- #running? ⇒ Boolean
  Check whether or not the message queue is running.
- #shutdown ⇒ Boolean
  Shut down this message queue and clean it up.
- #size ⇒ Integer
  The number of messages waiting for processing.
- #spawn_thread ⇒ Thread (private)
  Spawn a new background thread.
- #start ⇒ Thread? (private)
  Start a new thread with a queue of callback events to run.
- #subscribe(*args, &block) ⇒ void
  Add a listener for the message queue to receive notifications.
Constructor Details
#initialize ⇒ MessageQueue
Initialize a MessageQueue
# File 'lib/finite_machine/message_queue.rb', line 20
def initialize
  @not_empty = ConditionVariable.new
  @mutex = Mutex.new
  @queue = Queue.new
  @dead = false
  @listeners = []
  @thread = nil
end
Instance Method Details
#<<(event) ⇒ void
This method returns an undefined value.
Add an asynchronous event to the message queue to process
# File 'lib/finite_machine/message_queue.rb', line 78
def <<(event)
  @mutex.synchronize do
    if @dead
      (event)
    else
      @queue << event
      @not_empty.signal
    end
  end
end
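The enqueue-and-signal pattern in `#<<` can be sketched in isolation. The following is a minimal illustration, not FiniteMachine's implementation: `SketchQueue`, its `pop`, `kill`, and `discarded` members are hypothetical names, and collecting late messages in an array is an assumption made only so the example has something to show.

```ruby
# Hypothetical stand-in used for illustration only.
class SketchQueue
  attr_reader :discarded

  def initialize
    @mutex     = Mutex.new
    @not_empty = ConditionVariable.new
    @items     = []
    @dead      = false
    @discarded = [] # assumption: late messages are simply collected here
  end

  # Enqueue under the lock, then wake one waiting consumer.
  def <<(item)
    @mutex.synchronize do
      if @dead
        @discarded << item
      else
        @items << item
        @not_empty.signal
      end
    end
  end

  # Block until an item is available or the queue has been marked dead.
  def pop
    @mutex.synchronize do
      @not_empty.wait(@mutex) while @items.empty? && !@dead
      @items.shift
    end
  end

  # Mark the queue dead and wake every waiting consumer.
  def kill
    @mutex.synchronize do
      @dead = true
      @not_empty.broadcast
    end
  end
end

queue    = SketchQueue.new
consumer = Thread.new { queue.pop }
queue << :ready
received = consumer.value # waits for the consumer thread and returns its result
queue.kill
queue << :late            # arrives after the queue is dead, so it is not enqueued
```

Re-checking the condition in a `while` loop around `wait` guards against wakeups that arrive when the queue is still empty.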
#alive? ⇒ Boolean
Check whether or not the message queue is alive
# File 'lib/finite_machine/message_queue.rb', line 123
def alive?
  @mutex.synchronize { !@dead }
end
#empty? ⇒ Boolean
Check whether or not there are any messages to handle
# File 'lib/finite_machine/message_queue.rb', line 111
def empty?
  @mutex.synchronize { @queue.empty? }
end
#inspect ⇒ String
Inspect this message queue
# File 'lib/finite_machine/message_queue.rb', line 191
def inspect
  @mutex.synchronize do
    "#<#{self.class}:#{object_id.to_s(16)} @size=#{size}, @dead=#{@dead}>"
  end
end
#join(timeout = nil) ⇒ Thread?
Join the message queue from the current thread
# File 'lib/finite_machine/message_queue.rb', line 138
def join(timeout = nil)
  return unless @thread
  timeout.nil? ? @thread.join : @thread.join(timeout)
end
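The `Thread?` return type follows from how Ruby's `Thread#join` behaves: it returns the thread once it has finished, or `nil` if the optional time limit expires first. A short standalone sketch, where the `worker` thread is a hypothetical stand-in for the queue's background thread:

```ruby
# A thread that sleeps indefinitely, standing in for a busy worker.
worker = Thread.new { sleep }

timed_out = worker.join(0.05) # limit expires while the thread is still alive

worker.kill
finished = worker.join        # no limit: waits until the thread has ended
```

Here `timed_out` is `nil` and `finished` is the `worker` thread itself, so callers of `#join` can tell a timeout apart from a completed join.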
#running? ⇒ Boolean
Check whether or not the message queue is running
# File 'lib/finite_machine/message_queue.rb', line 63
def running?
  !@thread.nil? && alive?
end
#shutdown ⇒ Boolean
Shut down this message queue and clean it up
# File 'lib/finite_machine/message_queue.rb', line 154
def shutdown
  raise MessageQueueDeadError, "message queue already dead" if @dead

  queue = []
  @mutex.synchronize do
    @dead = true
    @not_empty.broadcast

    queue = @queue
    @queue.clear
  end
  while !queue.empty?
    (queue.pop)
  end
  true
end
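One detail worth keeping in mind when reading the drain loop: in Ruby, assigning a `Queue` to a local variable binds a second name to the same object rather than copying it, so clearing through one name empties both. The sketch below shows that behavior on a plain `Queue`, followed by a hypothetical drain-first variant; the variable names are illustrative, and whether the library intends pending messages to be handed on or simply dropped is not stated here.

```ruby
pending = Queue.new
pending << :a
pending << :b

same_queue = pending          # a second reference, not a copy
pending.clear
size_after_clear = same_queue.size

# Hypothetical alternative: move the pending items out before they are lost.
pending << :a
pending << :b
drained = []
drained << pending.pop until pending.empty?
```

After `clear`, `size_after_clear` is `0`; in the drain-first variant, `drained` holds the items in FIFO order.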
#size ⇒ Integer
The number of messages waiting for processing
# File 'lib/finite_machine/message_queue.rb', line 179
def size
  @mutex.synchronize { @queue.size }
end
#spawn_thread ⇒ Thread
This method is part of a private API. Avoid using it if possible, as it may be removed or changed in the future.
Spawn a new background thread
# File 'lib/finite_machine/message_queue.rb', line 48
def spawn_thread
  @thread = Thread.new do
    Thread.current.abort_on_exception = true
    process_events
  end
end
#start ⇒ Thread?
This method is part of a private API. Avoid using it if possible, as it may be removed or changed in the future.
Start a new thread with a queue of callback events to run
# File 'lib/finite_machine/message_queue.rb', line 37
def start
  return if running?

  @mutex.synchronize { spawn_thread }
end
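The guard clause is what makes the return type `Thread?`: a first call returns the newly spawned thread, while a call on an already running queue returns `nil`. A minimal sketch of that idempotent-start pattern, assuming a hypothetical `SketchWorker` class whose thread body simply sleeps in place of real event processing:

```ruby
# Hypothetical stand-in used for illustration only.
class SketchWorker
  def initialize
    @mutex  = Mutex.new
    @thread = nil
    @dead   = false
  end

  def running?
    !@thread.nil? && !@dead
  end

  # Spawn the background thread only if one is not already running.
  def start
    return if running?

    @mutex.synchronize { @thread = Thread.new { sleep } }
  end

  # Mark the worker dead and stop its thread (illustrative cleanup).
  def stop
    @dead = true
    @thread&.kill
    @thread&.join
  end
end

worker = SketchWorker.new
first  = worker.start # the new Thread
second = worker.start # nil, because the worker is already running
worker.stop
```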
#subscribe(*args, &block) ⇒ void
This method returns an undefined value.
Add a listener for the message queue to receive notifications
# File 'lib/finite_machine/message_queue.rb', line 97
def subscribe(*args, &block)
  @mutex.synchronize do
    listener = Listener.new(*args)
    listener.on_delivery(&block)
    @listeners << listener
  end
end
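The subscription flow can be sketched with stand-in classes. `SketchListener` and `SketchBus` below are hypothetical; only the `on_delivery(&block)` call and the guarded append to the listener list are taken from the source above. How FiniteMachine's `Listener` stores or invokes the block, and the `notify` method, are assumptions made for illustration.

```ruby
# Hypothetical stand-in for a listener that wraps a delivery block.
class SketchListener
  def initialize(*args)
    @args = args
  end

  # Remember the block to run when a message is delivered.
  def on_delivery(&block)
    @on_delivery = block
    self
  end

  # Assumption: delivery simply invokes the stored block, if any.
  def call(*args)
    @on_delivery&.call(*args)
  end
end

# Hypothetical stand-in for the listener-holding part of a message queue.
class SketchBus
  def initialize
    @mutex     = Mutex.new
    @listeners = []
  end

  def subscribe(*args, &block)
    @mutex.synchronize do
      listener = SketchListener.new(*args)
      listener.on_delivery(&block)
      @listeners << listener
    end
  end

  # Copy the list under the lock, then call listeners outside it so a
  # listener that subscribes again cannot deadlock on the mutex.
  def notify(message)
    listeners = @mutex.synchronize { @listeners.dup }
    listeners.each { |listener| listener.call(message) }
  end
end

seen = []
bus  = SketchBus.new
bus.subscribe(:logger) { |message| seen << message }
bus.notify(:event_processed)
```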