Class: ZMQMachine::Device::Queue::XReqHandler

Inherits:
Object
  • Object
show all
Defined in:
lib/zm/device/queue.rb

Direct Known Subclasses

XRepHandler

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config, address, direction) ⇒ XReqHandler

Returns a new instance of XReqHandler.



76
77
78
79
80
81
82
# File 'lib/zm/device/queue.rb', line 76

# Builds a handler for one side of the queue device.
#
# @param config [Object] device configuration; must respond to #reactor and
#   #verbose (and is kept for later option lookups)
# @param address [Object] stored as-is and later handed to socket.bind
# @param direction [Object] stored as-is; interpolated into verbose log lines
def initialize(config, address, direction)
  @config = config
  # Cache the two config values this handler consults directly.
  @reactor, @verbose = config.reactor, config.verbose
  @address = address
  @direction = direction
end

Instance Attribute Details

#socket_out ⇒ Object

Returns the value of attribute socket_out.



74
75
76
# File 'lib/zm/device/queue.rb', line 74

# Reader for the @socket_out instance variable.
#
# @return [Object, nil] whatever is currently stored in @socket_out; nil when
#   it has never been assigned
def socket_out
  @socket_out
end

Instance Method Details

#error_check(rc) ⇒ Object



122
123
124
125
126
127
128
129
130
# File 'lib/zm/device/queue.rb', line 122

# Inspects a 0mq result code and logs diagnostics when it signals failure.
#
# On failure, one :error entry (errno plus description) is written to the
# reactor log, followed by one :callstack entry per frame of the caller's
# backtrace.
#
# @param rc [Object] result code, judged by ZMQ::Util.resultcode_ok?
# @return [Boolean] false when the code is OK, true when a failure was logged
def error_check(rc)
  return false if ZMQ::Util.resultcode_ok?(rc)

  failure = "Operation failed, errno [#{ZMQ::Util.errno}] description [#{ZMQ::Util.error_string}]"
  @reactor.log(:error, failure)

  # Skip this method's own frame; log everything above it.
  caller(1).each do |frame|
    @reactor.log(:callstack, frame)
  end

  true
end

#on_attach(socket) ⇒ Object



84
85
86
87
88
# File 'lib/zm/device/queue.rb', line 84

# Prepares a newly attached socket: applies the configured socket options,
# then binds the socket to @address.
#
# @param socket [Object] must respond to #bind (and whatever set_options needs)
# @return [Boolean] the result of error_check on the bind result code
def on_attach(socket)
  set_options(socket)
  error_check(socket.bind(@address))
end

#on_readable(socket, messages, envelope) ⇒ Object



94
95
96
97
98
99
100
101
102
103
# File 'lib/zm/device/queue.rb', line 94

# Forwards an incoming multipart message to the outbound socket.
#
# The envelope parts are placed ahead of the body parts. When @verbose is
# set, every part is written to the reactor log under :device, tagged with
# this handler's direction.
#
# @param socket [Object] the socket the message arrived on (not used here)
# @param messages [Array] body parts; each must respond to #close (and
#   #copy_out_string when verbose logging is on)
# @param envelope [Array] envelope parts, same requirements as +messages+
# @return [Array] the combined envelope + body parts, all closed
def on_readable(socket, messages, envelope)
  all = envelope + messages
  all.each { |msg| @reactor.log(:device, "[Q#{@direction}] [#{msg.copy_out_string}]") } if @verbose

  # FIXME: need to be able to handle EAGAIN/failed send. A failed send is
  # now at least reported through error_check instead of being dropped
  # silently, but the message is still not retried.
  error_check(socket_out.send_messages(all)) if @socket_out

  # Close every part even when there is no outbound socket, so dropped
  # messages are released promptly.
  # NOTE(review): assumes this handler owns the messages once the reactor
  # hands them over - confirm against the reactor's contract.
  all.each { |message| message.close }
end

#on_writable(socket) ⇒ Object



90
91
92
# File 'lib/zm/device/queue.rb', line 90

# Writable-event hook: immediately asks the reactor to deregister +socket+
# for writable notifications.
# NOTE(review): presumably this handler never needs to wait on writability -
# confirm against the reactor's registration rules.
#
# @param socket [Object] the socket passed through to
#   @reactor.deregister_writable
# @return [Object] whatever @reactor.deregister_writable returns
def on_writable(socket)
  @reactor.deregister_writable(socket)
end

#set_options(socket) ⇒ Object



107
108
109
110
# File 'lib/zm/device/queue.rb', line 107

# Applies the configured high-water mark and linger settings to the raw
# socket underlying +socket+. Each setsockopt result is passed to
# error_check, so a failure is logged but does not stop the second option
# from being applied.
#
# @param socket [Object] must respond to #raw_socket, which in turn must
#   respond to #setsockopt
# @return [Boolean] error_check's verdict for the LINGER option
def set_options(socket)
  hwm_rc = socket.raw_socket.setsockopt(ZMQ::HWM, @config.hwm)
  error_check(hwm_rc)

  linger_rc = socket.raw_socket.setsockopt(ZMQ::LINGER, @config.linger)
  error_check(linger_rc)
end