Class: ZMQMachine::Reactor
- Inherits: Object
  - Object
  - ZMQMachine::Reactor
- Defined in: lib/zm/reactor.rb
Instance Attribute Summary
- #name ⇒ Object (readonly)
  Returns the value of attribute name.
Instance Method Summary
- #cancel_timer(timer) ⇒ Object
  Cancels an existing timer if it hasn’t already fired.
- #close_socket(sock) ⇒ Object
  Removes the given sock socket from the reactor context.
- #deregister_readable(sock) ⇒ Object
  Deregisters the sock for POLLIN events.
- #deregister_writable(sock) ⇒ Object
  Deregisters the sock for POLLOUT.
- #initialize(name, poll_interval = 10) ⇒ Reactor (constructor)
  poll_interval is the number of milliseconds to block while waiting for new 0mq socket events; default is 10.
- #join(delay = nil) ⇒ Object
  Join on the thread running this reactor instance.
- #kill ⇒ Object
  Kills the running reactor instance by terminating its thread.
- #list_timers ⇒ Object
- #next_tick(blk = nil, &block) ⇒ Object
  Schedules a proc or block to execute on the next trip through the reactor loop.
- #oneshot_timer(delay, timer_proc = nil, &blk) ⇒ Object
  Creates a timer that will fire a single time.
- #pair_socket(handler_instance) ⇒ Object
  Creates a PAIR socket and attaches handler_instance to the resulting socket.
- #periodical_timer(delay, timer_proc = nil, &blk) ⇒ Object
  Creates a timer that will fire every delay milliseconds until it is explicitly cancelled.
- #pub_socket(handler_instance) ⇒ Object
  Creates a PUB socket and attaches handler_instance to the resulting socket.
- #register_readable(sock) ⇒ Object
  Registers the sock for POLLIN events that will cause the reactor to call the handler’s on_readable method.
- #register_writable(sock) ⇒ Object
  Registers the sock for POLLOUT events that will cause the reactor to call the handler’s on_writable method.
- #rep_socket(handler_instance) ⇒ Object
  Creates a REP socket and attaches handler_instance to the resulting socket.
- #req_socket(handler_instance) ⇒ Object
  Creates a REQ socket and attaches handler_instance to the resulting socket.
- #reschedule_timers ⇒ Object
  Asks all timers to reschedule themselves starting from Timers.now.
- #run(blk = nil, &block) ⇒ Object
  The main entry point for all new reactor contexts.
- #running? ⇒ Boolean
  Returns true when the reactor is running OR while it is in the midst of a shutdown request.
- #stop ⇒ Object
  Marks the reactor as eligible for termination.
- #sub_socket(handler_instance) ⇒ Object
  Creates a SUB socket and attaches handler_instance to the resulting socket.
- #xrep_socket(handler_instance) ⇒ Object
  Creates an XREP socket and attaches handler_instance to the resulting socket.
- #xreq_socket(handler_instance) ⇒ Object
  Creates an XREQ socket and attaches handler_instance to the resulting socket.
Constructor Details
#initialize(name, poll_interval = 10) ⇒ Reactor
poll_interval is the number of milliseconds to block while waiting for new 0mq socket events; default is 10.
    # File 'lib/zm/reactor.rb', line 45
    def initialize name, poll_interval = 10
      @name = name
      @running = false
      @thread = nil
      @poll_interval = determine_interval poll_interval
      @timers = ZMQMachine::Timers.new

      @proc_queue = []
      @proc_queue_mutex = Mutex.new

      # could raise if it fails
      @context = ZMQ::Context.new 1
      @poller = ZMQ::Poller.new
      @sockets = []
      @raw_to_socket = {}
      Thread.abort_on_exception = true
    end
Instance Attribute Details
#name ⇒ Object (readonly)
Returns the value of attribute name.
    # File 'lib/zm/reactor.rb', line 40
    def name
      @name
    end
Instance Method Details
#cancel_timer(timer) ⇒ Object
Cancels an existing timer if it hasn’t already fired.
Returns true if cancelled, false otherwise.
    # File 'lib/zm/reactor.rb', line 334
    def cancel_timer timer
      @timers.cancel timer
    end
#close_socket(sock) ⇒ Object
Removes the given sock socket from the reactor context. It is deregistered for new events and closed. Any queued messages are silently dropped.
    # File 'lib/zm/reactor.rb', line 156
    def close_socket sock
      delete_socket sock
      sock.raw_socket.close
    end
#deregister_readable(sock) ⇒ Object
Deregisters the sock for POLLIN events. The handler will no longer receive calls to on_readable.
    # File 'lib/zm/reactor.rb', line 303
    def deregister_readable sock
      @poller.deregister_readable sock.raw_socket
    end
#deregister_writable(sock) ⇒ Object
Deregisters the sock for POLLOUT events. The handler will no longer receive calls to on_writable.
    # File 'lib/zm/reactor.rb', line 289
    def deregister_writable sock
      @poller.deregister_writable sock.raw_socket
    end
#join(delay = nil) ⇒ Object
Join on the thread running this reactor instance. Default behavior is to wait indefinitely for the thread to exit.
Pass an optional delay value measured in milliseconds; the call returns once delay milliseconds have elapsed even if the thread has not exited. The timeout does not stop the thread itself.
Returns immediately when the thread has already exited.
    # File 'lib/zm/reactor.rb', line 114
    def join delay = nil
      # don't allow the thread to try and join itself and only worry about
      # joining for live threads
      if running? && @thread.alive? && @thread != Thread.current
        if delay
          # convert to seconds to meet the argument expectations of Thread#join
          seconds = delay / 1000.0
          @thread.join seconds
        else
          @thread.join
        end
      end
    end
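The timeout handling rests on Ruby's Thread#join, which takes its limit in seconds and returns nil when the limit expires; the joined thread keeps running in that case. A minimal standalone sketch of the same millisecond-to-second conversion follows. The join_ms helper is a hypothetical name used only for illustration and is not part of ZMQMachine.

```ruby
# Hypothetical helper mirroring the conversion done in Reactor#join.
# Thread#join(limit) returns the thread when it finished in time,
# or nil when the limit (in seconds) expired first.
def join_ms(thread, delay_ms = nil)
  return thread.join if delay_ms.nil?
  thread.join(delay_ms / 1000.0)
end

quick = Thread.new { :done }
slow  = Thread.new { sleep 0.5 }

quick_result = join_ms(quick, 200)  # thread finishes well within 200 ms
slow_result  = join_ms(slow, 50)    # gives up waiting after about 50 ms
slow_alive   = slow.alive?          # a timed-out join does not stop the thread

# Tidy up the sleeping thread so the script exits promptly.
slow.kill
slow.join
```

A caller that needs the thread gone after a timeout has to act on the nil result explicitly, for example by calling #kill.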
#kill ⇒ Object
Kills the running reactor instance by terminating its thread.
After the thread exits, the reactor attempts to clean up after itself and kill any pending I/O.
    # File 'lib/zm/reactor.rb', line 133
    def kill
      if running?
        @stopping = true
        @thread.kill
        cleanup
      end
    end
#list_timers ⇒ Object
    # File 'lib/zm/reactor.rb', line 347
    def list_timers
      @timers.list.each do |timer|
        # check the object that #name is actually called on
        name = timer.timer_proc.respond_to?(:name) ? timer.timer_proc.name : timer.timer_proc.to_s
        puts "fire time [#{Time.at(timer.fire_time / 1000)}], method [#{name}]"
      end
    end
#next_tick(blk = nil, &block) ⇒ Object
Schedules a proc or block to execute on the next trip through the reactor loop.
This method is thread-safe.
    # File 'lib/zm/reactor.rb', line 146
    def next_tick blk = nil, &block
      blk ||= block
      @proc_queue_mutex.synchronize do
        @proc_queue << blk
      end
    end
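The thread safety comes from guarding the proc queue with a Mutex. The sketch below models that pattern in isolation. TickQueue and its drain method are hypothetical names, and the drain step is an assumption about what the reactor loop does on each pass, since that part of the source is not shown here.

```ruby
# Standalone model of a mutex-guarded proc queue; not ZMQMachine code.
class TickQueue
  def initialize
    @procs = []
    @mutex = Mutex.new
  end

  # Same shape as Reactor#next_tick: accepts a proc or a block.
  def next_tick(blk = nil, &block)
    blk ||= block
    @mutex.synchronize { @procs << blk }
  end

  # Assumed loop step: swap the queue out under the lock, then run the
  # procs outside it, so a proc may call next_tick without deadlocking
  # (Ruby's Mutex is not reentrant).
  def drain
    work = @mutex.synchronize do
      pending = @procs
      @procs = []
      pending
    end
    work.each(&:call)
    work.size
  end
end

queue = TickQueue.new
results = []

# Four threads schedule work concurrently; the procs run later, in drain.
threads = 4.times.map do |i|
  Thread.new { queue.next_tick { results << i } }
end
threads.each(&:join)

ran = queue.drain
ran_again = queue.drain
```

Running the procs outside the lock keeps the critical section short, so scheduling threads are blocked only for the duration of an array append or swap.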
#oneshot_timer(delay, timer_proc = nil, &blk) ⇒ Object
Creates a timer that will fire a single time. Expects either a timer_proc proc or a block, otherwise no timer is created.
delay is measured in milliseconds (1 second equals 1000 milliseconds).
    # File 'lib/zm/reactor.rb', line 313
    def oneshot_timer delay, timer_proc = nil, &blk
      blk ||= timer_proc
      @timers.add_oneshot delay, blk
    end
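Because the method body begins with blk ||= timer_proc, a block takes precedence when both a proc and a block are supplied, and the proc is used only when no block is given. A minimal sketch of that selection logic, using a hypothetical pick_callback helper that is not part of ZMQMachine:

```ruby
# Hypothetical stand-in for the first line of #oneshot_timer and
# #periodical_timer: prefer the block, fall back to the proc.
def pick_callback(timer_proc = nil, &blk)
  blk ||= timer_proc
  blk
end

from_proc  = pick_callback(proc { :proc })
from_block = pick_callback { :block }
from_both  = pick_callback(proc { :proc }) { :block }
from_none  = pick_callback
```

With neither argument the result is nil, which is the case the text describes as "no timer is created".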
#pair_socket(handler_instance) ⇒ Object
Creates a PAIR socket and attaches handler_instance to the resulting socket. Works only with other #pair_socket instances in the same or other reactor instance.
handler_instance must implement the #on_readable and #on_readable_error methods. Each handler must also implement the #on_writable and #on_writable_error methods. The reactor will call those methods based upon new events.
All handlers must implement the #on_attach method.
    # File 'lib/zm/reactor.rb', line 239
    def pair_socket handler_instance
      sock = ZMQMachine::Socket::Pair.new @context, handler_instance
      save_socket sock
      sock
    end
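The handler contract for a PAIR socket can be checked with plain duck typing before the handler is passed to the reactor. The sketch below is illustrative only: PairHandler and missing_callbacks are hypothetical names, and the callbacks accept any arguments because this page names the required methods without documenting their parameters.

```ruby
# Hypothetical handler skeleton covering every callback a PAIR socket
# handler is required to implement. Argument lists are an assumption.
class PairHandler
  attr_reader :events

  def initialize
    @events = []
  end

  def on_attach(*_args)
    @events << :attach
  end

  def on_readable(*_args)
    @events << :readable
  end

  def on_readable_error(*_args)
    @events << :readable_error
  end

  def on_writable(*_args)
    @events << :writable
  end

  def on_writable_error(*_args)
    @events << :writable_error
  end
end

# Hypothetical helper: lists the required callbacks a handler lacks.
def missing_callbacks(handler,
                      required = %i[on_attach on_readable on_readable_error
                                    on_writable on_writable_error])
  required.reject { |name| handler.respond_to?(name) }
end

complete   = missing_callbacks(PairHandler.new)
incomplete = missing_callbacks(Object.new)
```

For the read-only or write-only socket types, the same helper could be called with a shorter required list.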
#periodical_timer(delay, timer_proc = nil, &blk) ⇒ Object
Creates a timer that will fire every delay milliseconds until it is explicitly cancelled. Expects either a timer_proc proc or a block, otherwise no timer is created.
delay is measured in milliseconds (1 second equals 1000 milliseconds).
    # File 'lib/zm/reactor.rb', line 325
    def periodical_timer delay, timer_proc = nil, &blk
      blk ||= timer_proc
      @timers.add_periodical delay, blk
    end
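The difference between one-shot and periodical timers, and the return value of cancelling each, can be modelled with a small standalone class. MiniTimers is a hypothetical, simplified stand-in and not the ZMQMachine::Timers implementation; it takes the current time in milliseconds as an explicit argument so the behavior is deterministic.

```ruby
# Simplified timer model for illustration; all names here are hypothetical.
class MiniTimers
  Timer = Struct.new(:fire_time, :delay, :periodical, :callback)

  def initialize
    @timers = []
  end

  def add_oneshot(now_ms, delay, callback)
    add(now_ms, delay, false, callback)
  end

  def add_periodical(now_ms, delay, callback)
    add(now_ms, delay, true, callback)
  end

  # True when a pending timer was removed, false when it was not found
  # (for example because a one-shot timer already fired).
  def cancel(timer)
    before = @timers.size
    @timers.reject! { |t| t.equal?(timer) }
    @timers.size < before
  end

  # Fire every timer that is due; re-arm only the periodical ones.
  def fire_expired(now_ms)
    due, @timers = @timers.partition { |t| t.fire_time <= now_ms }
    due.each do |t|
      t.callback.call
      next unless t.periodical
      t.fire_time += t.delay
      @timers << t
    end
  end

  private

  # No callback means no timer is created.
  def add(now_ms, delay, periodical, callback)
    return nil unless callback
    timer = Timer.new(now_ms + delay, delay, periodical, callback)
    @timers << timer
    timer
  end
end

timers = MiniTimers.new
log = []
once  = timers.add_oneshot(0, 100, proc { log << :once })
every = timers.add_periodical(0, 100, proc { log << :tick })
none  = timers.add_oneshot(0, 100, nil)

timers.fire_expired(100)                # both timers are due
timers.fire_expired(200)                # only the periodical timer is due
cancelled_once  = timers.cancel(once)   # already fired, nothing to cancel
cancelled_every = timers.cancel(every)  # still pending, so it is removed
timers.fire_expired(300)                # nothing left to fire
```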
#pub_socket(handler_instance) ⇒ Object
Creates a PUB socket and attaches handler_instance to the resulting socket. Usually paired with one or more #sub_socket instances in the same or other reactor instance.
handler_instance must implement the #on_writable and #on_writable_error methods. The reactor will call those methods based upon new events. This socket type can only write; it can never receive/read messages.
All handlers must implement the #on_attach method.
    # File 'lib/zm/reactor.rb', line 256
    def pub_socket handler_instance
      sock = ZMQMachine::Socket::Pub.new @context, handler_instance
      save_socket sock
      sock
    end
#register_readable(sock) ⇒ Object
Registers the sock for POLLIN events that will cause the reactor to call the handler’s on_readable method.
    # File 'lib/zm/reactor.rb', line 296
    def register_readable sock
      @poller.register_readable sock.raw_socket
    end
#register_writable(sock) ⇒ Object
Registers the sock for POLLOUT events that will cause the reactor to call the handler’s on_writable method.
    # File 'lib/zm/reactor.rb', line 282
    def register_writable sock
      @poller.register_writable sock.raw_socket
    end
#rep_socket(handler_instance) ⇒ Object
Creates a REP socket and attaches handler_instance to the resulting socket. Should only be paired with one other #req_socket instance.
handler_instance must implement the #on_readable and #on_readable_error methods. The reactor will call those methods based upon new events.
All handlers must implement the #on_attach method.
    # File 'lib/zm/reactor.rb', line 187
    def rep_socket handler_instance
      sock = ZMQMachine::Socket::Rep.new @context, handler_instance
      save_socket sock
      sock
    end
#req_socket(handler_instance) ⇒ Object
Creates a REQ socket and attaches handler_instance to the resulting socket. Should only be paired with one other #rep_socket instance.
handler_instance must implement the #on_writable and #on_writable_error methods. The reactor will call those methods based upon new events.
All handlers must implement the #on_attach method.
    # File 'lib/zm/reactor.rb', line 171
    def req_socket handler_instance
      sock = ZMQMachine::Socket::Req.new @context, handler_instance
      save_socket sock
      sock
    end
#reschedule_timers ⇒ Object
Asks all timers to reschedule themselves starting from Timers.now. Typically called when the underlying time source for the ZM::Timers class has been replaced; existing timers may not fire as expected, so we ask them to reset themselves.
    # File 'lib/zm/reactor.rb', line 343
    def reschedule_timers
      @timers.reschedule
    end
#run(blk = nil, &block) ⇒ Object
The main entry point for all new reactor contexts. The proc or block given to this method is evaluated once, on the reactor thread, before entering the reactor loop. This evaluation generally sets up the sockets and timers that will do the real work once the loop is running.
    # File 'lib/zm/reactor.rb', line 76
    def run blk = nil, &block
      blk ||= block
      @running, @stopping = true, false

      @thread = Thread.new do
        blk.call self if blk

        while !@stopping && running? do
          run_once
        end

        cleanup
      end
      self
    end
#running? ⇒ Boolean
Returns true when the reactor is running OR while it is in the midst of a shutdown request.
Returns false when the reactor thread does not exist.
    # File 'lib/zm/reactor.rb', line 68
    def running?() @running; end
#stop ⇒ Object
Marks the reactor as eligible for termination. Then waits for the reactor thread to exit via #join (no timeout).
The reactor is not forcibly terminated if it is currently blocked by some long-running operation. Use #kill to forcibly terminate the reactor.
    # File 'lib/zm/reactor.rb', line 99
    def stop
      # wait until the thread loops around again and exits on its own
      @stopping = true
      join
    end
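The cooperative shutdown performed by #stop, together with the setup-then-loop flow of #run, can be sketched without any 0mq dependency. MiniLoop below is a hypothetical, simplified model: its run_once is a placeholder for polling sockets and timers, and resetting @running at the end is an assumption about what cleanup does, since cleanup is not shown on this page.

```ruby
# Simplified model of the run/stop flow; not the ZMQMachine implementation.
class MiniLoop
  attr_reader :ticks

  def initialize
    @running = false
    @stopping = false
    @ticks = 0
    @thread = nil
  end

  def running?
    @running
  end

  def run(blk = nil, &block)
    blk ||= block
    @running, @stopping = true, false
    @thread = Thread.new do
      blk.call(self) if blk                   # setup runs once, on the loop thread
      run_once while !@stopping && running?   # loop until asked to stop
      @running = false                        # assumed effect of cleanup
    end
    self
  end

  # Cooperative stop: set the flag, then wait for the loop to notice it.
  def stop
    @stopping = true
    @thread.join if @thread && @thread != Thread.current
  end

  private

  # Placeholder for one pass of polling sockets and firing timers.
  def run_once
    @ticks += 1
    sleep 0.001
  end
end

setup_calls = 0
reactor = MiniLoop.new.run { |_r| setup_calls += 1 }
sleep 0.05
reactor.stop
```

The flag is only checked between passes, which is why a long-running operation inside a single pass delays shutdown and why #kill exists as the forcible alternative.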
#sub_socket(handler_instance) ⇒ Object
Creates a SUB socket and attaches handler_instance to the resulting socket. Usually paired with one or more #pub_socket instances in the same or different reactor context.
handler_instance must implement the #on_readable and #on_readable_error methods. The reactor will call those methods based upon new events. This socket type can only read; it can never write/send messages.
All handlers must implement the #on_attach method.
    # File 'lib/zm/reactor.rb', line 273
    def sub_socket handler_instance
      sock = ZMQMachine::Socket::Sub.new @context, handler_instance
      save_socket sock
      sock
    end
#xrep_socket(handler_instance) ⇒ Object
Creates an XREP socket and attaches handler_instance to the resulting socket. Should only be paired with one other #req_socket instance.
handler_instance must implement the #on_readable, #on_readable_error, #on_writable and #on_writable_error methods. The reactor will call those methods based upon new events.
All handlers must implement the #on_attach method.
    # File 'lib/zm/reactor.rb', line 221
    def xrep_socket handler_instance
      sock = ZMQMachine::Socket::XRep.new @context, handler_instance
      save_socket sock
      sock
    end
#xreq_socket(handler_instance) ⇒ Object
Creates an XREQ socket and attaches handler_instance to the resulting socket. Should only be paired with one other #rep_socket instance.
handler_instance must implement the #on_readable, #on_readable_error, #on_writable and #on_writable_error methods. The reactor will call those methods based upon new events.
All handlers must implement the #on_attach method.
    # File 'lib/zm/reactor.rb', line 204
    def xreq_socket handler_instance
      sock = ZMQMachine::Socket::XReq.new @context, handler_instance
      save_socket sock
      sock
    end