Class: ZMQMachine::Reactor

Inherits:
Object
Defined in:
lib/zm/reactor.rb

Constructor Details

#initialize(configuration = nil) ⇒ Reactor

Takes a ZM::Configuration instance to initialize itself. When none is given, a default configuration is used.

name provides a name for this reactor instance. It is currently unused but may later allow multiple reactors to communicate with each other. Defaults to 'unnamed' if it isn't set.

poll_interval is the number of milliseconds to block while waiting for new 0mq socket events; default is 10 ms.

context should be a 0mq context as created by ZMQ::Context.new. Providing a context lets multiple reactors share it, which allows sockets in each reactor to communicate with each other via an :inproc transport (:inproc is misnamed, it should be :incontext). When no context is supplied, the reactor creates and uses its own 0mq context. Default is nil.

log_endpoint is a transport string for an endpoint that a logger client may connect to for publishing log messages. When this setting is defined, the client is automatically created and connected to the indicated endpoint. Default is nil.

exception_handler is called for all exceptions. The handler should respond to #call and take a single argument. Default is to just raise the exception and exit.

config = ZM::Configuration.new
config.context = master_context
config.log_endpoint = endpoint
config.name = :test_rig
config.poll_interval = 100 # defaults to 10 if unset
Reactor.new(config).run do |reactor|
  reactor.oneshot_timer(50) { print("At least 50ms have elapsed\n")}
end
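
The exception_handler setting only needs an object that responds to #call with one argument, so a lambda is enough. The sketch below is an illustration under assumptions: captured and handler are hypothetical names, the exception_handler= setter on the configuration is assumed rather than shown on this page, and the handler is invoked by hand instead of by a running reactor.

```ruby
# Hypothetical handler: collects exceptions instead of letting them
# terminate the process. Any object responding to #call(exception) works.
captured = []
handler = lambda { |exception| captured << exception }

# With the library loaded, the handler would presumably be attached like
# this (assumed setter, not confirmed by this page):
#   config = ZM::Configuration.new
#   config.exception_handler = handler

# Simulate the reactor handing an exception to the handler.
begin
  raise ArgumentError, "bad frame"
rescue => e
  handler.call(e)
end

puts captured.map { |ex| "#{ex.class}: #{ex.message}" }
```

Collecting exceptions like this is mostly useful in tests; a production handler would more likely log the exception and decide whether to stop the reactor.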


# File 'lib/zm/reactor.rb', line 80

def initialize configuration = nil
  configuration ||= Configuration.new
  @name = configuration.name || 'unnamed'
  @running = false
  @thread = nil
  @poll_interval = determine_interval(configuration.poll_interval || 10)

  @proc_queue = []
  @proc_queue_mutex = Mutex.new

  # could raise if it fails to allocate a Context
  @context = if configuration.context
    @shared_context = true
    configuration.context
  else
    @shared_context = false
    ZMQ::Context.new
  end

  @poller = ZMQ::Poller.new
  @sockets = []
  @raw_to_socket = {}
  Thread.abort_on_exception = true

  if configuration.log_endpoint
    @logger = LogClient.new self, configuration.log_endpoint
    @logging_enabled = true
  end

  @exception_handler = configuration.exception_handler if configuration.exception_handler
  @timers = ZMQMachine::Timers.new(@exception_handler)
end

Instance Attribute Details

#context ⇒ Object (readonly)

Returns the value of attribute context.



# File 'lib/zm/reactor.rb', line 40

def context
  @context
end

#exception_handler ⇒ Object (readonly)

Returns the value of attribute exception_handler.



# File 'lib/zm/reactor.rb', line 40

def exception_handler
  @exception_handler
end

#logger ⇒ Object (readonly)

Returns the value of attribute logger.



# File 'lib/zm/reactor.rb', line 40

def logger
  @logger
end

#name ⇒ Object (readonly)

Returns the value of attribute name.



# File 'lib/zm/reactor.rb', line 40

def name
  @name
end

Instance Method Details

#cancel_timer(timer) ⇒ Object

Cancels an existing timer if it hasn’t already fired.

Returns true if the timer was cancelled, false otherwise.



# File 'lib/zm/reactor.rb', line 422

def cancel_timer timer
  @timers.cancel timer
end

#close_socket(sock) ⇒ Object

Removes the given sock socket from the reactor context. It is deregistered for new events and closed. Any queued messages are silently dropped.

Returns true for a successful close, false otherwise.



# File 'lib/zm/reactor.rb', line 212

def close_socket sock
  return false unless sock

  removed = delete_socket sock
  sock.raw_socket.close

  removed
end

#deregister_readable(sock) ⇒ Object

Deregisters the sock for POLLIN events. The handler will no longer receive calls to on_readable.



# File 'lib/zm/reactor.rb', line 381

def deregister_readable sock
  @poller.deregister_readable sock.raw_socket
end

#deregister_writable(sock) ⇒ Object

Deregisters the sock for POLLOUT. The handler will no longer receive calls to on_writable.



# File 'lib/zm/reactor.rb', line 367

def deregister_writable sock
  @poller.deregister_writable sock.raw_socket
end

#join(delay = nil) ⇒ Object

Join on the thread running this reactor instance. Default behavior is to wait indefinitely for the thread to exit.

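Pass an optional delay value measured in milliseconds; the call returns once the thread exits or after delay milliseconds, whichever comes first. As the source below shows, the timeout only bounds the wait and does not itself stop the thread.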

Returns immediately when the thread has already exited.



# File 'lib/zm/reactor.rb', line 168

def join delay = nil
  # don't allow the thread to try and join itself and only worry about
  # joining for live threads
  if running? && @thread.alive? && @thread != Thread.current
    if delay
      # convert to seconds to meet the argument expectations of Thread#join
      seconds = delay / 1000.0
      @thread.join seconds
    else
      @thread.join
    end
  end
end
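
The timeout behaviour of #join follows directly from Ruby's Thread#join, which takes seconds and returns nil when the limit elapses first. The following sketch uses a plain sleeping thread as a stand-in for the reactor thread; worker, delay_ms, timed_out and still_alive are illustrative names only.

```ruby
# Stand-in for the reactor thread: a worker that runs longer than we wait.
worker = Thread.new { sleep 0.5 }

delay_ms = 50
# Thread#join takes seconds, so convert from milliseconds as #join does.
result = worker.join(delay_ms / 1000.0)

# Thread#join returns nil when the timeout elapses first; the thread
# keeps running and is not stopped.
timed_out = result.nil?
still_alive = worker.alive?

puts "timed out: #{timed_out}, still alive: #{still_alive}"

worker.join # wait for the worker to finish before exiting
```

A caller that needs the thread gone after a timed-out join still has to follow up with #kill.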

#kill ⇒ Object

Kills the running reactor instance by terminating its thread.

After the thread exits, the reactor attempts to clean up after itself and kill any pending I/O.



# File 'lib/zm/reactor.rb', line 187

def kill
  if running?
    cleanup
    @stopping = true
    @thread.kill
  end
end

#list_timers ⇒ Object



# File 'lib/zm/reactor.rb', line 435

def list_timers
  @timers.list.each do |timer|
    # check the proc itself, since #name is called on timer.timer_proc
    name = timer.timer_proc.respond_to?(:name) ? timer.timer_proc.name : timer.timer_proc.to_s
    puts "fire time [#{Time.at(timer.fire_time / 1000)}], method [#{name}]"
  end
end

#log(level, message) ⇒ Object

Publishes log messages to the transport configured through the log_endpoint setting of the configuration object passed to the Reactor constructor.

config = ZM::Configuration.new
config.log_endpoint = 'inproc://reactor_log'
Reactor.new(config)

level is a key that indicates the severity level, e.g. :warn, :debug, level0, level9, etc.

message is a plain string that will be written out in its entirety.

When no log_endpoint was configured when creating the Reactor, all calls simply discard the messages.

reactor.log(:info, "some message")

This produces output that looks like:

info|20110526-10:23:47.768796 CDT|some message


# File 'lib/zm/reactor.rb', line 470

def log level, message
  if @logging_enabled
    @logger.write level, message
  end
end
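
The sample output above has the layout level|timestamp|message. The logger client's actual formatting code is not shown on this page, so the following is only a sketch of how such a line could be produced; format_log_line and its strftime pattern are assumptions chosen to match the sample.

```ruby
# Hypothetical formatter reproducing the "level|timestamp|message" layout.
def format_log_line(level, message, time = Time.now)
  # Date, time with microseconds, then the zone abbreviation.
  timestamp = time.strftime("%Y%m%d-%H:%M:%S.%6N %Z")
  "#{level}|#{timestamp}|#{message}"
end

# A fixed UTC time keeps the example reproducible.
line = format_log_line(:info, "some message",
                       Time.utc(2011, 5, 26, 10, 23, 47, 768796))
puts line
```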

#next_tick(blk = nil, &block) ⇒ Object

Schedules a proc or block to execute on the next trip through the reactor loop.

This method is thread-safe.



# File 'lib/zm/reactor.rb', line 200

def next_tick blk = nil, &block
  blk ||= block
  @proc_queue_mutex.synchronize do
    @proc_queue << blk
  end
end
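
The thread safety comes from guarding the queue with a Mutex. How the reactor drains the queue is not shown on this page, so the sketch below uses a stand-in class; TickQueue and run_pending are hypothetical names, not part of the library.

```ruby
# Minimal stand-in for the reactor's proc queue (not the library's class).
class TickQueue
  def initialize
    @procs = []
    @mutex = Mutex.new
  end

  # Safe to call from any thread.
  def next_tick(blk = nil, &block)
    blk ||= block
    @mutex.synchronize { @procs << blk }
  end

  # Swap the queue out under the lock, then run the procs outside it so
  # a proc may itself call next_tick without re-entering the lock.
  def run_pending
    pending = @mutex.synchronize do
      work = @procs
      @procs = []
      work
    end
    pending.each(&:call)
    pending.size
  end
end

queue = TickQueue.new
results = []

# Several threads schedule work; one "loop" thread runs it later.
threads = 4.times.map { |i| Thread.new { queue.next_tick { results << i } } }
threads.each(&:join)
ran = queue.run_pending

puts "ran #{ran} procs: #{results.sort.inspect}"
```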

#oneshot_timer(delay, timer_proc = nil, &blk) ⇒ Object

Creates a timer that will fire a single time. Expects either a timer_proc proc or a block, otherwise no timer is created.

delay is measured in milliseconds (1 second equals 1000 milliseconds)



# File 'lib/zm/reactor.rb', line 391

def oneshot_timer delay, timer_proc = nil, &blk
  blk ||= timer_proc
  @timers.add_oneshot delay, blk
end
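
The line blk ||= timer_proc decides which callback is used: a block wins over a proc when both are given, and the result is nil when neither is. The helper below, pick_callback, is a hypothetical name that isolates just that line so the behaviour can be checked without a reactor.

```ruby
# Hypothetical helper mirroring the `blk ||= timer_proc` line.
def pick_callback(timer_proc = nil, &blk)
  blk ||= timer_proc
  blk
end

from_proc  = pick_callback(proc { :proc })
from_block = pick_callback { :block }
both       = pick_callback(proc { :proc }) { :block }
neither    = pick_callback

puts from_proc.call.inspect   # the proc is used when no block is given
puts from_block.call.inspect  # the block is used when no proc is given
puts both.call.inspect        # the block takes precedence over the proc
puts neither.inspect          # nil, so no callback is available
```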

#oneshot_timer_at(exact_time, timer_proc = nil, &blk) ⇒ Object

Creates a timer that will fire once at a specific time as returned by ZM::Timers.now_converted.

exact_time may be either a Time object or a Numeric.



# File 'lib/zm/reactor.rb', line 401

def oneshot_timer_at exact_time, timer_proc = nil, &blk
  blk ||= timer_proc
  @timers.add_oneshot_at exact_time, blk
end

#open_socket_count(kind = :all) ⇒ Object



# File 'lib/zm/reactor.rb', line 442

def open_socket_count kind = :all
  @sockets.inject(0) do |sum, socket|
    if :all == kind || (socket.kind == kind)
      sum + 1
    else
      sum
    end
  end
end
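
This method has no description, but the source shows that it counts the reactor's sockets, optionally restricted to one kind. The sketch below reproduces the same counting with Enumerable#count on stand-in objects; FakeSocket and the kind symbols :pub and :sub are assumptions made for illustration.

```ruby
# Stand-in for reactor sockets; only #kind matters for counting.
FakeSocket = Struct.new(:kind)

# Same result as the inject-based version: count every socket for :all,
# otherwise only those whose kind matches.
def open_socket_count(sockets, kind = :all)
  sockets.count { |socket| kind == :all || socket.kind == kind }
end

sockets = [FakeSocket.new(:pub), FakeSocket.new(:sub), FakeSocket.new(:sub)]

puts open_socket_count(sockets)        # all sockets
puts open_socket_count(sockets, :sub)  # only the :sub sockets
puts open_socket_count(sockets, :req)  # a kind with no sockets
```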

#pair_socket(handler_instance) ⇒ Object

Creates a PAIR socket and attaches handler_instance to the resulting socket. Works only with another #pair_socket in the same or a different reactor instance.

handler_instance must implement the #on_readable and #on_readable_error methods. Each handler must also implement the #on_writable and #on_writable_error methods. The reactor will call those methods based upon new events.

All handlers must implement the #on_attach method.



# File 'lib/zm/reactor.rb', line 293

def pair_socket handler_instance
  create_socket handler_instance, ZMQMachine::Socket::Pair
end
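
A handler for a read/write socket type therefore needs five methods. This page names the methods but not their arguments, so the argument lists in the following skeleton are assumptions, and EchoHandler is a hypothetical class used only to show the shape.

```ruby
# Hypothetical handler; callback argument lists are assumptions.
class EchoHandler
  attr_reader :received, :errors

  def initialize
    @received = []
    @errors = []
  end

  # Called once when the reactor attaches the handler to its socket.
  def on_attach(socket)
    @socket = socket
  end

  # Called when the socket has data to read.
  def on_readable(socket, messages)
    @received.concat(Array(messages))
  end

  def on_readable_error(socket, return_code)
    @errors << [:read, return_code]
  end

  # Called when the socket can accept a write.
  def on_writable(socket)
    # A real handler would send queued data here.
  end

  def on_writable_error(socket, return_code)
    @errors << [:write, return_code]
  end
end

required = %i[on_attach on_readable on_readable_error on_writable on_writable_error]
handler = EchoHandler.new
missing = required.reject { |name| handler.respond_to?(name) }
puts "missing callbacks: #{missing.inspect}"
```

Read-only or write-only socket types need only the matching pair of callbacks plus on_attach, as described under each socket method.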

#periodical_timer(delay, timer_proc = nil, &blk) ⇒ Object

Creates a timer that will fire every delay milliseconds until it is explicitly cancelled. Expects either a timer_proc proc or a block, otherwise no timer is created.

delay is measured in milliseconds (1 second equals 1000 milliseconds)



# File 'lib/zm/reactor.rb', line 413

def periodical_timer delay, timer_proc = nil, &blk
  blk ||= timer_proc
  @timers.add_periodical delay, blk
end

#pub_socket(handler_instance) ⇒ Object

Creates a PUB socket and attaches handler_instance to the resulting socket. Usually paired with one or more #sub_socket instances in the same or other reactor instance.

handler_instance must implement the #on_writable and #on_writable_error methods. The reactor will call those methods based upon new events. This socket type can only write; it can never receive/read messages.

All handlers must implement the #on_attach method.



# File 'lib/zm/reactor.rb', line 308

def pub_socket handler_instance
  create_socket handler_instance, ZMQMachine::Socket::Pub
end

#pull_socket(handler_instance) ⇒ Object

Creates a PULL socket and attaches handler_instance to the resulting socket. Usually paired with one or more #push_socket in the same or different reactor context.

handler_instance must implement the #on_readable and #on_readable_error methods. The reactor will call those methods based upon new events. This socket type can only read; it can never write/send messages.

All handlers must implement the #on_attach method.



# File 'lib/zm/reactor.rb', line 353

def pull_socket handler_instance
  create_socket handler_instance, ZMQMachine::Socket::Pull
end

#push_socket(handler_instance) ⇒ Object

Creates a PUSH socket and attaches handler_instance to the resulting socket. Usually paired with one or more #pull_socket in the same or different reactor context.

handler_instance must implement the #on_writable and #on_writable_error methods. The reactor will call those methods based upon new events. This socket type can only write; it can never receive/read messages.

All handlers must implement the #on_attach method.



# File 'lib/zm/reactor.rb', line 338

def push_socket handler_instance
  create_socket handler_instance, ZMQMachine::Socket::Push
end

#register_readable(sock) ⇒ Object

Registers the sock for POLLIN events that will cause the reactor to call the handler’s on_readable method.



# File 'lib/zm/reactor.rb', line 374

def register_readable sock
  @poller.register_readable sock.raw_socket
end

#register_writable(sock) ⇒ Object

Registers the sock for POLLOUT events that will cause the reactor to call the handler’s on_writable method.



# File 'lib/zm/reactor.rb', line 360

def register_writable sock
  @poller.register_writable sock.raw_socket
end

#rep_socket(handler_instance) ⇒ Object

Creates a REP socket and attaches handler_instance to the resulting socket. Should only be paired with one other #req_socket instance.

handler_instance must implement the #on_readable and #on_readable_error methods. The reactor will call those methods based upon new events.

All handlers must implement the #on_attach method.



# File 'lib/zm/reactor.rb', line 245

def rep_socket handler_instance
  create_socket handler_instance, ZMQMachine::Socket::Rep
end

#req_socket(handler_instance) ⇒ Object

Creates a REQ socket and attaches handler_instance to the resulting socket. Should only be paired with one other #rep_socket instance.

handler_instance must implement the #on_writable and #on_writable_error methods. The reactor will call those methods based upon new events.

All handlers must implement the #on_attach method.



# File 'lib/zm/reactor.rb', line 231

def req_socket handler_instance
  create_socket handler_instance, ZMQMachine::Socket::Req
end

#reschedule_timers ⇒ Object

Asks all timers to reschedule themselves starting from Timers.now. Typically called when the underlying time source for the ZM::Timers class has been replaced; existing timers may not fire as expected, so we ask them to reset themselves.



# File 'lib/zm/reactor.rb', line 431

def reschedule_timers
  @timers.reschedule
end

#run(blk = nil, &block) ⇒ Object

The main entry point for all new reactor contexts. The proc or block given to this method is evaluated once before entering the reactor loop. This evaluation generally sets up the sockets and timers that do the real work once the loop is running.



# File 'lib/zm/reactor.rb', line 130

def run blk = nil, &block
  blk ||= block
  @running, @stopping = true, false

  @thread = Thread.new do
    blk.call self if blk

    while !@stopping && running? do
      run_once
    end

    cleanup
  end
  self
end

#running? ⇒ Boolean

Returns true when the reactor is running OR while it is in the midst of a shutdown request.

Returns false when the reactor thread does not exist.

Returns:

  • (Boolean)


# File 'lib/zm/reactor.rb', line 122

def running?() @running; end

#shared_context? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/zm/reactor.rb', line 113

def shared_context?
  @shared_context
end

#stop(delay = nil) ⇒ Object

Marks the reactor as eligible for termination. Then waits for the reactor thread to exit via #join (optional timeout).

The reactor is not forcibly terminated if it is currently blocked by some long-running operation. Use #kill to forcibly terminate the reactor.



# File 'lib/zm/reactor.rb', line 153

def stop delay = nil
  # wait until the thread loops around again and exits on its own
  @stopping = true
  join delay
end
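
The stop sequence is cooperative: a flag is set and the loop notices it on its next pass. The sketch below imitates that shape with a stand-in class so it can run without the library; TinyLoop, ticks and alive? are hypothetical names, and the short sleep plays the role of one poll interval.

```ruby
# Stand-in loop with the same run/stop shape (not the library's Reactor).
class TinyLoop
  attr_reader :ticks

  def initialize
    @stopping = false
    @ticks = 0
    @thread = nil
  end

  def run
    @stopping = false
    @thread = Thread.new do
      until @stopping
        @ticks += 1
        sleep 0.01 # plays the role of one poll interval
      end
    end
    self
  end

  # Cooperative stop: set the flag, then wait up to delay_ms for exit.
  def stop(delay_ms = nil)
    @stopping = true
    delay_ms ? @thread.join(delay_ms / 1000.0) : @thread.join
  end

  def alive?
    @thread ? @thread.alive? : false
  end
end

tiny = TinyLoop.new.run
sleep 0.05
tiny.stop(500)
puts "loop alive after stop: #{tiny.alive?}"
```

Because the loop only checks the flag between passes, a pass that blocks for a long time delays the exit, which is why #kill exists as the forcible alternative.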

#sub_socket(handler_instance) ⇒ Object

Creates a SUB socket and attaches handler_instance to the resulting socket. Usually paired with one or more #pub_socket in the same or different reactor context.

handler_instance must implement the #on_readable and #on_readable_error methods. The reactor will call those methods based upon new events. This socket type can only read; it can never write/send messages.

All handlers must implement the #on_attach method.



# File 'lib/zm/reactor.rb', line 323

def sub_socket handler_instance
  create_socket handler_instance, ZMQMachine::Socket::Sub
end

#xrep_socket(handler_instance) ⇒ Object Also known as: router_socket

Creates an XREP socket and attaches handler_instance to the resulting socket. Should only be paired with one other #req_socket instance.

handler_instance must implement the #on_readable, #on_readable_error, #on_writable and #on_writable_error methods. The reactor will call those methods based upon new events.

All handlers must implement the #on_attach method.



# File 'lib/zm/reactor.rb', line 276

def xrep_socket handler_instance
  create_socket handler_instance, ZMQMachine::Socket::XRep
end

#xreq_socket(handler_instance) ⇒ Object Also known as: dealer_socket

Creates an XREQ socket and attaches handler_instance to the resulting socket. Should only be paired with one other #rep_socket instance.

handler_instance must implement the #on_readable, #on_readable_error, #on_writable and #on_writable_error methods. The reactor will call those methods based upon new events.

All handlers must implement the #on_attach method.



# File 'lib/zm/reactor.rb', line 260

def xreq_socket handler_instance
  create_socket handler_instance, ZMQMachine::Socket::XReq
end