Class: LogStash::Outputs::ElasticSearch
- Includes:
- Stud::Buffer
- Defined in:
- lib/logstash/outputs/elasticsearch.rb
Overview
This output lets you store logs in elasticsearch and is the most recommended output for logstash. If you plan on using the logstash web interface, you’ll need to use this output.
*VERSION NOTE*: Your elasticsearch cluster must be running elasticsearch
%ELASTICSEARCH_VERSION%. If you use any other version of elasticsearch,
you should consider using the [elasticsearch_http](elasticsearch_http)
output instead.
If you want to set other elasticsearch settings that are not exposed directly as config options, there are two ways to do so:
-
create an elasticsearch.yml file in the $PWD of the logstash process
-
pass in es.* java properties (java -Des.node.foo= or ruby -J-Des.node.foo=)
This plugin will join your elasticsearch cluster, so it will show up in elasticsearch’s cluster health status.
You can learn more about elasticsearch at <http://elasticsearch.org>
## Operational Notes
Template management is a new feature and requires Elasticsearch 0.90.5 or later.
If you are still using a version older than this, please upgrade for more benefits than just template management.
Your firewalls will need to permit port 9300 in both directions (from logstash to elasticsearch, and elasticsearch to logstash)
Constant Summary
Constants included from Config::Mixin
Instance Attribute Summary
Attributes included from Config::Mixin
Attributes inherited from Plugin
Instance Method Summary collapse
-
#flush(events, teardown = false) ⇒ Object
Sends a batch of buffered events to elasticsearch as a single bulk request.
- #get_template_json ⇒ Object
- #receive(event) ⇒ Object
- #register ⇒ Object
Methods inherited from Base
#handle, #handle_worker, #initialize, #worker_setup, #workers_not_supported
Methods included from Config::Mixin
Methods inherited from Plugin
#eql?, #finished, #finished?, #hash, #initialize, #inspect, lookup, #reload, #running?, #shutdown, #teardown, #terminating?, #to_s
Constructor Details
This class inherits a constructor from LogStash::Outputs::Base
Instance Method Details
#flush(events, teardown = false) ⇒ Object
Sends a batch of buffered events to elasticsearch as a single bulk request.
308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 |
# File 'lib/logstash/outputs/elasticsearch.rb', line 308

# Sends a batch of buffered events to Elasticsearch in one bulk request.
#
# Each element of +events+ is an array whose first item is the event; the
# remaining items are ignored because the index and type are recomputed
# per event from @index and @index_type.
#
# @param events [Array<Array>] buffered entries, event first
# @param teardown [Boolean] accepted for the flush callback signature; unused here
# @return [Object, nil] whatever the bulk request's execute! returns, or nil
#   when +events+ is empty
def flush(events, teardown=false)
  # Nothing to index: do not build and execute an empty bulk request.
  return if events.empty?

  request = @client.bulk
  events.each do |event, _index, _type|
    index = event.sprintf(@index)

    # Set the 'type' value for the index: an explicit index_type setting
    # wins, otherwise fall back to the event's own type, then to "logs".
    type = if @index_type.nil?
             event["type"] || "logs"
           else
             event.sprintf(@index_type)
           end

    # A nil id is passed through when no document_id is configured.
    document_id = @document_id ? event.sprintf(@document_id) : nil
    request.index(index, type, document_id, event.to_json)
  end

  request.execute!
  # TODO(sissel): Handle errors. Since bulk requests could mostly succeed
  # (aka partially fail), we need to figure out what documents need to be
  # retried.
end
#get_template_json ⇒ Object
264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 |
# File 'lib/logstash/outputs/elasticsearch.rb', line 264

# Loads the index template into @template_json and logs it.
#
# When @template is nil a default location is chosen, in this order:
# 1. "elasticsearch-template.json" at the root of the jar, if __FILE__ looks
#    like a path inside a jar;
# 2. "elasticsearch-template.json" in the current working directory;
# 3. "lib/logstash/outputs/elasticsearch/elasticsearch-template.json".
# The file is then read and its newlines are removed.
#
# @return [Object] whatever @logger.info returns
# @raise [RuntimeError] if @template is nil and no default template file exists
def get_template_json
  if @template.nil?
    if __FILE__ =~ /^(jar:)?file:\/.+!.+/
      # Running from a jar, assume the template is at the root of the jar.
      @template = [__FILE__.split("!").first, "/elasticsearch-template.json"].join("!")
    elsif File.exist?("elasticsearch-template.json")
      @template = "elasticsearch-template.json"
    elsif File.exist?("lib/logstash/outputs/elasticsearch/elasticsearch-template.json")
      @template = "lib/logstash/outputs/elasticsearch/elasticsearch-template.json"
    else
      raise "You must specify 'template => ...' in your elasticsearch output"
    end
  end

  @template_json = IO.read(@template).delete("\n")
  @logger.info("Using mapping template", :template => @template_json)
end
#receive(event) ⇒ Object
303 304 305 306 |
# File 'lib/logstash/outputs/elasticsearch.rb', line 303

# Queues an event for the next bulk flush when output?(event) accepts it;
# otherwise does nothing and returns nil.
#
# NOTE(review): `index` and `type` are not defined in this method; presumably
# they resolve to methods defined elsewhere in the class. #flush recomputes
# both values per event, so only the event itself is used downstream.
#
# @param event [Object] the event to buffer
def receive(event)
  buffer_receive([event, index, type]) if output?(event)
end
#register ⇒ Object
154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 |
# File 'lib/logstash/outputs/elasticsearch.rb', line 154

# Prepares this output for use. In order, it:
# 1. requires the vendored elasticsearch jars and sets up log4j;
# 2. starts a local elasticsearch when @embedded is set;
# 3. builds the client from the cluster/host/port/bind/node settings;
# 4. when @manage_template is set, optionally deletes the existing template
#    (@template_overwrite) and installs one if no existing template matches
#    the index pattern derived from @index;
# 5. initializes the event buffer with @flush_size and @idle_flush_time.
def register
  # TODO(sissel): find a better way of declaring where the elasticsearch
  # libraries are
  # TODO(sissel): can skip this step if we're running from a jar.
  jarpath = File.join(File.dirname(__FILE__), "../../../vendor/jar/elasticsearch*/lib/*.jar")
  Dir[jarpath].each do |jar|
    require jar
  end

  # setup log4j properties for elasticsearch
  LogStash::Logger.setup_log4j(@logger)

  if @embedded
    # Default @host with embedded to localhost. This should help avoid
    # newbies tripping on ubuntu and other distros that have a default
    # firewall that blocks multicast.
    @host ||= "localhost"

    # Start elasticsearch local.
    start_local_elasticsearch
  end
  require "jruby-elasticsearch"

  @logger.info("New ElasticSearch output", :cluster => @cluster,
               :host => @host, :port => @port, :embedded => @embedded)
  options = {
    :cluster => @cluster,
    :host => @host,
    :port => @port,
    :bind_host => @bind_host,
    :node_name => @node_name,
  }

  # :node or :transport protocols
  options[:type] = @protocol.to_sym

  options[:bind_port] = @bind_port unless @bind_port.nil?

  # TransportClient requires a number for a port.
  options[:port] = options[:port].to_i if options[:type] == :transport

  @client = ElasticSearch::Client.new(options)

  # Check to see if we *can* get the template
  java_client = @client.instance_eval{@client}
  begin
    check_template = ElasticSearch::GetIndexTemplatesRequest.new(java_client, @template_name)
    result = check_template.execute #=> Run previously...
  # NOTE(review): rescuing Exception is very broad; presumably deliberate so
  # that errors raised by the Java client are caught as well -- confirm.
  rescue Exception => e
    @logger.error("Unable to check template. Automatic template management disabled.", :error => e.to_s)
    @manage_template = false
  end

  if @manage_template
    @logger.info("Automatic template management enabled", :manage_template => @manage_template.to_s)
    if @template_overwrite
      @logger.info("Template overwrite enabled. Deleting template if it exists.", :template_overwrite => @template_overwrite.to_s)
      if !result.getIndexTemplates.isEmpty
        delete_template = ElasticSearch::DeleteIndexTemplateRequest.new(java_client, @template_name)
        result = delete_template.execute
        if result.isAcknowledged
          @logger.info("Successfully deleted template", :template_name => @template_name)
        else
          @logger.error("Failed to delete template", :template_name => @template_name)
        end
      end
    end # end if @template_overwrite

    has_template = false
    @logger.debug("Fetching all templates...")
    gettemplates = ElasticSearch::GetIndexTemplatesRequest.new(java_client, "*")
    result = gettemplates.execute
    # Results of this come as a list, so we need to iterate through it
    if !result.getIndexTemplates.isEmpty
      template_metadata_list = result.getIndexTemplates
      templates = {}
      template_metadata_list.size.times do |i|
        template_data = template_metadata_list.get(i)
        templates[template_data.name] = template_data.template
      end

      # Turn the index setting into the wildcard pattern a template would use,
      # with and without the dash that precedes the first %{...} reference.
      template_idx_name = @index.sub(/%{[^}]+}/,'*')
      alt_template_idx_name = @index.sub(/-%{[^}]+}/,'*')
      if !templates.any? { |k,v| v == template_idx_name || v == alt_template_idx_name }
        @logger.debug("No logstash template found in Elasticsearch", :has_template => has_template, :name => template_idx_name, :alt => alt_template_idx_name)
      else
        has_template = true
        @logger.info("Found existing Logstash template match.", :has_template => has_template, :name => template_idx_name, :alt => alt_template_idx_name, :templates => templates.to_s)
      end
    end

    if !has_template #=> No template found, we're going to add one
      get_template_json
      put_template = ElasticSearch::PutIndexTemplateRequest.new(java_client, @template_name, @template_json)
      result = put_template.execute
      if result.isAcknowledged
        @logger.info("Successfully inserted template", :template_name => @template_name)
      else
        @logger.error("Failed to insert template", :template_name => @template_name)
      end
    end
  end # if @manage_templates

  buffer_initialize(
    :max_items => @flush_size,
    :max_interval => @idle_flush_time,
    :logger => @logger
  )
end