Class: LogStash::Inputs::ZeroMQ

Inherits:
Base show all
Defined in:
lib/logstash/inputs/zeromq.rb

Overview

Read events over a 0MQ SUB socket.

You need to have the 0mq 2.1.x library installed to be able to use this input plugin.

The default settings will create a subscriber binding to tcp://127.0.0.1:2120 waiting for connecting publishers.

Constant Summary

Constants included from Config::Mixin

Config::Mixin::CONFIGSORT

Instance Attribute Summary

Attributes inherited from Base

#params, #threadable

Attributes included from Config::Mixin

#config, #original_params

Attributes inherited from Plugin

#logger, #params

Instance Method Summary collapse

Methods inherited from Base

#initialize, #tag

Methods included from Config::Mixin

#config_init, included

Methods inherited from Plugin

#eql?, #finished, #finished?, #hash, #initialize, #inspect, lookup, #reload, #running?, #shutdown, #terminating?, #to_s

Constructor Details

This class inherits a constructor from LogStash::Inputs::Base

Instance Method Details

#register ⇒ Object



74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/logstash/inputs/zeromq.rb', line 74

# Prepares the ZeroMQ socket used by this input.
#
# Loads the ZeroMQ bindings, mixes LogStash::Util::ZeroMQ into this class,
# creates a socket whose type is selected by @topology, sets ZMQ::LINGER to 1,
# applies any @sockopt, calls `setup` for every entry of @address and, for the
# "pubsub" topology, subscribes to each entry of @topic (or to everything when
# @topic is nil).
#
# NOTE(review): `context`, `setup`, `setopts` and `error_check` are not defined
# in this view; presumably they come from LogStash::Util::ZeroMQ — confirm.
#
# @raise [ArgumentError] if @topology is not "pair", "pushpull" or "pubsub"
def register
  # Loaded lazily so the ZeroMQ library is only needed when this plugin is used.
  require "ffi-rzmq"
  require "logstash/util/zeromq"
  self.class.send(:include, LogStash::Util::ZeroMQ)

  zmq_const =
    case @topology
    when "pair"     then ZMQ::PAIR
    when "pushpull" then ZMQ::PULL
    when "pubsub"   then ZMQ::SUB
    else
      # Fail with a clear message instead of handing nil to context.socket.
      raise ArgumentError, "Unsupported ZMQ topology: #{@topology.inspect}"
    end
  @zsocket = context.socket(zmq_const)
  error_check(@zsocket.setsockopt(ZMQ::LINGER, 1),
              "while setting ZMQ::LINGER == 1")

  if @sockopt
    setopts(@zsocket, @sockopt)
  end

  @address.each do |addr|
    setup(@zsocket, addr)
  end

  if @topology == "pubsub"
    if @topic.nil?
      # An empty subscription string subscribes to every message.
      @logger.debug("ZMQ - No topic provided. Subscribing to all messages")
      error_check(@zsocket.setsockopt(ZMQ::SUBSCRIBE, ""),
                  "while setting ZMQ::SUBSCRIBE")
    else
      @topic.each do |t|
        @logger.debug("ZMQ subscribing to topic: #{t}")
        error_check(@zsocket.setsockopt(ZMQ::SUBSCRIBE, t),
                    "while setting ZMQ::SUBSCRIBE == #{t}")
      end
    end
  end

end

#run(output_queue) ⇒ Object

def server?



123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
# File 'lib/logstash/inputs/zeromq.rb', line 123

# Receive loop for this input.
#
# Repeatedly reads a message from @zsocket, decodes it with @codec and pushes
# every resulting event onto output_queue. When the message has more parts, the
# first part is treated as the topic and the second part becomes the payload.
# Each event gets "host" set to the local hostname unless it already has one,
# and is passed through `decorate` before being queued.
#
# The loop ends when LogStash::ShutdownSignal is raised. Any other
# StandardError is logged at debug level and the loop is restarted.
#
# NOTE(review): only the second part of a multipart message is read here;
# parts beyond the second are not consumed within the same iteration.
#
# @param output_queue [#<<] destination for decoded events
# @return [nil] once a shutdown signal has been received
def run(output_queue)
  local_hostname = Socket.gethostname
  begin
    loop do
      # The first frame is the payload unless another frame follows it.
      payload = ''
      error_check(@zsocket.recv_string(payload), "in recv_string")
      @logger.debug("ZMQ receiving", :event => payload)

      if @zsocket.more_parts?
        # Two-frame message: frame one was the topic, frame two is the body.
        @logger.debug("Multipart message detected. Setting @message to second part. First part was: #{payload}")
        body_frame = ''
        error_check(@zsocket.recv_string(body_frame), "in recv_string")
        @logger.debug("ZMQ receiving", :event => body_frame)
        payload = body_frame
      end

      @codec.decode(payload) do |event|
        event["host"] ||= local_hostname
        decorate(event)
        output_queue << event
      end
    end
  rescue LogStash::ShutdownSignal
    # Shutdown requested: leave the loop; the method returns nil.
  rescue => e
    @logger.debug("ZMQ Error", :subscriber => @zsocket,
                  :exception => e)
    retry
  end
end

#server? ⇒ Boolean

def teardown

Returns:

  • (Boolean)


119
120
121
# File 'lib/logstash/inputs/zeromq.rb', line 119

# Tells whether this input is configured with @mode equal to "server".
#
# @return [Boolean] true only when @mode is the string "server"
def server?
  @mode.eql?("server")
end

#teardown ⇒ Object

def register



115
116
117
# File 'lib/logstash/inputs/zeromq.rb', line 115

# Closes @zsocket and passes the result of the close call to `error_check`
# together with a description of the operation.
#
# @return [Object] whatever `error_check` returns
def teardown
  close_result = @zsocket.close
  error_check(close_result, "while closing the zmq socket")
end