Class: ZMQMachine::Reactor
- Inherits:
-
Object
- Object
- ZMQMachine::Reactor
- Defined in:
- lib/zm/reactor.rb
Instance Attribute Summary collapse
-
#context ⇒ Object
readonly
Returns the value of attribute context.
-
#exception_handler ⇒ Object
readonly
Returns the value of attribute exception_handler.
-
#logger ⇒ Object
readonly
Returns the value of attribute logger.
-
#name ⇒ Object
readonly
Returns the value of attribute name.
Instance Method Summary collapse
-
#cancel_timer(timer) ⇒ Object
Cancels an existing timer if it hasn’t already fired.
-
#close_socket(sock) ⇒ Object
Removes the given
sock
socket from the reactor context. -
#deregister_readable(sock) ⇒ Object
Deregisters the
sock
for POLLIN events. -
#deregister_writable(sock) ⇒ Object
Deregisters the
sock
for POLLOUT. -
#initialize(configuration = nil) ⇒ Reactor
constructor
Takes a ZMQ::Configuration instance to initialize itself.
-
#join(delay = nil) ⇒ Object
Join on the thread running this reactor instance.
-
#kill ⇒ Object
Kills the running reactor instance by terminating its thread.
- #list_timers ⇒ Object
-
#log(level, message) ⇒ Object
Publishes log messages to an existing transport passed in to the Reactor constructor using the :log_endpoint key.
-
#next_tick(blk = nil, &block) ⇒ Object
Schedules a proc or block to execute on the next trip through the reactor loop.
-
#oneshot_timer(delay, timer_proc = nil, &blk) ⇒ Object
Creates a timer that will fire a single time.
-
#oneshot_timer_at(exact_time, timer_proc = nil, &blk) ⇒ Object
Creates a timer that will fire once at a specific time as returned by ZM::Timers.now_converted.
- #open_socket_count(kind = :all) ⇒ Object
-
#pair_socket(handler_instance) ⇒ Object
Creates a PAIR socket and attaches
handler_instance
to the resulting socket. -
#periodical_timer(delay, timer_proc = nil, &blk) ⇒ Object
Creates a timer that will fire every
delay
milliseconds until it is explicitly cancelled. -
#pub_socket(handler_instance) ⇒ Object
Creates a PUB socket and attaches
handler_instance
to the resulting socket. -
#pull_socket(handler_instance) ⇒ Object
Creates a PULL socket and attaches
handler_instance
to the resulting socket. -
#push_socket(handler_instance) ⇒ Object
Creates a PUSH socket and attaches
handler_instance
to the resulting socket. -
#register_readable(sock) ⇒ Object
Registers the
sock
for POLLIN events that will cause the reactor to call the handler’s on_readable method. -
#register_writable(sock) ⇒ Object
Registers the
sock
for POLLOUT events that will cause the reactor to call the handler’s on_writable method. -
#rep_socket(handler_instance) ⇒ Object
Creates a REP socket and attaches
handler_instance
to the resulting socket. -
#req_socket(handler_instance) ⇒ Object
Creates a REQ socket and attaches
handler_instance
to the resulting socket. -
#reschedule_timers ⇒ Object
Asks all timers to reschedule themselves starting from Timers.now.
-
#run(blk = nil, &block) ⇒ Object
The main entry point for all new reactor contexts.
-
#running? ⇒ Boolean
Returns true when the reactor is running OR while it is in the midst of a shutdown request.
- #shared_context? ⇒ Boolean
-
#stop(delay = nil) ⇒ Object
Marks the reactor as eligible for termination.
-
#sub_socket(handler_instance) ⇒ Object
Creates a SUB socket and attaches
handler_instance
to the resulting socket. -
#xrep_socket(handler_instance) ⇒ Object
(also: #router_socket)
Creates a XREP socket and attaches
handler_instance
to the resulting socket. -
#xreq_socket(handler_instance) ⇒ Object
(also: #dealer_socket)
Creates a XREQ socket and attaches
handler_instance
to the resulting socket.
Constructor Details
#initialize(configuration = nil) ⇒ Reactor
Takes a ZMQ::Configuration instance to initialize itself.
name
provides a name for this reactor instance. It’s unused at present but may be used in the future for allowing multiple reactors to communicate amongst each other. Defaults to ‘unnamed’ if it isn’t set.
poll_interval
is the number of milliseconds to block while waiting for new 0mq socket events; default is 10 ms.
context
should be a 0mq context as created by ZMQ::Context.new. The purpose of providing a context to the reactor is so that multiple reactors can share a single context. Doing so allows for sockets within each reactor to communicate with each other via an :inproc transport (:inproc is misnamed, it should be :incontext). By not supplying this hash, the reactor will create and use its own 0mq context. Default is nil.
log_endpoint
is a a transport string for an endpoint that a logger client may connect to for publishing log messages. when this key is defined, the client is automatically created and connected to the indicated endpoint. Default is nil.
exception_handler
is called for all exceptions. The handler should respond to #call and take a single argument. Default is to just raise the exception and exit.
config = ZM::Configuration.new
config.context = master_context
config.log_endpoint = endpoint
config.name = :test_rig
config.poll_interval = 100 # defaults to 10 if unset
Reactor.new(config).run do |reactor|
reactor.oneshot_timer(50) { print("At least 50ms have elapsed\n")}
end
80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 |
# File 'lib/zm/reactor.rb', line 80 def initialize configuration = nil configuration ||= Configuration.new @name = configuration.name || 'unnamed' @running = false @thread = nil @poll_interval = determine_interval(configuration.poll_interval || 10) @proc_queue = [] @proc_queue_mutex = Mutex.new # could raise if it fails to allocate a Context @context = if configuration.context @shared_context = true configuration.context else @shared_context = false ZMQ::Context.new end @poller = ZMQ::Poller.new @sockets = [] @raw_to_socket = {} Thread.abort_on_exception = true if configuration.log_endpoint @logger = LogClient.new self, configuration.log_endpoint @logging_enabled = true end @exception_handler = configuration.exception_handler if configuration.exception_handler @timers = ZMQMachine::Timers.new(@exception_handler) end |
Instance Attribute Details
#context ⇒ Object (readonly)
Returns the value of attribute context.
40 41 42 |
# File 'lib/zm/reactor.rb', line 40 def context @context end |
#exception_handler ⇒ Object (readonly)
Returns the value of attribute exception_handler.
40 41 42 |
# File 'lib/zm/reactor.rb', line 40 def exception_handler @exception_handler end |
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
40 41 42 |
# File 'lib/zm/reactor.rb', line 40 def logger @logger end |
#name ⇒ Object (readonly)
Returns the value of attribute name.
40 41 42 |
# File 'lib/zm/reactor.rb', line 40 def name @name end |
Instance Method Details
#cancel_timer(timer) ⇒ Object
Cancels an existing timer if it hasn’t already fired.
Returns true if cancelled, false if otherwise.
422 423 424 |
# File 'lib/zm/reactor.rb', line 422 def cancel_timer timer @timers.cancel timer end |
#close_socket(sock) ⇒ Object
Removes the given sock
socket from the reactor context. It is deregistered for new events and closed. Any queued messages are silently dropped.
Returns true
for a succesful close, false
otherwise.
212 213 214 215 216 217 218 219 |
# File 'lib/zm/reactor.rb', line 212 def close_socket sock return false unless sock removed = delete_socket sock sock.raw_socket.close removed end |
#deregister_readable(sock) ⇒ Object
Deregisters the sock
for POLLIN events. The handler will no longer receive calls to on_readable.
381 382 383 |
# File 'lib/zm/reactor.rb', line 381 def deregister_readable sock @poller.deregister_readable sock.raw_socket end |
#deregister_writable(sock) ⇒ Object
Deregisters the sock
for POLLOUT. The handler will no longer receive calls to on_writable.
367 368 369 |
# File 'lib/zm/reactor.rb', line 367 def deregister_writable sock @poller.deregister_writable sock.raw_socket end |
#join(delay = nil) ⇒ Object
Join on the thread running this reactor instance. Default behavior is to wait indefinitely for the thread to exit.
Pass an optional delay
value measured in milliseconds; the thread will be stopped if it hasn’t exited by the end of delay
milliseconds.
Returns immediately when the thread has already exited.
168 169 170 171 172 173 174 175 176 177 178 179 180 |
# File 'lib/zm/reactor.rb', line 168 def join delay = nil # don't allow the thread to try and join itself and only worry about # joining for live threads if running? && @thread.alive? && @thread != Thread.current if delay # convert to seconds to meet the argument expectations of Thread#join seconds = delay / 1000.0 @thread.join seconds else @thread.join end end end |
#kill ⇒ Object
Kills the running reactor instance by terminating its thread.
After the thread exits, the reactor attempts to clean up after itself and kill any pending I/O.
187 188 189 190 191 192 193 |
# File 'lib/zm/reactor.rb', line 187 def kill if running? cleanup @stopping = true @thread.kill end end |
#list_timers ⇒ Object
435 436 437 438 439 440 |
# File 'lib/zm/reactor.rb', line 435 def list_timers @timers.list.each do |timer| name = timer.respond_to?(:name) ? timer.timer_proc.name : timer.timer_proc.to_s puts "fire time [#{Time.at(timer.fire_time / 1000)}], method [#{name}]" end end |
#log(level, message) ⇒ Object
Publishes log messages to an existing transport passed in to the Reactor constructor using the :log_endpoint key.
Reactor.new :log_endpoint => 'inproc://reactor_log'
level
parameter refers to a key to indicate severity level, e.g. :warn, :debug, level0, level9, etc.
message
is a plain string that will be written out in its entirety.
When no :log_endpoint was defined when creating the Reactor, all calls just discard the messages.
reactor.log(:info, "some message")
This produces output that looks like:
info|20110526-10:23:47.768796 CDT|some message
470 471 472 473 474 |
# File 'lib/zm/reactor.rb', line 470 def log level, if @logging_enabled @logger.write level, end end |
#next_tick(blk = nil, &block) ⇒ Object
Schedules a proc or block to execute on the next trip through the reactor loop.
This method is thread-safe.
200 201 202 203 204 205 |
# File 'lib/zm/reactor.rb', line 200 def next_tick blk = nil, &block blk ||= block @proc_queue_mutex.synchronize do @proc_queue << blk end end |
#oneshot_timer(delay, timer_proc = nil, &blk) ⇒ Object
Creates a timer that will fire a single time. Expects either a timer_proc
proc or a block, otherwise no timer is created.
delay
is measured in milliseconds (1 second equals 1000 milliseconds)
391 392 393 394 |
# File 'lib/zm/reactor.rb', line 391 def oneshot_timer delay, timer_proc = nil, &blk blk ||= timer_proc @timers.add_oneshot delay, blk end |
#oneshot_timer_at(exact_time, timer_proc = nil, &blk) ⇒ Object
Creates a timer that will fire once at a specific time as returned by ZM::Timers.now_converted.
exact_time
may be either a Time object or a Numeric.
401 402 403 404 |
# File 'lib/zm/reactor.rb', line 401 def oneshot_timer_at exact_time, timer_proc = nil, &blk blk ||= timer_proc @timers.add_oneshot_at exact_time, blk end |
#open_socket_count(kind = :all) ⇒ Object
442 443 444 445 446 447 448 449 450 |
# File 'lib/zm/reactor.rb', line 442 def open_socket_count kind = :all @sockets.inject(0) do |sum, socket| if :all == kind || (socket.kind == kind) sum + 1 else sum end end end |
#pair_socket(handler_instance) ⇒ Object
Creates a PAIR socket and attaches handler_instance
to the resulting socket. Works only with other #pair_socket instances in the same or other reactor instance.
handler_instance
must implement the #on_readable and #on_readable_error methods. Each handler must also implement the #on_writable and #on_writable_error methods. The reactor will call those methods based upon new events.
All handlers must implement the #on_attach method.
293 294 295 |
# File 'lib/zm/reactor.rb', line 293 def pair_socket handler_instance create_socket handler_instance, ZMQMachine::Socket::Pair end |
#periodical_timer(delay, timer_proc = nil, &blk) ⇒ Object
Creates a timer that will fire every delay
milliseconds until it is explicitly cancelled. Expects either a timer_proc
proc or a block, otherwise no timer is created.
delay
is measured in milliseconds (1 second equals 1000 milliseconds)
413 414 415 416 |
# File 'lib/zm/reactor.rb', line 413 def periodical_timer delay, timer_proc = nil, &blk blk ||= timer_proc @timers.add_periodical delay, blk end |
#pub_socket(handler_instance) ⇒ Object
Creates a PUB socket and attaches handler_instance
to the resulting socket. Usually paired with one or more #sub_socket instances in the same or other reactor instance.
handler_instance
must implement the #on_writable and #on_writable_error methods. The reactor will call those methods based upon new events. This socket type can only write; it can never receive/read messages.
All handlers must implement the #on_attach method.
308 309 310 |
# File 'lib/zm/reactor.rb', line 308 def pub_socket handler_instance create_socket handler_instance, ZMQMachine::Socket::Pub end |
#pull_socket(handler_instance) ⇒ Object
Creates a PULL socket and attaches handler_instance
to the resulting socket. Usually paired with one or more
#push_socket in the same or different reactor context.
handler_instance
must implement the #on_readable and #on_readable_error methods. The reactor will call those methods based upon new events. This socket type can only read; it can never write/send messages.
All handlers must implement the #on_attach method.
353 354 355 |
# File 'lib/zm/reactor.rb', line 353 def pull_socket handler_instance create_socket handler_instance, ZMQMachine::Socket::Pull end |
#push_socket(handler_instance) ⇒ Object
Creates a PUSH socket and attaches handler_instance
to the resulting socket. Usually paired with one or more
#pull_socket in the same or different reactor context.
handler_instance
must implement the #on_writable and #on_writable_error methods. The reactor will call those methods based upon new events. This socket type can only write; it can never recv messages.
All handlers must implement the #on_attach method.
338 339 340 |
# File 'lib/zm/reactor.rb', line 338 def push_socket handler_instance create_socket handler_instance, ZMQMachine::Socket::Push end |
#register_readable(sock) ⇒ Object
Registers the sock
for POLLIN events that will cause the reactor to call the handler’s on_readable method.
374 375 376 |
# File 'lib/zm/reactor.rb', line 374 def register_readable sock @poller.register_readable sock.raw_socket end |
#register_writable(sock) ⇒ Object
Registers the sock
for POLLOUT events that will cause the reactor to call the handler’s on_writable method.
360 361 362 |
# File 'lib/zm/reactor.rb', line 360 def register_writable sock @poller.register_writable sock.raw_socket end |
#rep_socket(handler_instance) ⇒ Object
Creates a REP socket and attaches handler_instance
to the resulting socket. Should only be paired with one other #req_socket instance.
handler_instance
must implement the #on_readable and #on_readable_error methods. The reactor will call those methods based upon new events.
All handlers must implement the #on_attach method.
245 246 247 |
# File 'lib/zm/reactor.rb', line 245 def rep_socket handler_instance create_socket handler_instance, ZMQMachine::Socket::Rep end |
#req_socket(handler_instance) ⇒ Object
Creates a REQ socket and attaches handler_instance
to the resulting socket. Should only be paired with one other #rep_socket instance.
handler_instance
must implement the #on_writable and #on_writable_error methods. The reactor will call those methods based upon new events.
All handlers must implement the #on_attach method.
231 232 233 |
# File 'lib/zm/reactor.rb', line 231 def req_socket handler_instance create_socket handler_instance, ZMQMachine::Socket::Req end |
#reschedule_timers ⇒ Object
Asks all timers to reschedule themselves starting from Timers.now. Typically called when the underlying time source for the ZM::Timers class has been replaced; existing timers may not fire as expected, so we ask them to reset themselves.
431 432 433 |
# File 'lib/zm/reactor.rb', line 431 def reschedule_timers @timers.reschedule end |
#run(blk = nil, &block) ⇒ Object
The main entry point for all new reactor contexts. This proc or block given to this method is evaluated once before entering the reactor loop. This evaluation generally sets up sockets and timers that will do the real work once the loop is executed.
130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 |
# File 'lib/zm/reactor.rb', line 130 def run blk = nil, &block blk ||= block @running, @stopping = true, false @thread = Thread.new do blk.call self if blk while !@stopping && running? do run_once end cleanup end self end |
#running? ⇒ Boolean
Returns true when the reactor is running OR while it is in the midst of a shutdown request.
Returns false when the reactor thread does not exist.
122 |
# File 'lib/zm/reactor.rb', line 122 def running?() @running; end |
#shared_context? ⇒ Boolean
113 114 115 |
# File 'lib/zm/reactor.rb', line 113 def shared_context? @shared_context end |
#stop(delay = nil) ⇒ Object
Marks the reactor as eligible for termination. Then waits for the reactor thread to exit via #join (optional timeout).
The reactor is not forcibly terminated if it is currently blocked by some long-running operation. Use #kill to forcibly terminate the reactor.
153 154 155 156 157 |
# File 'lib/zm/reactor.rb', line 153 def stop delay = nil # wait until the thread loops around again and exits on its own @stopping = true join delay end |
#sub_socket(handler_instance) ⇒ Object
Creates a SUB socket and attaches handler_instance
to the resulting socket. Usually paired with one or more
#pub_socket in the same or different reactor context.
handler_instance
must implement the #on_readable and #on_readable_error methods. The reactor will call those methods based upon new events. This socket type can only read; it can never write/send messages.
All handlers must implement the #on_attach method.
323 324 325 |
# File 'lib/zm/reactor.rb', line 323 def sub_socket handler_instance create_socket handler_instance, ZMQMachine::Socket::Sub end |
#xrep_socket(handler_instance) ⇒ Object Also known as: router_socket
Creates a XREP socket and attaches handler_instance
to the resulting socket. Should only be paired with one other #req_socket instance.
handler_instance
must implement the #on_readable, #on_readable_error, #on_writable and #on_writable_error methods. The reactor will call those methods based upon new events.
All handlers must implement the #on_attach method.
276 277 278 |
# File 'lib/zm/reactor.rb', line 276 def xrep_socket handler_instance create_socket handler_instance, ZMQMachine::Socket::XRep end |
#xreq_socket(handler_instance) ⇒ Object Also known as: dealer_socket
Creates a XREQ socket and attaches handler_instance
to the resulting socket. Should only be paired with one other #rep_socket instance.
handler_instance
must implement the #on_readable, #on_readable_error, #on_writable and #on_writable_error methods. The reactor will call those methods based upon new events.
All handlers must implement the #on_attach method.
260 261 262 |
# File 'lib/zm/reactor.rb', line 260 def xreq_socket handler_instance create_socket handler_instance, ZMQMachine::Socket::XReq end |