Class: LogStash::Outputs::ZeroMQ
- Defined in: lib/logstash/outputs/zeromq.rb
Overview
Write events to a 0MQ PUB socket.
This output plugin requires the 0MQ 2.1.x library to be installed.
With the default settings, the plugin creates a publisher that connects to a subscriber bound to tcp://127.0.0.1:2120.
Constant Summary
Constants included from Config::Mixin
Instance Attribute Summary
Attributes included from Config::Mixin
Attributes inherited from Plugin
Instance Method Summary
- #publish(payload) ⇒ Object
  Sends an encoded payload over the 0MQ socket.
- #receive(event) ⇒ Object
- #register ⇒ Object
- #teardown ⇒ Object
Methods inherited from Base
#handle, #handle_worker, #initialize, #worker_setup, #workers_not_supported
Methods included from Config::Mixin
Methods inherited from Plugin
#eql?, #finished, #finished?, #hash, #initialize, #inspect, lookup, #reload, #running?, #shutdown, #terminating?, #to_s
Constructor Details
This class inherits a constructor from LogStash::Outputs::Base
Instance Method Details
#publish(payload) ⇒ Object
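Sends the encoded payload over the 0MQ socket. If the send fails, the exception is logged as a warning rather than raised.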
    # File 'lib/logstash/outputs/zeromq.rb', line 113

    def publish(payload)
      @logger.debug? && @logger.debug("0mq: sending", :event => payload)
      if @topology == "pubsub"
        # TODO(sissel): Need to figure out how to fit this into the codecs system.
        #@logger.debug("0mq output: setting topic to: #{event.sprintf(@topic)}")
        #error_check(@zsocket.send_string(event.sprintf(@topic), ZMQ::SNDMORE),
                    #"in topic send_string")
      end
      error_check(@zsocket.send_string(payload), "in send_string")
    rescue => e
      @logger.warn("0mq output exception", :address => @address, :exception => e)
    end
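The method-level rescue in #publish means a failed send is logged rather than propagated to the caller. The following is a minimal sketch of that pattern, not the plugin's code. It assumes that error_check raises when the socket call returns a negative result code (the helper comes from LogStash::Util::ZeroMQ and is not shown on this page). FakeSocket, ListLogger, and SketchPublisher are hypothetical stand-ins so the example runs without ffi-rzmq.

```ruby
# Hypothetical stand-ins; none of these classes exist in Logstash or ffi-rzmq.
class FakeSocket
  def initialize(rc)
    @rc = rc
  end

  # Mimics a send call that reports success or failure via a return code.
  def send_string(_payload)
    @rc
  end
end

class ListLogger
  attr_reader :warnings

  def initialize
    @warnings = []
  end

  # Records the warning instead of printing it.
  def warn(message, data = {})
    @warnings << [message, data]
  end
end

class SketchPublisher
  def initialize(socket, logger, address)
    @zsocket = socket
    @logger = logger
    @address = address
  end

  # Assumption: a negative return code signals failure and should raise.
  def error_check(rc, doing)
    raise "ZeroMQ error while #{doing}" if rc < 0
  end

  # Same shape as #publish: the method-level rescue turns a failed send
  # into a warning instead of letting the exception escape.
  def publish(payload)
    error_check(@zsocket.send_string(payload), "in send_string")
  rescue => e
    @logger.warn("0mq output exception", :address => @address, :exception => e)
  end
end

failing_logger = ListLogger.new
SketchPublisher.new(FakeSocket.new(-1), failing_logger, ["tcp://127.0.0.1:2120"]).publish("hello")

ok_logger = ListLogger.new
SketchPublisher.new(FakeSocket.new(0), ok_logger, ["tcp://127.0.0.1:2120"]).publish("hello")
```

In this sketch the failing send leaves one entry in failing_logger.warnings, while the successful send leaves ok_logger.warnings empty. The trade-off of this design is that callers never learn a send failed unless they watch the log.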
#receive(event) ⇒ Object
    # File 'lib/logstash/outputs/zeromq.rb', line 107

    def receive(event)
      return unless output?(event)

      @codec.encode(event)
    end
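Note that #receive does not write to the socket itself. It hands the event to the codec, and the codec invokes the block that #register attached with on_event, which is #publish. A rough sketch of that callback flow follows. UpcaseCodec and SketchOutput are hypothetical; only the on_event and encode calls mirror what appears on this page, and a real Logstash codec does considerably more.

```ruby
# Hypothetical codec used only for illustration.
class UpcaseCodec
  # Store the block to call with each encoded payload.
  def on_event(&block)
    @on_event = block
  end

  # "Encode" the event, then hand the result to the registered callback.
  def encode(event)
    @on_event.call(event.to_s.upcase)
  end
end

# Hypothetical output that collects payloads instead of sending them.
class SketchOutput
  attr_reader :sent

  def initialize(codec)
    @codec = codec
    @sent = []
  end

  # Wire the codec's callback to #publish, as #register does.
  def register
    @codec.on_event(&method(:publish))
  end

  # Hand the event to the codec; the codec calls #publish.
  def receive(event)
    @codec.encode(event)
  end

  def publish(payload)
    @sent << payload
  end
end

output = SketchOutput.new(UpcaseCodec.new)
output.register
output.receive("hello")
```

After the call to receive, output.sent holds the single encoded payload, which shows that the payload reaches #publish only through the codec callback.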
#register ⇒ Object
    # File 'lib/logstash/outputs/zeromq.rb', line 61

    def register
      require "ffi-rzmq"
      require "logstash/util/zeromq"
      self.class.send(:include, LogStash::Util::ZeroMQ)

      if @mode == "server"
        workers_not_supported("With 'mode => server', only one zeromq socket may bind to a port and may not be shared among threads. Going to single-worker mode for this plugin!")
      end

      # Translate topology shorthand to socket types
      case @topology
      when "pair"
        zmq_const = ZMQ::PAIR
      when "pushpull"
        zmq_const = ZMQ::PUSH
      when "pubsub"
        zmq_const = ZMQ::PUB
      end # case socket_type
      @zsocket = context.socket(zmq_const)

      error_check(@zsocket.setsockopt(ZMQ::LINGER, 1),
                  "while setting ZMQ::LINGER == 1)")

      if @sockopt
        setopts(@zsocket, @sockopt)
      end

      @address.each do |addr|
        setup(@zsocket, addr)
      end

      @codec.on_event(&method(:publish))
    end
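The case statement in #register maps the three topology shorthands to socket types. If @topology matched none of the branches, zmq_const would be nil; configuration validation elsewhere in the plugin may rule that out, but it is not shown on this page. One possible alternative, sketched here under that caveat and not taken from the plugin, is a lookup table with Hash#fetch so that an unknown value fails early. The symbols stand in for ZMQ::PAIR, ZMQ::PUSH, and ZMQ::PUB so the example runs without ffi-rzmq, and socket_type_for is a hypothetical helper.

```ruby
# Symbols stand in for ZMQ::PAIR, ZMQ::PUSH and ZMQ::PUB so the sketch
# runs without ffi-rzmq installed.
SOCKET_TYPES = {
  "pair"     => :PAIR,
  "pushpull" => :PUSH,
  "pubsub"   => :PUB,
}.freeze

# Hypothetical helper, not part of the plugin: look up the socket type
# and raise a descriptive error for an unknown topology.
def socket_type_for(topology)
  SOCKET_TYPES.fetch(topology) do
    raise ArgumentError, "unknown topology: #{topology.inspect}"
  end
end
```

Calling socket_type_for("pubsub") returns the stand-in for the PUB socket type, and an unrecognized string raises ArgumentError instead of passing nil on to the socket constructor.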
#teardown ⇒ Object
    # File 'lib/logstash/outputs/zeromq.rb', line 97

    def teardown
      error_check(@zsocket.close, "while closing the socket")
    end