Class: LogStash::Outputs::ZeroMQ

Inherits:
Base show all
Defined in:
lib/logstash/outputs/zeromq.rb

Overview

Write events to a 0MQ PUB socket.

You need to have the 0mq 2.1.x library installed to be able to use this output plugin.

The default settings will create a publisher connecting to a subscriber bound to tcp://127.0.0.1:2120

Constant Summary

Constants included from Config::Mixin

Config::Mixin::CONFIGSORT

Instance Attribute Summary

Attributes included from Config::Mixin

#config, #original_params

Attributes inherited from Plugin

#logger, #params

Instance Method Summary collapse

Methods inherited from Base

#handle, #handle_worker, #initialize, #worker_setup, #workers_not_supported

Methods included from Config::Mixin

#config_init, included

Methods inherited from Plugin

#eql?, #finished, #finished?, #hash, #initialize, #inspect, lookup, #reload, #running?, #shutdown, #terminating?, #to_s

Constructor Details

This class inherits a constructor from LogStash::Outputs::Base

Instance Method Details

#publish(payload) ⇒ Object

def receive



113
114
115
116
117
118
119
120
121
122
123
124
# File 'lib/logstash/outputs/zeromq.rb', line 113

# Sends one payload over the 0mq socket.
#
# This method is registered as the codec's on_event callback in #register,
# so it is invoked with whatever the codec emits.
#
# Any StandardError raised while sending is logged as a warning (together
# with the configured address) and swallowed, so a failed send never
# propagates to the caller.
#
# @param payload the data handed to the socket's send_string
# @return the result of error_check on success, or of @logger.warn on failure
def publish(payload)
  @logger.debug("0mq: sending", :event => payload) if @logger.debug?

  # TODO(sissel): Need to figure out how to fit this into the codecs system.
  # For the "pubsub" topology the topic would be sent first as a separate
  # message part; that code is disabled until it fits the codec model:
  #@logger.debug("0mq output: setting topic to: #{event.sprintf(@topic)}")
  #error_check(@zsocket.send_string(event.sprintf(@topic), ZMQ::SNDMORE),
              #"in topic send_string")

  send_rc = @zsocket.send_string(payload)
  error_check(send_rc, "in send_string")
rescue => send_error
  @logger.warn("0mq output exception", :address => @address, :exception => send_error)
end

#receive(event) ⇒ Object



107
108
109
110
111
# File 'lib/logstash/outputs/zeromq.rb', line 107

# Entry point for an incoming event.
#
# The event is passed to the codec's encode method only when
# output?(event) is truthy; otherwise nothing happens.
#
# @param event the event to be encoded
# @return the result of @codec.encode, or nil when the event is skipped
def receive(event)
  @codec.encode(event) if output?(event)
end

#register ⇒ Object



61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/logstash/outputs/zeromq.rb', line 61

# Prepares the plugin for use.
#
# Steps, in order:
# 1. Loads ffi-rzmq and the LogStash::Util::ZeroMQ helpers and mixes the
#    helpers into this class.
# 2. In "server" mode, calls workers_not_supported with an explanatory message.
# 3. Maps the configured topology to a ZMQ socket type and creates the socket.
# 4. Sets ZMQ::LINGER to 1, applies any user-supplied socket options, and
#    runs setup for every configured address.
# 5. Registers #publish as the codec's on_event callback.
#
# @return the result of @codec.on_event
def register
  require "ffi-rzmq"
  require "logstash/util/zeromq"
  self.class.send(:include, LogStash::Util::ZeroMQ)

  workers_not_supported("With 'mode => server', only one zeromq socket may bind to a port and may not be shared among threads. Going to single-worker mode for this plugin!") if @mode == "server"

  # Translate topology shorthand to socket types.
  # An unrecognised topology leaves the socket type nil.
  socket_type =
    case @topology
    when "pair"     then ZMQ::PAIR
    when "pushpull" then ZMQ::PUSH
    when "pubsub"   then ZMQ::PUB
    end

  @zsocket = context.socket(socket_type)

  linger_rc = @zsocket.setsockopt(ZMQ::LINGER, 1)
  error_check(linger_rc, "while setting ZMQ::LINGER == 1)")

  setopts(@zsocket, @sockopt) if @sockopt

  @address.each { |endpoint| setup(@zsocket, endpoint) }

  @codec.on_event(&method(:publish))
end

#teardown ⇒ Object



97
98
99
# File 'lib/logstash/outputs/zeromq.rb', line 97

# Closes the 0mq socket and passes the result of the close call to
# error_check.
#
# @return the result of error_check
def teardown
  close_rc = @zsocket.close
  error_check(close_rc, "while closing the socket")
end