Class: LogStash::Inputs::ZeroMQ
- Defined in:
- lib/logstash/inputs/zeromq.rb
Overview
Read events over a 0MQ SUB socket.
You need to have the 0mq 2.1.x library installed to be able to use this input plugin.
The default settings will create a subscriber binding to tcp://127.0.0.1:2120 waiting for connecting publishers.
Constant Summary
Constants included from Config::Mixin
Instance Attribute Summary
Attributes inherited from Base
Attributes included from Config::Mixin
Attributes inherited from Plugin
Instance Method Summary collapse
- #register ⇒ Object
-
#run(output_queue) ⇒ Object
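Receives messages from the 0MQ socket, decodes them with the configured codec, and pushes the resulting events onto output_queue.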
-
#server? ⇒ Boolean
Returns true when the plugin's mode is "server".
-
#teardown ⇒ Object
Closes the 0MQ socket.
Methods inherited from Base
Methods included from Config::Mixin
Methods inherited from Plugin
#eql?, #finished, #finished?, #hash, #initialize, #inspect, lookup, #reload, #running?, #shutdown, #terminating?, #to_s
Constructor Details
This class inherits a constructor from LogStash::Inputs::Base
Instance Method Details
#register ⇒ Object
74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 |
# File 'lib/logstash/inputs/zeromq.rb', line 74

# Prepare the input for receiving: create the 0MQ socket that matches the
# configured topology, apply socket options, set the socket up for every
# configured address and, for the "pubsub" topology, install the topic
# subscriptions.
#
# Reads @topology, @sockopt, @address and @topic; assigns @zsocket.
#
# @raise [ArgumentError] when @topology is not "pair", "pushpull" or "pubsub"
def register
  # Resolve the topology to the name of the receiving socket type first, so
  # that a bad value fails with a clear message instead of handing nil to
  # context.socket (and before the native bindings are loaded).
  socket_type_name = { "pair" => :PAIR, "pushpull" => :PULL, "pubsub" => :SUB }[@topology]
  if socket_type_name.nil?
    raise ArgumentError, "Unsupported ZeroMQ topology: #{@topology.inspect}"
  end

  require "ffi-rzmq"
  require "logstash/util/zeromq"
  # The helpers used below (context, error_check, setopts, setup) are not
  # defined in this method; presumably they come from this mixin -- confirm
  # in logstash/util/zeromq.
  self.class.send(:include, LogStash::Util::ZeroMQ)

  @zsocket = context.socket(ZMQ.const_get(socket_type_name))
  error_check(@zsocket.setsockopt(ZMQ::LINGER, 1), "while setting ZMQ::LINGER == 1")

  # User-supplied socket options are applied only when configured.
  if @sockopt
    setopts(@zsocket, @sockopt)
  end

  @address.each do |addr|
    setup(@zsocket, addr)
  end

  if @topology == "pubsub"
    if @topic.nil?
      # An empty subscription prefix subscribes to every message.
      @logger.debug("ZMQ - No topic provided. Subscribing to all messages")
      error_check(@zsocket.setsockopt(ZMQ::SUBSCRIBE, ""), "while setting ZMQ::SUBSCRIBE")
    else
      @topic.each do |t|
        @logger.debug("ZMQ subscribing to topic: #{t}")
        error_check(@zsocket.setsockopt(ZMQ::SUBSCRIBE, t), "while setting ZMQ::SUBSCRIBE == #{t}")
      end
    end
  end
end
#run(output_queue) ⇒ Object
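Receives messages from the 0MQ socket, decodes them with the configured codec, and pushes the resulting events onto output_queue.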
123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 |
# File 'lib/logstash/inputs/zeromq.rb', line 123

# Receive messages from @zsocket in an endless loop, decode each one with
# @codec and push the resulting events onto +output_queue+.
#
# Each event gets a "host" field (the local hostname) unless it already has
# one, and is passed through decorate before being queued.
#
# The method returns when LogStash::ShutdownSignal is raised. Any other
# StandardError is logged at warn level and the receive loop is restarted.
#
# @param output_queue [#<<] destination for decoded events
def run(output_queue)
  host = Socket.gethostname
  begin
    loop do
      # Unified receiver: the first part is the message unless more follow.
      m1 = ""
      rc = @zsocket.recv_string(m1)
      error_check(rc, "in recv_string")
      @logger.debug("ZMQ receiving", :event => m1)
      msg = m1
      # If we have more parts, we'll eat the first as the topic
      # and set the message to the second part
      if @zsocket.more_parts?
        @logger.debug("Multipart message detected. Setting @message to second part. First part was: #{m1}")
        m2 = ''
        rc2 = @zsocket.recv_string(m2)
        error_check(rc2, "in recv_string")
        @logger.debug("ZMQ receiving", :event => m2)
        msg = m2
      end

      @codec.decode(msg) do |event|
        event["host"] ||= host
        decorate(event)
        output_queue << event
      end
    end
  rescue LogStash::ShutdownSignal
    # shutdown
    return
  rescue => e
    # Keep the input alive, but report the failure at warn level so it is not
    # hidden when debug logging is off.
    @logger.warn("ZMQ Error", :subscriber => @zsocket, :exception => e)
    retry
  end # begin
end
#server? ⇒ Boolean
Returns true when the plugin's mode is "server".
119 120 121 |
# File 'lib/logstash/inputs/zeromq.rb', line 119

# Report whether this input is configured with the "server" mode.
#
# @return [Boolean] true when @mode equals "server", false otherwise
def server?
  configured_mode = @mode
  configured_mode == "server"
end
#teardown ⇒ Object
Closes the 0MQ socket.
115 116 117 |
# File 'lib/logstash/inputs/zeromq.rb', line 115

# Close the 0MQ socket held in @zsocket and pass the result of the close call
# to error_check together with a description of the operation.
def teardown
  close_result = @zsocket.close
  error_check(close_result, "while closing the zmq socket")
end