Class: ZMQ::PollInterruptible
- Defined in:
- lib/0mq/poll_interruptible.rb
Overview
An interruptible version of Poll.
Instance Attribute Summary
Attributes inherited from Poll
Instance Method Summary collapse
-
#close ⇒ Object
Permanently kill the Poll object This should be run once, when the Poll object is no longer needed.
-
#dead? ⇒ Boolean
Return true if the object has been killed or closed and cannot be run.
-
#initialize(*sockets) ⇒ PollInterruptible
constructor
Creates the additional interruption objects and calls super Note that either #kill or #close MUST be called when done with the object.
-
#interrupt(opts = {}, &block) ⇒ Object
Interrupt the running poll loop, but do not clean up.
-
#kill ⇒ Object
Interrupt the running poll loop and permanently kill the Poll object This should be run once, when the Poll object is no longer needed.
-
#run(&block) ⇒ Object
Same as Poll#run, but will yield [nil, nil] to the block if interrupted.
Methods inherited from Poll
poll, poll_nonblock, #run_nonblock
Constructor Details
#initialize(*sockets) ⇒ PollInterruptible
Creates the additional interruption objects and calls super Note that either #kill or #close MUST be called when done with the object. There is no automatic finalizer for this object.
10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
# File 'lib/0mq/poll_interruptible.rb', line 10 def initialize(*sockets) @int_sock_rep = ZMQ::Socket.new ZMQ::REP @int_sock_req = ZMQ::Socket.new ZMQ::REQ @int_sock_push = ZMQ::Socket.new ZMQ::PUSH @int_sock_pull = ZMQ::Socket.new ZMQ::PULL # Choose an endpoint name that we can expect to be unique # so that they can be shared within the DefaultContext int_endpoint = "inproc://__PollInterruptible_int_"+hash.to_s(26) @int_sock_rep.bind int_endpoint+'R' @int_sock_req.connect int_endpoint+'R' @int_lock_req = Mutex.new @int_sock_pull.bind int_endpoint+'P' @int_sock_push.connect int_endpoint+'P' @int_lock_push = Mutex.new # Interruption blocks are stored here by key until #run receives them. # After each is run, the return value is stored here in its place. @interruptions = {} @dead = false super @int_sock_rep, @int_sock_pull, *sockets end |
Instance Method Details
#close ⇒ Object
Permanently kill the Poll object This should be run once, when the Poll object is no longer needed. This should only be accessed when there is no poll thread running. Use #kill instead when there is a poll loop thread running.
126 127 128 129 130 131 132 133 134 135 |
# File 'lib/0mq/poll_interruptible.rb', line 126 def close return nil if @dead @int_sock_rep.close @int_sock_req.close @int_sock_pull.close @int_sock_push.close @dead = true end |
#dead? ⇒ Boolean
Return true if the object has been killed or closed and cannot be run
138 139 140 |
# File 'lib/0mq/poll_interruptible.rb', line 138 def dead? @dead end |
#interrupt(opts = {}, &block) ⇒ Object
Interrupt the running poll loop, but do not clean up. This should be run anytime to let the poller re-evaluate state, etc.. This should only be accessed from a thread other than the poll thread,
and only if the poll thread is running
If a block is given, it will be executed in the poll thread just
prior to the execution of the user block passed to {#run}.
If the blocking: false option is is given, the call will not wait for
the interruption to be processed.
81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 |
# File 'lib/0mq/poll_interruptible.rb', line 81 def interrupt(opts={}, &block) blocking = opts.fetch :blocking, true block ||= Proc.new { true } # block = block.dup key = block.object_id.to_s 36 @interruptions[key] = block # Store the block to be called if blocking @int_lock_req.synchronize { @int_sock_req.send_string key # Signal an interruption to #run @int_sock_req.recv_array # Wait until it has been handled by #run } return @interruptions.delete key # Return the stored result of the block else @int_lock_push.synchronize { @int_sock_push.send_string key # Signal an interruption to #run } return nil end end |
#kill ⇒ Object
Interrupt the running poll loop and permanently kill the Poll object This should be run once, when the Poll object is no longer needed. This should only be accessed from a thread other than the poll thread,
and only if the poll thread is running
Use #cleanup instead when there is no poll loop thread running.
108 109 110 111 112 113 114 115 116 117 118 119 120 |
# File 'lib/0mq/poll_interruptible.rb', line 108 def kill return nil if @dead @int_lock_req.synchronize do @int_sock_req.send_array ["KILL"] @int_sock_req.recv_array end @int_sock_req.close @int_sock_push.close true end |
#run(&block) ⇒ Object
Same as Poll#run, but will yield [nil, nil] to the block if interrupted. Return value may be an empty hash if the poller was killed.
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 |
# File 'lib/0mq/poll_interruptible.rb', line 38 def run(&block) raise "#{self} cannot run; it was permanently killed." if @dead super do |socket, revents| if socket == @int_sock_rep || socket == @int_sock_pull key, * = socket.recv_array kill = key == "KILL" blocking = socket == @int_sock_rep # Call the user block of #interrupt and store the return value unless kill result = @interruptions[key].call blocking ? @interruptions[key] = result : @interruptions.delete(key) end # Call the user block of #run block.call nil, nil if block # Send a response if the interruption was blocking socket.send_array ["OKAY"] if blocking if kill @int_sock_rep.close @int_sock_pull.close @dead = true end else block.call socket, revents if block end end.tap do |hash| hash.delete @int_sock_rep hash.delete @int_sock_pull end end |