Class: FiniteMachine::MessageQueue Private

Inherits:
Object
  • Object
Defined in:
lib/finite_machine/message_queue.rb

Overview

This class is part of a private API. You should avoid using this class if possible, as it may be removed or changed in the future.

Responsible for storage of asynchronous messages such as events and callbacks.

Used internally by Observer.

Constructor Details

#initialize ⇒ MessageQueue

Initialize a MessageQueue

Examples:

message_queue = FiniteMachine::MessageQueue.new


# File 'lib/finite_machine/message_queue.rb', line 20

def initialize
  @not_empty = ConditionVariable.new
  @mutex     = Mutex.new
  @queue     = Queue.new
  @dead      = false
  @listeners = []
  @thread    = nil
end
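
The instance variables above suggest a classic producer/consumer arrangement: a mutex guards the queue, and a condition variable lets a worker thread sleep until a message arrives. The following is a minimal, self-contained sketch of that pattern, not the library's implementation. `SketchQueue`, `next_message`, `stop`, and `handled` are hypothetical names, a plain Array stands in for the queue, and unlike `#shutdown` this sketch lets the worker drain pending messages before it exits.

```ruby
# Hypothetical stand-in for illustration only; not FiniteMachine's class.
class SketchQueue
  attr_reader :handled

  def initialize
    @not_empty = ConditionVariable.new
    @mutex     = Mutex.new
    @queue     = []
    @dead      = false
    @handled   = []
  end

  # Producer side: enqueue under the lock and wake one waiting consumer.
  def <<(message)
    @mutex.synchronize do
      @queue << message
      @not_empty.signal
    end
    self
  end

  # Consumer side: a background thread that handles messages until
  # next_message returns nil.
  def start
    @thread = Thread.new do
      while (message = next_message)
        @handled << message
      end
    end
  end

  # Mark the queue dead, wake every waiter, and wait for the worker.
  def stop
    @mutex.synchronize do
      @dead = true
      @not_empty.broadcast
    end
    @thread&.join
  end

  private

  # Sleep while there is nothing to do; returns nil once the queue is
  # dead and empty.
  def next_message
    @mutex.synchronize do
      @not_empty.wait(@mutex) while @queue.empty? && !@dead
      @queue.shift
    end
  end
end

queue = SketchQueue.new
queue << :first << :second
queue.start
queue.stop
p queue.handled # prints [:first, :second]
```

The `while` around `wait` matters: a waiting thread can wake without a message being available, so the condition is rechecked each time the lock is reacquired.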

Instance Method Details

#<<(event) ⇒ void

This method returns an undefined value.

Add an asynchronous event to the message queue to process

Examples:

message_queue << AsyncCall.build(...)

Parameters:

  • event (AsyncCall)

    the asynchronous event to add to the queue

# File 'lib/finite_machine/message_queue.rb', line 78

def <<(event)
  @mutex.synchronize do
    if @dead
      discard_message(event)
    else
      @queue << event
      @not_empty.signal
    end
  end
end
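
To make the accept-or-discard branch concrete, here is a small sketch under stated assumptions. `DiscardingQueue`, `kill`, and the `discarded` array are hypothetical; the real `discard_message` is a private helper whose body is not shown on this page, so recording the message in an array is only a stand-in for it.

```ruby
# Hypothetical illustration of "enqueue while alive, discard once dead".
class DiscardingQueue
  attr_reader :discarded

  def initialize
    @mutex     = Mutex.new
    @queue     = []
    @dead      = false
    @discarded = []
  end

  def <<(message)
    @mutex.synchronize do
      if @dead
        @discarded << message # stand-in for discard_message(message)
      else
        @queue << message
      end
    end
    self
  end

  # Flip the dead flag under the same lock that guards the queue.
  def kill
    @mutex.synchronize { @dead = true }
  end

  def size
    @mutex.synchronize { @queue.size }
  end
end

queue = DiscardingQueue.new
queue << :accepted
queue.kill
queue << :too_late
p queue.size      # prints 1
p queue.discarded # prints [:too_late]
```

Checking the flag and enqueueing inside one `synchronize` block means a message cannot slip into the queue after it has been marked dead.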

#alive? ⇒ Boolean

Check whether or not the message queue is alive

Examples:

message_queue.alive?

Returns:

  • (Boolean)


# File 'lib/finite_machine/message_queue.rb', line 123

def alive?
  @mutex.synchronize { !@dead }
end

#empty? ⇒ Boolean

Check whether or not there are any messages to handle

Examples:

message_queue.empty?

Returns:

  • (Boolean)


# File 'lib/finite_machine/message_queue.rb', line 111

def empty?
  @mutex.synchronize { @queue.empty? }
end

#inspect ⇒ String

Inspect this message queue

Examples:

message_queue.inspect

Returns:

  • (String)


# File 'lib/finite_machine/message_queue.rb', line 191

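def inspect
  @mutex.synchronize do
    # Read @queue.size directly: calling #size here would try to re-lock
    # @mutex, which Ruby's non-reentrant Mutex rejects with ThreadError.
    "#<#{self.class}:#{object_id.to_s(16)} @size=#{@queue.size}, @dead=#{@dead}>"
  end
end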
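
One thing to keep in mind for methods that hold `@mutex`: Ruby's `Mutex` is not reentrant, so taking the same lock again from the thread that already holds it raises `ThreadError`. The sketch below demonstrates this in isolation and contrasts it with the standard library's reentrant `Monitor`. The method names `nested_mutex_error` and `nested_monitor_result` are hypothetical and exist only for this illustration.

```ruby
require "monitor"

# Returns the ThreadError raised when a Mutex is locked twice by the
# same thread, or nil if no error occurred.
def nested_mutex_error
  mutex = Mutex.new
  mutex.synchronize { mutex.synchronize { :never_reached } }
  nil
rescue ThreadError => error
  error
end

# A Monitor may be re-entered by the thread that already holds it.
def nested_monitor_result
  monitor = Monitor.new
  monitor.synchronize { monitor.synchronize { :ok } }
end

p nested_mutex_error.class # prints ThreadError
p nested_monitor_result    # prints :ok
```

In practice this means a synchronized method should read the guarded state directly rather than call another method that takes the same lock.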

#join(timeout = nil) ⇒ Thread?

Join the message queue from the current thread

Examples:

message_queue.join

Parameters:

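  • timeout (Numeric, nil) (defaults to: nil)

    the time limit in seconds; waits indefinitely when nil

Returns:

  • (Thread, nil)

    the worker thread, or nil if no thread was started or the time limit expired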


# File 'lib/finite_machine/message_queue.rb', line 138

def join(timeout = nil)
  return unless @thread

  timeout.nil? ? @thread.join : @thread.join(timeout)
end
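
The return value follows `Thread#join`: with a time limit it returns `nil` if the thread is still running when the limit expires, and the thread itself otherwise. A short sketch using plain threads (the variable names are illustrative only):

```ruby
# A thread that never finishes on its own.
sleeper = Thread.new { sleep }
timed_out = sleeper.join(0.05) # gives up after 50 ms
p timed_out.nil?               # prints true
sleeper.kill

# A thread that finishes well within the limit.
quick = Thread.new { :done }
joined = quick.join(1)
p joined.equal?(quick)         # prints true
```

Callers can therefore use a `nil` result to detect that the worker did not finish in time.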

#running? ⇒ Boolean

Check whether or not the message queue is running

Examples:

message_queue.running?

Returns:

  • (Boolean)


# File 'lib/finite_machine/message_queue.rb', line 63

def running?
  !@thread.nil? && alive?
end

#shutdown ⇒ Boolean

Shut down this message queue and clean it up

Examples:

message_queue.shutdown

Returns:

  • (Boolean)

Raises:

  • (MessageQueueDeadError)

    if the message queue is already dead


# File 'lib/finite_machine/message_queue.rb', line 154

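def shutdown
  raise MessageQueueDeadError, "message queue already dead" if @dead

  pending = []
  @mutex.synchronize do
    @dead = true
    @not_empty.broadcast

    # Move the messages into a separate array. Aliasing @queue and then
    # clearing it would empty both references and leave nothing to discard.
    pending << @queue.pop until @queue.empty?
  end
  # Discard outside the lock, oldest message first.
  discard_message(pending.shift) until pending.empty?
  true
end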
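
Assigning a queue to another variable does not copy it, so pending messages have to be moved out before the queue is emptied if each one is to be discarded individually. The sketch below shows both points with a plain `Queue`; `drain` is a hypothetical helper, not part of the library.

```ruby
# Move every pending message out of the queue, oldest first.
def drain(queue)
  pending = []
  pending << queue.pop until queue.empty?
  pending
end

# Aliasing: both names refer to the same Queue object.
original = Queue.new
original << :a
original << :b
same_queue = original
original.clear
p same_queue.empty? # prints true

# Draining: the messages survive in a separate array.
source = Queue.new
source << :a
source << :b
p drain(source)     # prints [:a, :b]
p source.empty?     # prints true
```

The guarded `pop` never blocks here because the sketch runs in a single thread; with concurrent consumers the drain would need to happen under the same lock that protects the queue.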

#size ⇒ Integer

The number of messages waiting for processing

Examples:

message_queue.size

Returns:

  • (Integer)


# File 'lib/finite_machine/message_queue.rb', line 179

def size
  @mutex.synchronize { @queue.size }
end

#spawn_thread ⇒ Thread

This method is part of a private API. You should avoid using this method if possible, as it may be removed or changed in the future.

Spawn a new background thread

Returns:

  • (Thread)


# File 'lib/finite_machine/message_queue.rb', line 48

def spawn_thread
  @thread = Thread.new do
    Thread.current.abort_on_exception = true
    process_events
  end
end

#start ⇒ Thread?

This method is part of a private API. You should avoid using this method if possible, as it may be removed or changed in the future.

Start a new thread with a queue of callback events to run

Examples:

message_queue.start

Returns:

  • (Thread, nil)


# File 'lib/finite_machine/message_queue.rb', line 37

def start
  return if running?

  @mutex.synchronize { spawn_thread }
end

#subscribe(*args, &block) ⇒ void

This method returns an undefined value.

Add a listener for the message queue to receive notifications

Examples:

message_queue.subscribe { |event| ... }


# File 'lib/finite_machine/message_queue.rb', line 97

def subscribe(*args, &block)
  @mutex.synchronize do
    listener = Listener.new(*args)
    listener.on_delivery(&block)
    @listeners << listener
  end
end
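
The `Listener` class is defined elsewhere and is not shown on this page. As a rough illustration of the subscribe-then-notify flow, the sketch below uses hypothetical stand-ins: `SketchListener`, `SketchBroadcaster`, and `notify` are assumptions made for this example and may not match the library's actual classes or method names.

```ruby
# Hypothetical listener that stores a block and invokes it on delivery.
class SketchListener
  def on_delivery(&block)
    @on_delivery = block
    self
  end

  def call(*args)
    @on_delivery&.call(*args)
  end
end

# Hypothetical holder of listeners, guarded by a mutex like the queue.
class SketchBroadcaster
  def initialize
    @mutex     = Mutex.new
    @listeners = []
  end

  def subscribe(&block)
    @mutex.synchronize do
      @listeners << SketchListener.new.on_delivery(&block)
    end
    nil
  end

  # Copy the list under the lock, then call listeners outside it so a
  # slow or re-subscribing listener cannot block other threads.
  def notify(message)
    listeners = @mutex.synchronize { @listeners.dup }
    listeners.each { |listener| listener.call(message) }
  end
end

received = []
broadcaster = SketchBroadcaster.new
broadcaster.subscribe { |message| received << message }
broadcaster.notify(:state_changed)
p received # prints [:state_changed]
```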