Class: LogStash::Inputs::Jmx
- Inherits:
- Base
- Object
- Base
- LogStash::Inputs::Jmx
- Defined in:
- lib/logstash/inputs/jmx.rb
Overview
This input plugin retrieves metrics from remote Java applications over JMX. Every `polling_frequency` seconds, it scans a folder of JSON configuration files, each describing a JVM to monitor and the metrics to retrieve. A pool of threads then retrieves the metrics and creates events.
## The configuration:
In the Logstash configuration, you set the absolute path of a directory containing JSON files that describe, per JVM, the metrics to retrieve. You can also set the polling frequency (default 60 seconds) and the number of threads used to poll metrics (default 4). Logstash input configuration example:

    input {
      jmx {
        # Required
        path => "/apps/logstash_conf/jmxconf"
        # Optional, default 60s
        polling_frequency => 15
        type => "jmx"
        # Optional, default 4
        nb_thread => 4
      }
    }
JSON JMX configuration example:

    {
      //Required, JMX listening host/ip
      "host" : "192.168.1.2",
      //Required, JMX listening port
      "port" : 1335,
      //Optional, the username to connect to JMX
      "username" : "user",
      //Optional, the password to connect to JMX
      "password": "pass",
      //Optional, use this alias as a prefix in the metric name. If not set use <host>_<port>
      "alias" : "test.homeserver.elasticsearch",
      //Required, list of JMX metrics to retrieve
      "queries" : [
        {
          //Required, the object name of Mbean to request
          "object_name" : "java.lang:type=Memory",
          //Optional, use this alias in the metrics value instead of the object_name
          "object_alias" : "Memory"
        }, {
          "object_name" : "java.lang:type=Runtime",
          //Optional, set of attributes to retrieve. If not set retrieve
          //all metrics available on the configured object_name.
          "attributes" : [ "Uptime", "StartTime" ],
          "object_alias" : "Runtime"
        }, {
          //object_name can be configured with * to retrieve all matching Mbeans
          "object_name" : "java.lang:type=GarbageCollector,name=*",
          "attributes" : [ "CollectionCount", "CollectionTime" ],
          //object_alias can be based on specific value from the object_name thanks to ${<varname>}.
          //In this case ${type} will be replaced by GarbageCollector...
          "object_alias" : "${type}.${name}"
        }, {
          "object_name" : "java.nio:type=BufferPool,name=*",
          "object_alias" : "${type}.${name}"
        }
      ]
    }
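The `${<varname>}` substitution described in the configuration comments can be illustrated with a small sketch. This is not the plugin's own code: `expand_object_alias` is a hypothetical helper, and it assumes simple object names whose key properties contain no quoted commas.

```ruby
# Hypothetical helper (not part of the plugin): expand ${key} placeholders
# in an object_alias using the key properties of a JMX object name.
def expand_object_alias(object_alias, object_name)
  # "java.lang:type=GarbageCollector,name=ParNew"
  #   -> {"type" => "GarbageCollector", "name" => "ParNew"}
  properties = object_name.split(":", 2).last
                          .split(",")
                          .map { |pair| pair.split("=", 2) }
                          .to_h

  # Replace each ${key}; placeholders with no matching property are kept as-is.
  object_alias.gsub(/\$\{(.*?)\}/) { |placeholder| properties.fetch($1, placeholder) }
end

puts expand_object_alias("${type}.${name}",
                         "java.lang:type=GarbageCollector,name=ParNew")
# prints "GarbageCollector.ParNew"
```

With the alias `test.homeserver.elasticsearch` from the example, a result like this would account for the `GarbageCollector.ParNew` segment of a metric path.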
Here are examples of generated events. When a returned metric value is a number or a boolean, it is stored in the `metric_value_number` event field; otherwise it is stored in the `metric_value_string` event field.
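The field selection described here could look roughly like the following sketch. `metric_fields` is a hypothetical helper, and the mapping of booleans to 1 and 0 is an assumption, since this page only states that booleans go into the numeric field.

```ruby
# Hypothetical sketch: pick the event field from the metric value's type.
def metric_fields(metric_path, value)
  fields = { "metric_path" => metric_path }
  case value
  when Numeric
    fields["metric_value_number"] = value
  when true, false
    # Assumption: booleans are stored numerically as 1 (true) or 0 (false).
    fields["metric_value_number"] = value ? 1 : 0
  else
    fields["metric_value_string"] = value.to_s
  end
  fields
end

p metric_fields("alias.Memory.Verbose", false)
p metric_fields("alias.BufferPool.mapped.ObjectName",
                "java.nio:type=BufferPool,name=mapped")
```

The event listings that follow show one numeric value and one string value.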
    {
      "@version" => "1",
      "@timestamp" => "2014-02-18T20:57:27.688Z",
      "host" => "192.168.1.2",
      "path" => "/apps/logstash_conf/jmxconf",
      "type" => "jmx",
      "metric_path" => "test.homeserver.elasticsearch.GarbageCollector.ParNew.CollectionCount",
      "metric_value_number" => 2212
    }

    {
      "@version" => "1",
      "@timestamp" => "2014-02-18T20:58:06.376Z",
      "host" => "localhost",
      "path" => "/apps/logstash_conf/jmxconf",
      "type" => "jmx",
      "metric_path" => "test.homeserver.elasticsearch.BufferPool.mapped.ObjectName",
      "metric_value_string" => "java.nio:type=BufferPool,name=mapped"
    }
Constant Summary
- MISSING_CONFIG_PARAMETER =
Error messages
"Missing parameter '%s'."
- BAD_TYPE_CONFIG_PARAMETER =
"Bad type for parameter '%{param}', expecting '%{expected}', found '%{actual}'."
- MISSING_QUERY_PARAMETER =
"Missing parameter '%s' in queries[%d]."
- BAD_TYPE_QUERY =
"Bad type for queries[%{index}], expecting '%{expected}', found '%{actual}'."
- BAD_TYPE_QUERY_PARAMETER =
"Bad type for parameter '%{param}' in queries[%{index}], expecting '%{expected}', found '%{actual}'."
Instance Attribute Summary
-
#queue_conf ⇒ Object
Returns the value of attribute queue_conf.
-
#regexp_group_alias_object ⇒ Object
Class Var.
Instance Method Summary
Instance Attribute Details
#queue_conf ⇒ Object
Returns the value of attribute queue_conf.
    # File 'lib/logstash/inputs/jmx.rb', line 97
    def queue_conf
      @queue_conf
    end
#regexp_group_alias_object ⇒ Object
Class Var
    # File 'lib/logstash/inputs/jmx.rb', line 96
    def regexp_group_alias_object
      @regexp_group_alias_object
    end
Instance Method Details
#register ⇒ Object
    # File 'lib/logstash/inputs/jmx.rb', line 313
    def register
      require 'thread'
      require 'jmx4r'
      @logger.info("Create queue dispatching JMX requests to threads")
      @queue_conf = Queue.new
      @logger.info("Compile regexp for group alias object replacement")
      @regexp_group_alias_object = Regexp.new('(?:\${(.*?)})+')
    end
#run(queue) ⇒ Object
    # File 'lib/logstash/inputs/jmx.rb', line 325
    def run(queue)
      begin
        # Start the pool of worker threads that consume @queue_conf
        threads = []
        @logger.info("Initialize #{@nb_thread} threads for JMX metrics collection")
        @nb_thread.times do
          threads << Thread.new { thread_jmx(@queue_conf,queue) }
        end

        while !stop?
          # Load and validate every configuration file, then queue the valid ones
          @logger.info("Loading configuration files in path", :path => @path)
          Dir.foreach(@path) do |item|
            next if item == '.' or item == '..'
            begin
              file_conf = File.join(@path, item)
              @logger.debug? && @logger.debug("Loading configuration from file", :file => file_conf)
              config_string = File.read(file_conf)
              conf_hash = LogStash::Json.load(config_string)
              validation_errors = validate_configuration(conf_hash)
              if validation_errors.empty?
                @logger.debug? && @logger.debug("Add configuration to the queue", :config => conf_hash)
                @queue_conf << conf_hash
              else
                @logger.warn("Issue with configuration file", :file => file_conf,
                             :validation_errors => validation_errors)
              end
            rescue Exception => ex
              @logger.warn("Issue loading configuration from file", :file => file_conf,
                           :exception => ex.message, :backtrace => ex.backtrace)
            end
          end

          # Wait for the workers to drain the queue, counting the seconds spent
          @logger.debug('Wait until the queue conf is empty')
          delta=0
          until @queue_conf.empty?
            @logger.debug("There are still #{@queue_conf.size} messages in the queue conf. Sleep 1s.")
            delta=delta+1
            sleep(1)
          end

          # Sleep for whatever remains of the polling interval
          wait_time=@polling_frequency-delta
          if wait_time>0
            @logger.debug("Wait #{wait_time}s (#{@polling_frequency}-#{delta}(seconds wait until queue conf empty)) before to launch again a new jmx metrics collection")
            Stud.stoppable_sleep(wait_time) { stop? }
          else
            @logger.warn("The time taken to retrieve metrics is more important than the retrieve_interval time set. \nYou must adapt nb_thread, retrieve_interval to the number of jvm/metrics you want to retrieve.")
          end
        end
      rescue Exception => ex
        @logger.error(ex.message)
        @logger.error(ex.backtrace.join("\n"))
      ensure
        # Send one :END marker per worker, then wait for all of them to finish
        @nb_thread.times do |i|
          @logger.debug? && @logger.debug("Signaling termination to jmx thread #{i + 1}")
          @queue_conf << :END
        end
        threads.each {|t| t.join }
      end
    end
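The queue-and-sentinel pattern used by `#run` can be tried in isolation. The sketch below is a simplified stand-in, not the plugin's code: the body of `thread_jmx` is not shown on this page, so the worker loop here simply assumes that each worker pops configurations until it receives `:END`. The names `work`, `results` and the second host entry are illustrative.

```ruby
require 'thread'

work = Queue.new     # plays the role of @queue_conf
results = Queue.new  # plays the role of the Logstash event queue
nb_thread = 2

# Assumed worker behaviour: pop configurations until the :END marker arrives.
workers = nb_thread.times.map do
  Thread.new do
    while (conf = work.pop) != :END
      results << "polled #{conf['host']}:#{conf['port']}"
    end
  end
end

# Producer side: queue one configuration per monitored JVM.
[{ "host" => "192.168.1.2", "port" => 1335 },
 { "host" => "localhost",   "port" => 1099 }].each { |conf| work << conf }

# One :END per worker, mirroring the ensure block of #run.
nb_thread.times { work << :END }
workers.each(&:join)

collected = []
collected << results.pop until results.empty?
puts collected.sort
```

Pushing exactly one marker per thread matters: each worker consumes a single `:END` and exits, so fewer markers would leave a thread blocked on `pop` and `join` would never return.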
#validate_configuration(conf_hash) ⇒ Object
    # File 'lib/logstash/inputs/jmx.rb', line 117
    def validate_configuration(conf_hash)
      validation_errors = []
      #Check required parameters in configuration
      ["host", "port","queries"].each do |param|
        validation_errors << MISSING_CONFIG_PARAMETER % param unless conf_hash.has_key?(param)
      end

      #Validate parameters type in configuration
      {"host" => String, "port" => Fixnum, "alias" => String }.each do |param, expected_type|
        if conf_hash.has_key?(param) && !conf_hash[param].instance_of?(expected_type)
          validation_errors << BAD_TYPE_CONFIG_PARAMETER % { :param => param, :expected => expected_type, :actual => conf_hash[param].class }
        end
      end

      if conf_hash.has_key?("queries")
        if !conf_hash["queries"].respond_to?(:each)
          validation_errors << BAD_TYPE_CONFIG_PARAMETER % { :param => 'queries', :expected => Enumerable, :actual => conf_hash['queries'].class }
        else
          conf_hash['queries'].each_with_index do |query,index|
            unless query.respond_to?(:[]) && query.respond_to?(:has_key?)
              validation_errors << BAD_TYPE_QUERY % {:index => index, :expected => Hash, :actual => query.class}
              next
            end

            #Check required parameters in each query
            ["object_name"].each do |param|
              validation_errors << MISSING_QUERY_PARAMETER % [param,index] unless query.has_key?(param)
            end

            #Validate parameters type in each query
            {"object_name" => String, "object_alias" => String }.each do |param, expected_type|
              if query.has_key?(param) && !query[param].instance_of?(expected_type)
                validation_errors << BAD_TYPE_QUERY_PARAMETER % { :param => param, :index => index, :expected => expected_type, :actual => query[param].class }
              end
            end

            if query.has_key?("attributes") && !query["attributes"].respond_to?(:each)
              validation_errors << BAD_TYPE_QUERY_PARAMETER % { :param => 'attributes', :index => index, :expected => Enumerable, :actual => query['attributes'].class }
            end
          end
        end
      end
      return validation_errors
    end
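To see what the validation messages look like, here is a trimmed-down sketch that covers only the top-level keys. `check_top_level` is a hypothetical stand-in for `validate_configuration`, the two format strings are copied from the constant summary, and `Integer` with `is_a?` replaces `Fixnum` with `instance_of?` so that the sketch runs on current Ruby versions.

```ruby
require 'json'

# Format strings copied from the constants documented on this page.
MISSING_CONFIG_PARAMETER = "Missing parameter '%s'."
BAD_TYPE_CONFIG_PARAMETER = "Bad type for parameter '%{param}', expecting '%{expected}', found '%{actual}'."

# Hypothetical, reduced checker: required keys and top-level types only.
def check_top_level(conf_hash)
  errors = []
  ["host", "port", "queries"].each do |param|
    # '%s' takes a positional argument
    errors << MISSING_CONFIG_PARAMETER % param unless conf_hash.has_key?(param)
  end
  # Assumption: Integer stands in for Fixnum in this sketch.
  { "host" => String, "port" => Integer }.each do |param, expected_type|
    if conf_hash.has_key?(param) && !conf_hash[param].is_a?(expected_type)
      # '%{name}' placeholders take a hash of named values
      errors << BAD_TYPE_CONFIG_PARAMETER % { :param => param,
                                              :expected => expected_type,
                                              :actual => conf_hash[param].class }
    end
  end
  errors
end

conf = JSON.parse('{ "host": "192.168.1.2", "port": "1335" }')
puts check_top_level(conf)
# prints:
#   Missing parameter 'queries'.
#   Bad type for parameter 'port', expecting 'Integer', found 'String'.
```

Because the method collects messages instead of raising, a single pass reports every problem in a configuration file, which is how `#run` is able to log them all in one warning.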