Class: LogStash::Inputs::Ganglia

Inherits:
Base show all
Defined in:
lib/logstash/inputs/ganglia.rb

Overview

Reads Ganglia packets from the network via UDP.

Constant Summary

Constants included from Config::Mixin

Config::Mixin::CONFIGSORT

Instance Attribute Summary

Attributes inherited from Base

#params, #threadable

Attributes included from Config::Mixin

#config, #original_params

Attributes inherited from Plugin

#logger, #params

Instance Method Summary collapse

Methods inherited from Base

#tag

Methods included from Config::Mixin

#config_init, included

Methods inherited from Plugin

#eql?, #finished, #finished?, #hash, #inspect, lookup, #reload, #running?, #shutdown, #terminating?, #to_s

Constructor Details

#initialize(params) ⇒ Ganglia

Returns a new instance of Ganglia.



26
27
28
29
30
# File 'lib/logstash/inputs/ganglia.rb', line 26

# Builds the input: hands +params+ to the parent initializer, starts with no
# shutdown pending, and turns off reverse DNS lookups for sockets process-wide.
#
# @param params forwarded unchanged to the superclass initializer
def initialize(params)
  super(params)
  # Read by #run to decide whether a listener failure should be retried.
  @shutdown_requested = false
  # Global socket setting: do not resolve peer addresses to host names.
  BasicSocket.do_not_reverse_lookup = true
end

Instance Method Details

#parse_packet(packet) ⇒ Object



97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
# File 'lib/logstash/inputs/ganglia.rb', line 97

# Decodes one raw packet and converts it into an event when possible.
#
# Metadata packets are cached in @metadata (keyed by the metadata's 'name')
# and produce no event. Data packets become a LogStash::Event annotated with
# fields from the cached metadata. Any other packet type is ignored.
#
# @param packet the raw payload handed to GmonPacket.new
# @return [LogStash::Event, nil] an event for a valid data packet, nil otherwise
def parse_packet(packet)
  gmonpacket = GmonPacket.new(packet)

  if gmonpacket.meta?
    # Extract the metadata from the packet.
    # NOTE(review): parse_metadata is taken to be the metadata counterpart of
    # parse_data used below - confirm against GmonPacket.
    meta = gmonpacket.parse_metadata
    # Add it to the global metadata of this connection
    @metadata[meta['name']] = meta

    # We are ignoring meta events for putting things on the queue
    @logger.debug("received a meta packet", @metadata)
    return nil
  elsif gmonpacket.data?
    data = gmonpacket.parse_data(@metadata)

    # Check if it was a valid data request
    return nil unless data

    event = LogStash::Event.new

    # NOTE(review): this writes to +data+, which is never copied onto the
    # event in this method - confirm whether event["program"] was intended.
    data["program"] = "ganglia"
    event["log_host"] = data["hostname"]
    # Copy the descriptive fields recorded for this metric's metadata.
    %w{dmax tmax slope type units}.each do |info|
      event[info] = @metadata[data["name"]][info]
    end
    return event
  else
    # Skipping unknown packet types
    return nil
  end
end

#register ⇒ Object



33
34
# File 'lib/logstash/inputs/ganglia.rb', line 33

# Registration hook. The body is intentionally empty: it does nothing and
# returns nil.
def register; end

#run(output_queue) ⇒ Object



37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/logstash/inputs/ganglia.rb', line 37

# Runs the UDP listener, restarting it after a failure unless a shutdown has
# been requested.
#
# On a StandardError the failure is logged, the method sleeps five seconds and
# then retries. Once @shutdown_requested is set, the error is swallowed and
# the method returns nil.
#
# @param output_queue passed straight through to udp_listener
# @return whatever udp_listener returns, or nil when it failed during shutdown
def run(output_queue)
  udp_listener(output_queue)
rescue => error
  # A failure while shutting down is expected; do not log or restart.
  return nil if @shutdown_requested

  @logger.warn("ganglia udp listener died",
               :address => "#{@host}:#{@port}",
               :exception => error,
               :backtrace => error.backtrace)
  sleep(5)
  retry
end

#teardown ⇒ Object



81
82
83
84
85
# File 'lib/logstash/inputs/ganglia.rb', line 81

# Shuts the input down. Statement order matters here.
def teardown
  # Set the flag first: #run checks it in its rescue clause and skips the
  # log/sleep/retry path when it is true.
  @shutdown_requested = true
  # NOTE(review): close_udp is defined outside this view; presumably it closes
  # the listening UDP socket - confirm.
  close_udp
  # Inherited from Plugin; its return value becomes this method's return value.
  finished
end