Class: Nodule::ZeroMQ
Overview
A resource for setting up and testing ZeroMQ message flows. The most basic usage will provide auto-generated IPC URI’s, which can be handy for testing. More advanced usage uses the built-in tap device to sniff messages while they’re in-flight.
Instance Attribute Summary collapse
-
#ctx ⇒ Object
readonly
Returns the value of attribute ctx.
-
#error_count ⇒ Object
readonly
Returns the value of attribute error_count.
-
#limit ⇒ Object
readonly
Returns the value of attribute limit.
-
#method ⇒ Object
readonly
Returns the value of attribute method.
-
#type ⇒ Object
readonly
Returns the value of attribute type.
-
#uri ⇒ Object
readonly
Returns the value of attribute uri.
Attributes inherited from Tempfile
Attributes inherited from Base
#prefix, #read_count, #readers, #running, #topology
Instance Method Summary collapse
- #done? ⇒ Boolean
-
#initialize(opts) ⇒ ZeroMQ
constructor
:connect and :bind are allowed at the same time and must be of the same socket type.
- #run ⇒ Object
- #socket ⇒ Object
-
#stop ⇒ Object
send a message to the child thread telling it to exit and join the thread.
-
#stop! ⇒ Object
If the thread is still alive, force an exception in the thread and continue to do the things stop does.
-
#subscribe(subscription) ⇒ Object
For PUB sockets only, subscribe to a prefix.
-
#to_s ⇒ Object
Return the URI generated/provided for this resource.
-
#wait(timeout = 60) ⇒ Object
Wait for the ZMQ thread to exit on its own, mostly useful with :limit => Fixnum.
Methods inherited from Tempfile
Methods inherited from Base
#add_reader, #add_readers, #clear!, #join_topology!, #output, #output!, #output?, #read_until, #require_read_count, #run_readers, #verbose, #wait_with_backoff
Constructor Details
#initialize(opts) ⇒ ZeroMQ
:connect and :bind are allowed at the same time and must be of the same socket type. ZMQ::SUB sockets that are connected/bound will subscribe to “” by default.
For the rest of the options, see Hastur::Test::Resource::Base.
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 |
# File 'lib/nodule/zeromq.rb', line 43 def initialize(opts) opts[:suffix] ||= '.zmq' super(opts) @ctx = ::ZMQ::Context.new @zmq_thread = nil @error_count = 0 @sockprocs = [] @limit = nil @timeout_started = false @stopped = false # Sockets cannot be used across thread boundaries, so use a ZMQ::PAIR socket both to synchronize thread # startup and pass writes form main -> thread. The .socket method will return the PAIR socket. @pipe_uri = "inproc://pair-#{Nodule.next_seq}" @pipe = @ctx.socket(::ZMQ::PAIR) @child = @ctx.socket(::ZMQ::PAIR) setsockopt(@pipe, :hwm, 1) setsockopt(@child, :hwm, 1) setsockopt(@pipe, ::ZMQ::LINGER, 1.0) setsockopt(@child, ::ZMQ::LINGER, 1.0) @pipe.bind(@pipe_uri) @child.connect(@pipe_uri) case opts[:uri] # Socket files are specified so they land in PWD, in the future we might want to specify a temp # dir, but that has a whole different bag of issues, so stick with simple until it's needed. when :gen, :generate @uri = "ipc://#{@file.to_s}" when String @uri = opts[:uri] else raise ArgumentError.new "Invalid URI specifier: (#{opts[:uri].class}) '#{opts[:uri]}'" end if opts[:connect] and opts[:bind] and opts[:connect] != opts[:bind] raise ArgumentError.new "ZMQ socket types must be the same when enabling :bind and :connect" end # only set type and create a socket if :bind or :connect is specified # otherwise, the caller probably just wants to generate a URI, or possibly # use a pre-created socket? (not supported yet) if @type = (opts[:connect] || opts[:bind]) @socket = @ctx.socket(@type) setsockopt(@socket, :hwm, 1) setsockopt(@socket, ::ZMQ::LINGER, 1.0) if opts[:connect] @sockprocs << proc { @socket.connect(@uri) } # deferred end if opts[:bind] @sockprocs << proc { @socket.bind(@uri) } # deferred end # by default, subscribe to "" on ZMQ::SUB sockets, since that's what we want # the vast majority of the time. Also allow specifying a subscription for those # times we want something else. if @type == ::ZMQ::SUB if opts[:subscribe] @sockprocs << proc { subscribe(opts[:subscribe]) } else @sockprocs << proc { subscribe("") } end end end if opts[:limit] @limit = opts[:limit] end end |
Instance Attribute Details
#ctx ⇒ Object (readonly)
Returns the value of attribute ctx.
12 13 14 |
# File 'lib/nodule/zeromq.rb', line 12 def ctx @ctx end |
#error_count ⇒ Object (readonly)
Returns the value of attribute error_count.
12 13 14 |
# File 'lib/nodule/zeromq.rb', line 12 def error_count @error_count end |
#limit ⇒ Object (readonly)
Returns the value of attribute limit.
12 13 14 |
# File 'lib/nodule/zeromq.rb', line 12 def limit @limit end |
#method ⇒ Object (readonly)
Returns the value of attribute method.
12 13 14 |
# File 'lib/nodule/zeromq.rb', line 12 def method @method end |
#type ⇒ Object (readonly)
Returns the value of attribute type.
12 13 14 |
# File 'lib/nodule/zeromq.rb', line 12 def type @type end |
#uri ⇒ Object (readonly)
Returns the value of attribute uri.
12 13 14 |
# File 'lib/nodule/zeromq.rb', line 12 def uri @uri end |
Instance Method Details
#done? ⇒ Boolean
151 152 153 |
# File 'lib/nodule/zeromq.rb', line 151 def done? @stopped end |
#run ⇒ Object
116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 |
# File 'lib/nodule/zeromq.rb', line 116 def run super return if @sockprocs.empty? # wrap the block in a block so errors don't simply vanish until join time @zmq_thread = Thread.new do Thread.current.abort_on_exception # sockets have to be created inside the thread that uses them @sockprocs.each { |p| p.call } _zmq_read() verbose "child thread #{Thread.current} shutting down" @child.close @socket.close if @socket end Thread.pass @stopped = @zmq_thread.alive? ? false : true end |
#socket ⇒ Object
139 140 141 |
# File 'lib/nodule/zeromq.rb', line 139 def socket @pipe end |
#stop ⇒ Object
send a message to the child thread telling it to exit and join the thread
197 198 199 200 201 202 203 204 205 206 207 208 209 210 |
# File 'lib/nodule/zeromq.rb', line 197 def stop return if @stopped @pipe.send_strings(["exit"], 1) Thread.pass super @zmq_thread.join if @zmq_thread @pipe.close if @pipe @stopped = true end |
#stop! ⇒ Object
If the thread is still alive, force an exception in the thread and continue to do the things stop does.
179 180 181 182 183 184 185 186 187 188 189 190 191 192 |
# File 'lib/nodule/zeromq.rb', line 179 def stop! if @zmq_thread.alive? STDERR.puts "force stop! called, issuing Thread.raise" @zmq_thread.raise "force stop! called" end stop wait 1 @zmq_thread.join if @zmq_thread @pipe.close if @pipe @stopped = true end |
#subscribe(subscription) ⇒ Object
For PUB sockets only, subscribe to a prefix.
147 148 149 |
# File 'lib/nodule/zeromq.rb', line 147 def subscribe(subscription) @pipe.send_strings ["subscribe", subscription] end |
#to_s ⇒ Object
Return the URI generated/provided for this resource. For tapped devices, the “front” side of the tap is returned.
216 217 218 |
# File 'lib/nodule/zeromq.rb', line 216 def to_s @uri end |
#wait(timeout = 60) ⇒ Object
Wait for the ZMQ thread to exit on its own, mostly useful with :limit => Fixnum.
This does not signal the child thread.
160 161 162 163 164 165 166 167 168 169 170 171 172 173 |
# File 'lib/nodule/zeromq.rb', line 160 def wait(timeout=60) countdown = timeout.to_f while countdown > 0 if @zmq_thread and @zmq_thread.alive? sleep 0.1 countdown = countdown - 0.1 else break end end super() end |