Class: ZMQ::PollInterruptible

Inherits:
Poll
  • Object
show all
Defined in:
lib/0mq/poll_interruptible.rb

Overview

An interruptible version of Poll.

Instance Attribute Summary

Attributes inherited from Poll

#timeout

Instance Method Summary collapse

Methods inherited from Poll

poll, poll_nonblock, #run_nonblock

Constructor Details

#initialize(*sockets) ⇒ PollInterruptible

Creates the additional interruption objects and calls super. Note that either #kill or #close MUST be called when done with the object. There is no automatic finalizer for this object.



10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/0mq/poll_interruptible.rb', line 10

# Sets up the internal signalling sockets and then delegates to the
# superclass constructor.
#
# Four sockets are created: a REP/REQ pair and a PULL/PUSH pair. Each pair is
# joined over its own inproc endpoint (suffix 'R' and 'P' respectively), and a
# Mutex is created for each sending side (REQ and PUSH).
#
# The REP and PULL sockets are passed to super ahead of the caller's sockets.
#
# @param sockets [Array] sockets forwarded to the superclass constructor
def initialize(*sockets)
  @int_sock_rep  = ZMQ::Socket.new(ZMQ::REP)
  @int_sock_req  = ZMQ::Socket.new(ZMQ::REQ)
  @int_sock_push = ZMQ::Socket.new(ZMQ::PUSH)
  @int_sock_pull = ZMQ::Socket.new(ZMQ::PULL)

  # The endpoint name embeds this object's hash so that it can be expected
  # to be unique when endpoints are shared within the DefaultContext.
  endpoint_base = "inproc://__PollInterruptible_int_#{hash.to_s(26)}"

  # Each receiving side binds before its sending side connects.
  @int_sock_rep.bind("#{endpoint_base}R")
  @int_sock_req.connect("#{endpoint_base}R")
  @int_sock_pull.bind("#{endpoint_base}P")
  @int_sock_push.connect("#{endpoint_base}P")

  # One lock per sending socket, taken by #interrupt and #kill around sends.
  @int_lock_req  = Mutex.new
  @int_lock_push = Mutex.new

  # Maps an interruption key to its pending block; once #run has executed a
  # blocking interruption, the block's return value is stored in its place.
  @interruptions = {}

  @dead = false

  super(@int_sock_rep, @int_sock_pull, *sockets)
end

Instance Method Details

#closeObject

Permanently kill the Poll object. This should be run once, when the Poll object is no longer needed. This should only be accessed when there is no poll thread running. Use #kill instead when there is a poll loop thread running.



126
127
128
129
130
131
132
133
134
135
# File 'lib/0mq/poll_interruptible.rb', line 126

# Closes all four internal sockets and marks the object as dead.
#
# Does nothing when the object is already marked dead.
#
# @return [true, nil] true after closing the sockets, nil if already dead
def close
  return nil if @dead

  # Close order: REP, REQ, PULL, PUSH.
  [@int_sock_rep, @int_sock_req, @int_sock_pull, @int_sock_push].each do |sock|
    sock.close
  end

  @dead = true
end

#dead?Boolean

Return true if the object has been killed or closed and cannot be run

Returns:

  • (Boolean)


138
139
140
# File 'lib/0mq/poll_interruptible.rb', line 138

# Reports whether this poller has been permanently shut down.
#
# @return [Boolean] the current value of @dead, which starts out false and is
#   set to true by #close and by the "KILL" handling in #run
def dead?
  @dead
end

#interrupt(opts = {}, &block) ⇒ Object

Interrupt the running poll loop, but do not clean up. This can be run at any time to let the poller re-evaluate state, etc. This should only be accessed from a thread other than the poll thread,

and only if the poll thread is running

If a block is given, it will be executed in the poll thread just

prior to the execution of the user block passed to {#run}.

If the blocking: false option is given, the call will not wait for

the interruption to be processed.


81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/0mq/poll_interruptible.rb', line 81

# Signals the poll loop in #run to wake up, optionally running a block in
# the poll thread.
#
# The block (or a default block returning true) is stored in @interruptions
# under a key, and that key is sent to #run over one of the internal sockets.
#
# @param opts [Hash] options
# @option opts [Boolean] :blocking (true) when true, the key is sent on the
#   REQ socket and the call waits for the reply; when false, the key is sent
#   on the PUSH socket and the call returns immediately
# @yield block to be called (with no arguments) by #run
# @return [Object, nil] in blocking mode, whatever is stored under the key
#   once the reply arrives (#run stores the block's return value there);
#   nil in non-blocking mode
def interrupt(opts={}, &block)
  blocking = opts.fetch :blocking, true

  # Wrap the callback in a Proc created for this call. The key is derived
  # from the wrapper's object_id, so every call gets its own slot even when
  # the caller passes the same Proc object to several interruptions that are
  # pending at once. Keying on the caller's Proc would make those calls share
  # one slot; #run removes the slot after the first and then finds nothing to
  # call for the second.
  handler = block ? Proc.new { block.call } : Proc.new { true }
  key = handler.object_id.to_s 36

  @interruptions[key] = handler # Store the block to be called

  if blocking
    @int_lock_req.synchronize {
      @int_sock_req.send_string key # Signal an interruption to #run
      @int_sock_req.recv_array      # Wait until it has been handled by #run
    }
    return @interruptions.delete key # Return the stored result of the block
  else
    @int_lock_push.synchronize {
      @int_sock_push.send_string key # Signal an interruption to #run
    }
    return nil
  end
end

#killObject

Interrupt the running poll loop and permanently kill the Poll object. This should be run once, when the Poll object is no longer needed. This should only be accessed from a thread other than the poll thread,

and only if the poll thread is running

Use #close instead when there is no poll loop thread running.



108
109
110
111
112
113
114
115
116
117
118
119
120
# File 'lib/0mq/poll_interruptible.rb', line 108

# Asks the poll loop in #run to shut down, then closes the sending sockets.
#
# Sends ["KILL"] on the REQ socket while holding the REQ lock and waits for
# the reply. #run closes the REP and PULL sockets and sets @dead when it
# handles that message; this method closes the REQ and PUSH sockets.
#
# @return [true, nil] true once the sockets are closed, nil if already dead
def kill
  return nil if @dead

  @int_lock_req.synchronize {
    @int_sock_req.send_array(["KILL"])
    @int_sock_req.recv_array
  }

  [@int_sock_req, @int_sock_push].each { |sock| sock.close }

  true
end

#run(&block) ⇒ Object

Same as Poll#run, but will yield [nil, nil] to the block if interrupted. Return value may be an empty hash if the poller was killed.



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/0mq/poll_interruptible.rb', line 38

# Runs the superclass poll, servicing interruption messages that arrive on
# the internal REP and PULL sockets.
#
# For an internal socket, the received key is looked up in @interruptions and
# the stored block is called, unless the key is "KILL". The user block is then
# called with (nil, nil). Messages from the REP socket are answered with
# ["OKAY"]. On "KILL" the REP and PULL sockets are closed and the object is
# marked dead. For any other socket the user block is called with
# (socket, revents).
#
# @yield [socket, revents] for ordinary sockets; [nil, nil] when interrupted
# @return the value returned by super with the internal sockets removed
#   (presumably a Hash keyed by socket — confirm against Poll#run)
# @raise [RuntimeError] if the object has already been killed or closed
def run(&block)
  raise "#{self} cannot run; it was permanently killed." if @dead

  polled = super do |socket, revents|
    if socket == @int_sock_rep || socket == @int_sock_pull
      key, * = socket.recv_array
      shutdown = key == "KILL"
      wants_reply = socket == @int_sock_rep

      # Run the block given to #interrupt. A blocking caller reads the result
      # back out of @interruptions; otherwise the entry is simply dropped.
      unless shutdown
        outcome = @interruptions[key].call
        if wants_reply
          @interruptions[key] = outcome
        else
          @interruptions.delete(key)
        end
      end

      # Let the user block of #run observe the interruption.
      block.call(nil, nil) unless block.nil?

      # Only the REP socket has a requester waiting for an answer.
      socket.send_array(["OKAY"]) if wants_reply

      if shutdown
        [@int_sock_rep, @int_sock_pull].each { |sock| sock.close }
        @dead = true
      end
    else
      block.call(socket, revents) unless block.nil?
    end
  end

  # Hide the internal sockets from the caller's view of the results.
  polled.delete @int_sock_rep
  polled.delete @int_sock_pull
  polled
end