Class: ZMQMachine::Device::Queue::XReqHandler
- Inherits:
-
Object
- Object
- ZMQMachine::Device::Queue::XReqHandler
- Defined in:
- lib/zm/device/queue.rb
Direct Known Subclasses
Instance Attribute Summary collapse
-
#socket_out ⇒ Object
Returns the value of attribute socket_out.
Instance Method Summary collapse
- #error_check(rc) ⇒ Object
-
#initialize(config, address, direction) ⇒ XReqHandler
constructor
A new instance of XReqHandler.
- #on_attach(socket) ⇒ Object
- #on_readable(socket, messages, envelope) ⇒ Object
- #on_writable(socket) ⇒ Object
- #set_options(socket) ⇒ Object
Constructor Details
#initialize(config, address, direction) ⇒ XReqHandler
Returns a new instance of XReqHandler.
76 77 78 79 80 81 82 |
# File 'lib/zm/device/queue.rb', line 76 def initialize(config, address, direction) @reactor = config.reactor @address = address @verbose = config.verbose @config = config @direction = direction end |
Instance Attribute Details
#socket_out ⇒ Object
Returns the value of attribute socket_out.
74 75 76 |
# File 'lib/zm/device/queue.rb', line 74 def socket_out @socket_out end |
Instance Method Details
#error_check(rc) ⇒ Object
122 123 124 125 126 127 128 129 130 |
# File 'lib/zm/device/queue.rb', line 122 def error_check(rc) if ZMQ::Util.resultcode_ok?(rc) false else @reactor.log(:error, "Operation failed, errno [#{ZMQ::Util.errno}] description [#{ZMQ::Util.error_string}]") caller(1).each { |callstack| @reactor.log(:callstack, callstack) } true end end |
#on_attach(socket) ⇒ Object
84 85 86 87 88 |
# File 'lib/zm/device/queue.rb', line 84 def on_attach(socket) set_options(socket) rc = socket.bind(@address) error_check(rc) end |
#on_readable(socket, messages, envelope) ⇒ Object
94 95 96 97 98 99 100 101 102 103 |
# File 'lib/zm/device/queue.rb', line 94 def on_readable(socket, messages, envelope) all = (envelope + messages) all.each { |msg| @reactor.log(:device, "[Q#{@direction}] [#{msg.copy_out_string}]") } if @verbose if @socket_out # FIXME: need to be able to handle EAGAIN/failed send rc = socket_out.send_messages(all) all.each { |message| message.close } end end |
#on_writable(socket) ⇒ Object
90 91 92 |
# File 'lib/zm/device/queue.rb', line 90 def on_writable(socket) @reactor.deregister_writable(socket) end |
#set_options(socket) ⇒ Object
107 108 109 110 |
# File 'lib/zm/device/queue.rb', line 107 def set_options(socket) error_check(socket.raw_socket.setsockopt(ZMQ::HWM, @config.hwm)) error_check(socket.raw_socket.setsockopt(ZMQ::LINGER, @config.linger)) end |