Module: LogStash::Util::ZeroMQ
- Defined in:
- lib/logstash/util/zeromq.rb
Constant Summary collapse
- CONTEXT =
ZMQ::Context.new
- STRING_OPTS =
%w{IDENTITY SUBSCRIBE UNSUBSCRIBE}
Instance Method Summary collapse
- #context ⇒ Object
- #error_check(rc, doing) ⇒ Object
-
#setopts(socket, options) ⇒ Object
def error_check.
- #setup(socket, address) ⇒ Object
Instance Method Details
#context ⇒ Object
11 12 13 |
# File 'lib/logstash/util/zeromq.rb', line 11 def context CONTEXT end |
#error_check(rc, doing) ⇒ Object
24 25 26 27 28 29 |
# File 'lib/logstash/util/zeromq.rb', line 24 def error_check(rc, doing) unless ZMQ::Util.resultcode_ok?(rc) @logger.error("ZeroMQ error while #{doing}", { :error_code => rc }) raise "ZeroMQ Error while #{doing}" end end |
#setopts(socket, options) ⇒ Object
def error_check
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
# File 'lib/logstash/util/zeromq.rb', line 31 def setopts(socket, ) .each do |opt,value| sockopt = opt.split('::')[1] option = ZMQ.const_defined?(sockopt) ? ZMQ.const_get(sockopt) : ZMQ.const_missing(sockopt) unless STRING_OPTS.include?(sockopt) begin Float(value) value = value.to_i rescue ArgumentError raise "#{sockopt} requires a numeric value. #{value} is not numeric" end end # end unless error_check(socket.setsockopt(option, value), "while setting #{opt} == #{value}") end # end each end |
#setup(socket, address) ⇒ Object
15 16 17 18 19 20 21 22 |
# File 'lib/logstash/util/zeromq.rb', line 15 def setup(socket, address) if server? error_check(socket.bind(address), "binding to #{address}") else error_check(socket.connect(address), "connecting to #{address}") end @logger.info("0mq: #{server? ? 'connected' : 'bound'}", :address => address) end |