Class: FiniteMachine::MessageQueue Private

Inherits:
Object
Defined in:
lib/finite_machine/message_queue.rb

Overview

This class is part of a private API. Avoid using it if possible, as it may be removed or changed in the future.

Stores asynchronous messages, such as events and callbacks, until a background thread processes them.

Used internally by Observer and StateMachine.


Constructor Details

#initialize ⇒ MessageQueue

Initialize an empty event queue. No thread is created here; the background thread is spawned later by #start.

Examples:

MessageQueue.new


# File 'lib/finite_machine/message_queue.rb', line 20

def initialize
  @not_empty = ConditionVariable.new
  @mutex     = Mutex.new
  @queue     = Queue.new
  @dead      = false
  @listeners = []
  @thread    = nil
end

Instance Method Details

#<<(event) ⇒ nil

Add an asynchronous event to the queue for processing. If the queue is already dead, the event is discarded instead.

Examples:

event_queue << AsyncCall.build(...)

Parameters:

  • event: the asynchronous event to enqueue

Returns:

  • (nil)


# File 'lib/finite_machine/message_queue.rb', line 61

def <<(event)
  @mutex.synchronize do
    if @dead
      discard_message(event)
    else
      @queue << event
      @not_empty.signal
    end
  end
end
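
The enqueue-and-signal step above pairs with a consumer that waits on the same condition variable. The consumer loop (process_events) is not shown on this page, so what follows is only a minimal sketch of the general technique. TinyQueue, its Array-backed storage, the close method, and the handler block are hypothetical and are not FiniteMachine's implementation.

```ruby
# Hypothetical stand-in for illustration only; not FiniteMachine's code.
class TinyQueue
  def initialize(&handler)
    @mutex     = Mutex.new
    @not_empty = ConditionVariable.new
    @events    = []
    @dead      = false
    @handler   = handler
    @thread    = Thread.new { process_events }
  end

  # Producer side: enqueue under the lock, then wake one waiting consumer.
  def <<(event)
    @mutex.synchronize do
      return self if @dead
      @events << event
      @not_empty.signal
    end
    self
  end

  # Stop accepting events; the consumer exits once the backlog is drained.
  def close
    @mutex.synchronize do
      @dead = true
      @not_empty.broadcast
    end
    @thread.join
  end

  private

  # Consumer side: sleep on the condition variable while there is no work.
  # nil marks "dead and drained", so nil events are not supported here.
  def process_events
    loop do
      event = @mutex.synchronize do
        @not_empty.wait(@mutex) while @events.empty? && !@dead
        @events.shift
      end
      break if event.nil?
      @handler.call(event)
    end
  end
end

seen  = []
queue = TinyQueue.new { |event| seen << event }
queue << :start << :stop
queue.close
```

The wait is wrapped in a while loop so that a spurious or late wakeup rechecks the condition before taking an event.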

#alive? ⇒ Boolean

Check if the event queue is alive

Examples:

event_queue.alive?

Returns:

  • (Boolean)


# File 'lib/finite_machine/message_queue.rb', line 101

def alive?
  @mutex.synchronize { !@dead }
end

#empty? ⇒ Boolean

Check if there are any events to handle

Examples:

event_queue.empty?

Returns:

  • (Boolean)


# File 'lib/finite_machine/message_queue.rb', line 89

def empty?
  @mutex.synchronize { @queue.empty? }
end

#inspect ⇒ Object

This method is part of a private API. Avoid using it if possible, as it may be removed or changed in the future.



# File 'lib/finite_machine/message_queue.rb', line 157

def inspect
  @mutex.synchronize do
    "#<#{self.class}:#{object_id.to_s(16)} @size=#{size}, @dead=#{@dead}>"
  end
end
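
As written in the listing above, inspect calls size while it already holds @mutex, and size locks the same mutex again. Ruby's Mutex is not reentrant, so locking it twice from the same thread raises ThreadError. The following is a minimal sketch of that behaviour and of one possible workaround (reading the queue size directly inside the single lock). The Inspectable class and both method names are hypothetical.

```ruby
# Hypothetical class for illustration only.
class Inspectable
  def initialize
    @mutex = Mutex.new
    @queue = Queue.new
    @dead  = false
  end

  def size
    @mutex.synchronize { @queue.size }
  end

  # Mirrors the listing: calls #size while the mutex is already held.
  def nested_inspect
    @mutex.synchronize { "@size=#{size}, @dead=#{@dead}" }
  end

  # Possible workaround: read the queue directly under the single lock.
  def flat_inspect
    @mutex.synchronize { "@size=#{@queue.size}, @dead=#{@dead}" }
  end
end

probe = Inspectable.new

nested_error =
  begin
    probe.nested_inspect
    nil
  rescue ThreadError => e
    e
  end

flat = probe.flat_inspect
```

Another option, if nested locking is needed, would be Ruby's Monitor, which allows the owning thread to re-enter.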

#join(timeout = nil) ⇒ nil, Thread

Block the current thread until the event queue's background thread finishes or the optional timeout elapses

Examples:

event_queue.join

Parameters:

  • timeout (Integer) (defaults to: nil): maximum number of seconds to wait; nil waits indefinitely

Returns:

  • (nil, Thread): nil if no thread has been started or the timeout elapsed, otherwise the joined thread


# File 'lib/finite_machine/message_queue.rb', line 115

def join(timeout = nil)
  return unless @thread
  timeout.nil? ? @thread.join : @thread.join(timeout)
end
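
The two possible return values come from Thread#join itself: it returns nil when the timeout elapses before the thread finishes, and the thread once it has finished. A minimal sketch using a plain thread rather than a MessageQueue (slow_thread and the sleep durations are illustrative only):

```ruby
# A thread that takes longer than the first timeout.
slow_thread = Thread.new { sleep 0.3 }

timed_out = slow_thread.join(0.01) # nil: the thread is still running
finished  = slow_thread.join       # blocks until done, returns the thread
```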

#running? ⇒ Boolean

This method is part of a private API. Avoid using it if possible, as it may be removed or changed in the future.

Returns:

  • (Boolean)


# File 'lib/finite_machine/message_queue.rb', line 47

def running?
  !@thread.nil? && alive?
end

#shutdown ⇒ Boolean

Shut down this event queue and clean it up. Raises EventQueueDeadError if the queue is already dead.

Examples:

event_queue.shutdown

Returns:

  • (Boolean)


# File 'lib/finite_machine/message_queue.rb', line 128

def shutdown
  fail EventQueueDeadError, 'event queue already dead' if @dead

  queue = []
  @mutex.synchronize do
    @dead = true
    @not_empty.broadcast

    queue = @queue
    @queue.clear
  end
  while !queue.empty?
    discard_message(queue.pop)
  end
  true
end
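
In the listing above, queue = @queue binds a second name to the same Queue object rather than copying it, so @queue.clear also empties queue and the discard loop has nothing left to pop. The following is a minimal sketch of that aliasing, plus one possible way to move pending events into a separate array before handling them. drain_pending and the variable names are hypothetical and used only for illustration.

```ruby
source = Queue.new
source << :a << :b

# Aliasing: both names point at the same Queue object.
snapshot = source
source.clear
alias_empty = snapshot.empty?

# One possible alternative: move pending events into a separate array
# while holding the lock, then handle them outside the lock.
def drain_pending(queue, mutex)
  pending = []
  mutex.synchronize do
    pending << queue.pop until queue.empty?
  end
  pending
end

source << :c << :d
discarded = drain_pending(source, Mutex.new)
```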

#size ⇒ Integer

Get number of events waiting for processing

Examples:

event_queue.size

Returns:

  • (Integer)


# File 'lib/finite_machine/message_queue.rb', line 153

def size
  @mutex.synchronize { @queue.size }
end

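#spawn_thread ⇒ Object

This method is part of a private API. Avoid using it if possible, as it may be removed or changed in the future.

Spawn a new background thread that processes events. abort_on_exception is enabled on that thread, so an unhandled exception inside it is re-raised in the main thread.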



# File 'lib/finite_machine/message_queue.rb', line 40

def spawn_thread
  @thread = Thread.new do
    Thread.current.abort_on_exception = true
    process_events
  end
end

#start ⇒ Object

This method is part of a private API. Avoid using it if possible, as it may be removed or changed in the future.

Start a background thread that runs queued callback events. Does nothing if the queue is already running.



# File 'lib/finite_machine/message_queue.rb', line 32

def start
  return if running?
  @mutex.synchronize { spawn_thread }
end
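
Because start returns early when the queue is already running, repeated calls should not create additional threads. The following is a minimal sketch of that guard. SketchWorker is hypothetical, and unlike the listing it performs the check inside the lock, so two concurrent callers cannot both pass the check and spawn a thread each.

```ruby
# Hypothetical class for illustration only.
class SketchWorker
  attr_reader :thread

  def initialize
    @mutex  = Mutex.new
    @thread = nil
  end

  # Spawn the background thread at most once; later calls reuse it.
  def start
    @mutex.synchronize do
      @thread ||= Thread.new { sleep 0.05 }
    end
  end
end

sketch_worker = SketchWorker.new
first_start   = sketch_worker.start
second_start  = sketch_worker.start
sketch_worker.thread.join
```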

#subscribe(*args, &block) ⇒ Object

Add a listener to the queue to receive messages. The given block is registered as the listener's delivery callback.



# File 'lib/finite_machine/message_queue.rb', line 75

def subscribe(*args, &block)
  @mutex.synchronize do
    listener = Listener.new(*args)
    listener.on_delivery(&block)
    @listeners << listener
  end
end
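
Listener is not documented on this page, so the following is only a rough sketch of the general shape: a listener stores the block given to on_delivery, and the queue later hands each message to every registered listener. SketchListener, SketchBus, deliver, and publish are all hypothetical names and do not describe FiniteMachine's actual Listener API.

```ruby
# Hypothetical listener: stores a block and calls it on delivery.
class SketchListener
  def on_delivery(&block)
    @on_delivery = block
    self
  end

  # Hand a message to the stored block, if one was registered.
  def deliver(message)
    @on_delivery&.call(message)
  end
end

# Hypothetical owner of the listener list.
class SketchBus
  def initialize
    @mutex     = Mutex.new
    @listeners = []
  end

  # Register a block as a new listener under the lock.
  def subscribe(&block)
    @mutex.synchronize do
      @listeners << SketchListener.new.on_delivery(&block)
    end
    self
  end

  # Copy the list under the lock, then call listeners outside it.
  def publish(message)
    listeners = @mutex.synchronize { @listeners.dup }
    listeners.each { |listener| listener.deliver(message) }
  end
end

received = []
bus = SketchBus.new
bus.subscribe { |message| received << message }
bus.publish(:enter_green)
```

Calling listeners outside the lock keeps a slow callback from blocking other threads that want to subscribe.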