Class: LogStash::Inputs::Jmx

Inherits:
Base
  • Object
show all
Defined in:
lib/logstash/inputs/jmx.rb

Overview

This input plugin permits to retrieve metrics from remote Java applications using JMX. Every ‘polling_frequency`, it scans a folder containing json configuration files describing JVMs to monitor with metrics to retrieve. Then a pool of threads will retrieve metrics and create events.

## The configuration:

In Logstash configuration, you must set the polling frequency, the number of thread used to poll metrics and a directory absolute path containing json files with the configuration per jvm of metrics to retrieve. Logstash input configuration example:

source,ruby

jmx

//Required
path => "/apps/logstash_conf/jmxconf"
//Optional, default 60s
polling_frequency => 15
type => "jmx"
//Optional, default 4
nb_thread => 4

Json JMX configuration example:

source,js

{

//Required, JMX listening host/ip
"host" : "192.168.1.2",
//Required, JMX listening port
"port" : 1335,
//Optional, the username to connect to JMX
"username" : "user",
//Optional, the password to connect to JMX
"password": "pass",
//Optional, use this alias as a prefix in the metric name. If not set use <host>_<port>
"alias" : "test.homeserver.elasticsearch",
//Required, list of JMX metrics to retrieve
"queries" : [
{
  //Required, the object name of Mbean to request
  "object_name" : "java.lang:type=Memory",
  //Optional, use this alias in the metrics value instead of the object_name
  "object_alias" : "Memory"
}, {
  "object_name" : "java.lang:type=Runtime",
  //Optional, set of attributes to retrieve. If not set retrieve
  //all metrics available on the configured object_name.
  "attributes" : [ "Uptime", "StartTime" ],
  "object_alias" : "Runtime"
}, {
  //object_name can be configured with * to retrieve all matching Mbeans
  "object_name" : "java.lang:type=GarbageCollector,name=*",
  "attributes" : [ "CollectionCount", "CollectionTime" ],
  //object_alias can be based on specific value from the object_name thanks to ${<varname>}.
  //In this case ${type} will be replaced by GarbageCollector...
  "object_alias" : "${type}.${name}"
}, {
  "object_name" : "java.nio:type=BufferPool,name=*",
  "object_alias" : "${type}.${name}"
} ]

}

Here are examples of generated events. When returned metrics value type is number/boolean it is stored in ‘metric_value_number` event field otherwise it is stored in `metric_value_string` event field.

source,ruby

"@version" => "1",
"@timestamp" => "2014-02-18T20:57:27.688Z",
"host" => "192.168.1.2",
"path" => "/apps/logstash_conf/jmxconf",
"type" => "jmx",
"metric_path" => "test.homeserver.elasticsearch.GarbageCollector.ParNew.CollectionCount",
"metric_value_number" => 2212

source,ruby

"@version" => "1",
"@timestamp" => "2014-02-18T20:58:06.376Z",
"host" => "localhost",
"path" => "/apps/logstash_conf/jmxconf",
"type" => "jmx",
"metric_path" => "test.homeserver.elasticsearch.BufferPool.mapped.ObjectName",
"metric_value_string" => "java.nio:type=BufferPool,name=mapped"

Constant Summary collapse

MISSING_CONFIG_PARAMETER =

Error messages

"Missing parameter '%s'."
BAD_TYPE_CONFIG_PARAMETER =
"Bad type for parameter '%{param}', expecting '%{expected}', found '%{actual}'."
MISSING_QUERY_PARAMETER =
"Missing parameter '%s' in queries[%d]."
BAD_TYPE_QUERY =
"Bad type for queries[%{index}], expecting '%{expected}', found '%{actual}'."
BAD_TYPE_QUERY_PARAMETER =
"Bad type for parameter '%{param}' in queries[%{index}], expecting '%{expected}', found '%{actual}'."

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#queue_confObject

Returns the value of attribute queue_conf.



97
98
99
# File 'lib/logstash/inputs/jmx.rb', line 97

def queue_conf
  @queue_conf
end

#regexp_group_alias_objectObject

Class Var



96
97
98
# File 'lib/logstash/inputs/jmx.rb', line 96

def regexp_group_alias_object
  @regexp_group_alias_object
end

Instance Method Details

#registerObject



313
314
315
316
317
318
319
320
321
322
# File 'lib/logstash/inputs/jmx.rb', line 313

def register
  require 'thread'
  require 'jmx4r'

  @logger.info("Create queue dispatching JMX requests to threads")
  @queue_conf = Queue.new

  @logger.info("Compile regexp for group alias object replacement")
  @regexp_group_alias_object = Regexp.new('(?:\${(.*?)})+')
end

#run(queue) ⇒ Object



325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
# File 'lib/logstash/inputs/jmx.rb', line 325

def run(queue)
  begin
    threads = []
    @logger.info("Initialize #{@nb_thread} threads for JMX metrics collection")
    @nb_thread.times do
      threads << Thread.new { thread_jmx(@queue_conf,queue) }
    end

    while !stop?
      @logger.info("Loading configuration files in path", :path => @path)
      Dir.foreach(@path) do |item|
        next if item == '.' or item == '..'
        begin
          file_conf = File.join(@path, item)
          @logger.debug? && @logger.debug("Loading configuration from file", :file => file_conf)
          config_string = File.read(file_conf)
          conf_hash = LogStash::Json.load(config_string)
          validation_errors = validate_configuration(conf_hash)
          if validation_errors.empty?
            @logger.debug? && @logger.debug("Add configuration to the queue", :config => conf_hash)
            @queue_conf << conf_hash
          else
            @logger.warn("Issue with configuration file", :file => file_conf,
            :validation_errors => validation_errors)
          end
        rescue Exception => ex
          @logger.warn("Issue loading configuration from file", :file => file_conf,
            :exception => ex.message, :backtrace => ex.backtrace)
        end
      end

      @logger.debug('Wait until the queue conf is empty')
      delta=0
      until @queue_conf.empty?
        @logger.debug("There are still #{@queue_conf.size} messages in the queue conf. Sleep 1s.")
        delta=delta+1
        sleep(1)
      end

      wait_time=@polling_frequency-delta
      if wait_time>0
        @logger.debug("Wait #{wait_time}s (#{@polling_frequency}-#{delta}(seconds wait until queue conf empty)) before to launch again a new jmx metrics collection")
        Stud.stoppable_sleep(wait_time) { stop? }
      else
        @logger.warn("The time taken to retrieve metrics is more important than the retrieve_interval time set.
                     \nYou must adapt nb_thread, retrieve_interval to the number of jvm/metrics you want to retrieve.")
      end
    end
  rescue Exception => ex
    @logger.error(ex.message)
    @logger.error(ex.backtrace.join("\n"))
  ensure
    @nb_thread.times do |i|
      @logger.debug? && @logger.debug("Signaling termination to jmx thread #{i + 1}")
      @queue_conf << :END
    end
    threads.each {|t| t.join }
  end
end

#validate_configuration(conf_hash) ⇒ Object



117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
# File 'lib/logstash/inputs/jmx.rb', line 117

def validate_configuration(conf_hash)
  validation_errors = []
  #Check required parameters in configuration
  ["host", "port","queries"].each do |param|
    validation_errors << MISSING_CONFIG_PARAMETER % param unless conf_hash.has_key?(param)
  end

  #Validate parameters type in configuration
  {"host" => String, "port" => Fixnum, "alias" => String }.each do |param, expected_type|
    if conf_hash.has_key?(param) && !conf_hash[param].instance_of?(expected_type)
      validation_errors << BAD_TYPE_CONFIG_PARAMETER % { :param => param, :expected => expected_type, :actual => conf_hash[param].class }
    end
  end

  if conf_hash.has_key?("queries")
    if !conf_hash["queries"].respond_to?(:each)
      validation_errors << BAD_TYPE_CONFIG_PARAMETER % { :param => 'queries', :expected => Enumerable, :actual => conf_hash['queries'].class }
    else
      conf_hash['queries'].each_with_index do |query,index|
        unless query.respond_to?(:[]) && query.respond_to?(:has_key?)
          validation_errors << BAD_TYPE_QUERY % {:index => index, :expected => Hash, :actual => query.class}
          next
        end
        #Check required parameters in each query
        ["object_name"].each do |param|
          validation_errors << MISSING_QUERY_PARAMETER % [param,index] unless query.has_key?(param)
        end
        #Validate parameters type in each query
        {"object_name" => String, "object_alias" => String }.each do |param, expected_type|
          if query.has_key?(param) && !query[param].instance_of?(expected_type)
            validation_errors << BAD_TYPE_QUERY_PARAMETER % { :param => param, :index => index, :expected => expected_type, :actual => query[param].class }
          end
        end

        if query.has_key?("attributes") && !query["attributes"].respond_to?(:each)
          validation_errors << BAD_TYPE_QUERY_PARAMETER % { :param => 'attributes', :index => index, :expected => Enumerable, :actual => query['attributes'].class }
        end
      end
    end
  end
  return validation_errors
end