Class: ZMQMachine::Reactor

Inherits:
Object
  • Object
Defined in:
lib/zm/reactor.rb

Constructor Details

#initialize(name, poll_interval = 10) ⇒ Reactor

poll_interval is the number of milliseconds to block while waiting for new 0mq socket events; the default is 10.



# File 'lib/zm/reactor.rb', line 45

def initialize name, poll_interval = 10
  @name = name
  @running = false
  @thread = nil
  @poll_interval = determine_interval poll_interval
  @timers = ZMQMachine::Timers.new

  @proc_queue = []
  @proc_queue_mutex = Mutex.new

  # could raise if it fails
  @context = ZMQ::Context.new 1
  @poller = ZMQ::Poller.new
  @sockets = []
  @raw_to_socket = {}
  Thread.abort_on_exception = true
end

Instance Attribute Details

#name ⇒ Object (readonly)

Returns the value of attribute name.



# File 'lib/zm/reactor.rb', line 40

def name
  @name
end

Instance Method Details

#cancel_timer(timer) ⇒ Object

Cancels an existing timer if it hasn’t already fired.

Returns true if the timer was cancelled, false otherwise.



# File 'lib/zm/reactor.rb', line 334

def cancel_timer timer
  @timers.cancel timer
end

#close_socket(sock) ⇒ Object

Removes the given sock socket from the reactor context. It is deregistered for new events and closed. Any queued messages are silently dropped.



# File 'lib/zm/reactor.rb', line 156

def close_socket sock
  delete_socket sock
  sock.raw_socket.close
end

#deregister_readable(sock) ⇒ Object

Deregisters the sock for POLLIN events. The handler will no longer receive calls to on_readable.



# File 'lib/zm/reactor.rb', line 303

def deregister_readable sock
  @poller.deregister_readable sock.raw_socket
end

#deregister_writable(sock) ⇒ Object

Deregisters the sock for POLLOUT. The handler will no longer receive calls to on_writable.



# File 'lib/zm/reactor.rb', line 289

def deregister_writable sock
  @poller.deregister_writable sock.raw_socket
end

#join(delay = nil) ⇒ Object

Join on the thread running this reactor instance. Default behavior is to wait indefinitely for the thread to exit.

Pass an optional delay value measured in milliseconds; the call returns once delay milliseconds have elapsed, even if the thread hasn’t exited. The thread itself is not stopped by the timeout.

Returns immediately when the thread has already exited.



# File 'lib/zm/reactor.rb', line 114

def join delay = nil
  # don't allow the thread to try and join itself and only worry about
  # joining for live threads
  if running? && @thread.alive? && @thread != Thread.current
    if delay
      # convert to seconds to meet the argument expectations of Thread#join
      seconds = delay / 1000.0
      @thread.join seconds
    else
      @thread.join
    end
  end
end
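
As a rough illustration of the timeout behavior, the sketch below applies the same millisecond-to-second conversion to a plain Ruby thread. join_with_delay is a hypothetical helper, not part of ZMQMachine; it relies only on Thread#join, which returns nil when the limit elapses and the thread itself once it has exited.

```ruby
# Hypothetical helper (not part of ZMQMachine) mirroring the conversion in #join.
def join_with_delay(thread, delay_ms = nil)
  # no delay given: wait indefinitely
  return thread.join unless delay_ms

  # Thread#join expects seconds, so convert from milliseconds
  thread.join(delay_ms / 1000.0)
end

worker = Thread.new { sleep 0.5 }

timed_out = join_with_delay(worker, 50) # nil: the worker is still running
finished  = join_with_delay(worker)     # the worker thread, once it has exited
```

Note that the timed-out call leaves the worker running; only the caller stops waiting.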

#kill ⇒ Object

Kills the running reactor instance by terminating its thread.

After the thread exits, the reactor attempts to clean up after itself and kill any pending I/O.



# File 'lib/zm/reactor.rb', line 133

def kill
  if running?
    @stopping = true
    @thread.kill
    cleanup
  end
end

#list_timers ⇒ Object

Prints the fire time and the callback name of each timer in the reactor's timer list.

# File 'lib/zm/reactor.rb', line 347

def list_timers
  @timers.list.each do |timer|
    name = timer.timer_proc.respond_to?(:name) ? timer.timer_proc.name : timer.timer_proc.to_s
    puts "fire time [#{Time.at(timer.fire_time / 1000)}], method [#{name}]"
  end
end

#next_tick(blk = nil, &block) ⇒ Object

Schedules a proc or block to execute on the next trip through the reactor loop.

This method is thread-safe.



# File 'lib/zm/reactor.rb', line 146

def next_tick blk = nil, &block
  blk ||= block
  @proc_queue_mutex.synchronize do
    @proc_queue << blk
  end
end
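
The listing shows only the enqueue side. The following is a minimal sketch of how a mutex-guarded proc queue of this kind could be filled from several threads and then drained by a loop. TickQueue and run_pending are hypothetical names, and the drain step is an assumption; the reactor's own drain code is not shown on this page.

```ruby
# Hypothetical stand-in for the reactor's proc queue (stdlib only).
class TickQueue
  def initialize
    @procs = []
    @mutex = Mutex.new
  end

  # Same shape as Reactor#next_tick: accept a proc or a block.
  def next_tick(blk = nil, &block)
    blk ||= block
    @mutex.synchronize { @procs << blk }
  end

  # Assumed drain step: swap the queue out under the lock, then call the
  # procs outside it so a proc can safely call next_tick itself.
  def run_pending
    work = @mutex.synchronize do
      pending = @procs
      @procs = []
      pending
    end
    work.each(&:call)
    work.size
  end
end

queue   = TickQueue.new
results = Queue.new

# Four threads schedule work concurrently.
producers = 4.times.map { |i| Thread.new { queue.next_tick { results << i } } }
producers.each(&:join)

ran = queue.run_pending
```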

#oneshot_timer(delay, timer_proc = nil, &blk) ⇒ Object

Creates a timer that will fire a single time. Expects either a timer_proc proc or a block, otherwise no timer is created.

delay is measured in milliseconds (1 second equals 1000 milliseconds).



# File 'lib/zm/reactor.rb', line 313

def oneshot_timer delay, timer_proc = nil, &blk
  blk ||= timer_proc
  @timers.add_oneshot delay, blk
end

#pair_socket(handler_instance) ⇒ Object

Creates a PAIR socket and attaches handler_instance to the resulting socket. Works only with other #pair_socket instances in the same or other reactor instance.

handler_instance must implement the #on_readable, #on_readable_error, #on_writable and #on_writable_error methods. The reactor will call those methods based upon new events.

All handlers must implement the #on_attach method.



# File 'lib/zm/reactor.rb', line 239

def pair_socket handler_instance
  sock = ZMQMachine::Socket::Pair.new @context, handler_instance
  save_socket sock
  sock
end

#periodical_timer(delay, timer_proc = nil, &blk) ⇒ Object

Creates a timer that will fire every delay milliseconds until it is explicitly cancelled. Expects either a timer_proc proc or a block, otherwise no timer is created.

delay is measured in milliseconds (1 second equals 1000 milliseconds).



# File 'lib/zm/reactor.rb', line 325

def periodical_timer delay, timer_proc = nil, &blk
  blk ||= timer_proc
  @timers.add_periodical delay, blk
end
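
To make the timer semantics described for #oneshot_timer, #periodical_timer and #cancel_timer concrete, here is a small stand-alone sketch. SketchTimers, its injected clock and fire_expired are all hypothetical; this is not the ZMQMachine::Timers implementation, only one way the documented behavior (millisecond delays, no timer without a proc, true or false from cancel) could be modelled.

```ruby
# Hypothetical model of the documented timer behavior (not ZMQMachine::Timers).
class SketchTimers
  Timer = Struct.new(:fire_time, :delay, :periodical, :timer_proc)

  # clock is any callable returning the current time in milliseconds
  def initialize(clock)
    @clock = clock
    @timers = []
  end

  def add_oneshot(delay, blk)
    add(delay, false, blk)
  end

  def add_periodical(delay, blk)
    add(delay, true, blk)
  end

  # true if the timer was still pending, false otherwise
  def cancel(timer)
    !@timers.delete(timer).nil?
  end

  # Fire every due timer; periodical timers are re-armed for now + delay.
  def fire_expired
    now = @clock.call
    due, @timers = @timers.partition { |t| t.fire_time <= now }
    due.each do |t|
      t.timer_proc.call
      next unless t.periodical
      t.fire_time = now + t.delay
      @timers << t
    end
  end

  private

  def add(delay, periodical, blk)
    return nil unless blk # no proc or block: no timer is created
    timer = Timer.new(@clock.call + delay, delay, periodical, blk)
    @timers << timer
    timer
  end
end

now_ms = 0
timers = SketchTimers.new(-> { now_ms })
fired  = []

once = timers.add_oneshot(1000, -> { fired << :once })
tick = timers.add_periodical(500, -> { fired << :tick })

now_ms = 500
timers.fire_expired            # only the periodical timer is due
now_ms = 1000
timers.fire_expired            # both timers are due

cancelled = timers.cancel(tick) # true: still pending
again     = timers.cancel(once) # false: it already fired
```

Injecting the clock keeps the example deterministic; a real loop would read the system time instead.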

#pub_socket(handler_instance) ⇒ Object

Creates a PUB socket and attaches handler_instance to the resulting socket. Usually paired with one or more #sub_socket instances in the same or other reactor instance.

handler_instance must implement the #on_writable and #on_writable_error methods. The reactor will call those methods based upon new events. This socket type can only write; it can never receive/read messages.

All handlers must implement the #on_attach method.



# File 'lib/zm/reactor.rb', line 256

def pub_socket handler_instance
  sock = ZMQMachine::Socket::Pub.new @context, handler_instance
  save_socket sock
  sock
end

#register_readable(sock) ⇒ Object

Registers the sock for POLLIN events that will cause the reactor to call the handler’s on_readable method.



# File 'lib/zm/reactor.rb', line 296

def register_readable sock
  @poller.register_readable sock.raw_socket
end

#register_writable(sock) ⇒ Object

Registers the sock for POLLOUT events that will cause the reactor to call the handler’s on_writable method.



# File 'lib/zm/reactor.rb', line 282

def register_writable sock
  @poller.register_writable sock.raw_socket
end

#rep_socket(handler_instance) ⇒ Object

Creates a REP socket and attaches handler_instance to the resulting socket. Should only be paired with one other #req_socket instance.

handler_instance must implement the #on_readable and #on_readable_error methods. The reactor will call those methods based upon new events.

All handlers must implement the #on_attach method.



# File 'lib/zm/reactor.rb', line 187

def rep_socket handler_instance
  sock = ZMQMachine::Socket::Rep.new @context, handler_instance
  save_socket sock
  sock
end

#req_socket(handler_instance) ⇒ Object

Creates a REQ socket and attaches handler_instance to the resulting socket. Should only be paired with one other #rep_socket instance.

handler_instance must implement the #on_writable and #on_writable_error methods. The reactor will call those methods based upon new events.

All handlers must implement the #on_attach method.



# File 'lib/zm/reactor.rb', line 171

def req_socket handler_instance
  sock = ZMQMachine::Socket::Req.new @context, handler_instance
  save_socket sock
  sock
end

#reschedule_timers ⇒ Object

Asks all timers to reschedule themselves starting from Timers.now. Typically called when the underlying time source for the ZMQMachine::Timers class has been replaced; existing timers may not fire as expected, so we ask them to reset themselves.



# File 'lib/zm/reactor.rb', line 343

def reschedule_timers
  @timers.reschedule
end

#run(blk = nil, &block) ⇒ Object

The main entry point for all new reactor contexts. The proc or block given to this method is evaluated once, on the reactor thread, before entering the reactor loop. This evaluation generally sets up the sockets and timers that will do the real work once the loop is running.



# File 'lib/zm/reactor.rb', line 76

def run blk = nil, &block
  blk ||= block
  @running, @stopping = true, false

  @thread = Thread.new do
    blk.call self if blk

    while !@stopping && running? do
      run_once
    end

    cleanup
  end
  self
end
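
The run loop and the cooperative shutdown performed by #stop can be tried out without 0mq. The sketch below is a simplified, hypothetical MiniLoop that copies the control flow of #run and #stop; the sleep stands in for polling and the final assignment stands in for cleanup, both of which are assumptions about code not shown on this page.

```ruby
# Hypothetical, stdlib-only imitation of the reactor's run/stop control flow.
class MiniLoop
  attr_reader :ticks

  def initialize
    @running = false
    @stopping = false
    @thread = nil
    @ticks = 0
  end

  def running?() @running; end

  def run(blk = nil, &block)
    blk ||= block
    @running, @stopping = true, false

    @thread = Thread.new do
      # the setup proc runs once, on the loop thread, before looping
      blk.call self if blk

      while !@stopping && running?
        @ticks += 1
        sleep 0.01 # stand-in for one pass of polling
      end

      @running = false # stand-in for cleanup
    end
    self
  end

  # Cooperative stop: set the flag, then wait for the loop to notice it.
  def stop
    @stopping = true
    @thread.join if @thread && @thread != Thread.current
  end
end

setup_ran = false
mini = MiniLoop.new.run { |_loop| setup_ran = true }
sleep 0.05
mini.stop
```

Because the flag is only checked between passes, a pass that blocks for a long time delays the shutdown by that long, which is why the forced #kill exists as an alternative.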

#running? ⇒ Boolean

Returns true when the reactor is running OR while it is in the midst of a shutdown request.

Returns false when the reactor thread does not exist.

Returns:

  • (Boolean)


# File 'lib/zm/reactor.rb', line 68

def running?() @running; end

#stop ⇒ Object

Marks the reactor as eligible for termination. Then waits for the reactor thread to exit via #join (no timeout).

The reactor is not forcibly terminated if it is currently blocked by some long-running operation. Use #kill to forcibly terminate the reactor.



# File 'lib/zm/reactor.rb', line 99

def stop
  # wait until the thread loops around again and exits on its own
  @stopping = true
  join
end

#sub_socket(handler_instance) ⇒ Object

Creates a SUB socket and attaches handler_instance to the resulting socket. Usually paired with one or more #pub_socket instances in the same or a different reactor context.

handler_instance must implement the #on_readable and #on_readable_error methods. The reactor will call those methods based upon new events. This socket type can only read; it can never write/send messages.

All handlers must implement the #on_attach method.



# File 'lib/zm/reactor.rb', line 273

def sub_socket handler_instance
  sock = ZMQMachine::Socket::Sub.new @context, handler_instance
  save_socket sock
  sock
end

#xrep_socket(handler_instance) ⇒ Object

Creates an XREP socket and attaches handler_instance to the resulting socket. Should only be paired with one other #req_socket instance.

handler_instance must implement the #on_readable, #on_readable_error, #on_writable and #on_writable_error methods. The reactor will call those methods based upon new events.

All handlers must implement the #on_attach method.



# File 'lib/zm/reactor.rb', line 221

def xrep_socket handler_instance
  sock = ZMQMachine::Socket::XRep.new @context, handler_instance
  save_socket sock
  sock
end

#xreq_socket(handler_instance) ⇒ Object

Creates an XREQ socket and attaches handler_instance to the resulting socket. Should only be paired with one other #rep_socket instance.

handler_instance must implement the #on_readable, #on_readable_error, #on_writable and #on_writable_error methods. The reactor will call those methods based upon new events.

All handlers must implement the #on_attach method.



# File 'lib/zm/reactor.rb', line 204

def xreq_socket handler_instance
  sock = ZMQMachine::Socket::XReq.new @context, handler_instance
  save_socket sock
  sock
end
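
The handler requirements stated for the socket factory methods on this page can be collected into one table. The sketch below is an illustration only: REQUIRED_CALLBACKS, missing_callbacks and SubscriberHandler are hypothetical names, and the callback argument lists are assumptions, since this page names the callbacks but does not give their signatures.

```ruby
READ  = %i[on_readable on_readable_error].freeze
WRITE = %i[on_writable on_writable_error].freeze

# Callbacks each socket type requires, as described in the method docs.
# Every handler also needs on_attach.
REQUIRED_CALLBACKS = {
  req:  WRITE,
  rep:  READ,
  xreq: READ + WRITE,
  xrep: READ + WRITE,
  pair: READ + WRITE,
  pub:  WRITE,
  sub:  READ
}.transform_values { |names| [:on_attach] + names }.freeze

# Hypothetical helper: list the callbacks a handler lacks for a socket type.
def missing_callbacks(socket_type, handler)
  REQUIRED_CALLBACKS.fetch(socket_type).reject { |name| handler.respond_to?(name) }
end

# A read-only handler; the parameter lists here are assumed, not documented.
class SubscriberHandler
  def on_attach(socket); end
  def on_readable(socket, messages); end
  def on_readable_error(socket, return_code); end
end

handler = SubscriberHandler.new
missing_callbacks(:sub, handler)  # => []
missing_callbacks(:pair, handler) # => [:on_writable, :on_writable_error]
```

A check like this could run before handing a handler to one of the factory methods, so that a missing callback surfaces at setup time rather than on the first event.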