Class: LogStash::Outputs::ElasticSearchHTTP

Inherits:
Base
  • Object
show all
Includes:
Stud::Buffer
Defined in:
lib/logstash/outputs/elasticsearch_http.rb

Overview

This output lets you store logs in elasticsearch.

This plugin uses the HTTP/REST interface to ElasticSearch, which usually lets you use any version of elasticsearch server. It is known to work with elasticsearch %ELASTICSEARCH_VERSION%

You can learn more about elasticsearch at <elasticsearch.org>

Constant Summary

Constants included from Config::Mixin

Config::Mixin::CONFIGSORT

Instance Attribute Summary

Attributes included from Config::Mixin

#config, #original_params

Attributes inherited from Plugin

#logger, #params

Instance Method Summary collapse

Methods inherited from Base

#handle, #handle_worker, #initialize, #worker_setup, #workers_not_supported

Methods included from Config::Mixin

#config_init, included

Methods inherited from Plugin

#eql?, #finished, #finished?, #hash, #initialize, #inspect, lookup, #reload, #running?, #shutdown, #terminating?, #to_s

Constructor Details

This class inherits a constructor from LogStash::Outputs::Base

Instance Method Details

#flush(events, teardown = false) ⇒ Object

def receive



204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
# File 'lib/logstash/outputs/elasticsearch_http.rb', line 204

def flush(events, teardown=false)
  # Avoid creating a new string for newline every time
  newline = "\n".freeze

  body = events.collect do |event, index, type|
    index = event.sprintf(@index)

    # Set the 'type' value for the index.
    if @index_type.nil?
      type =  event["type"] || "logs"
    else
      type = event.sprintf(@index_type)
    end
    header = { "index" => { "_index" => index, "_type" => type } }
    header["index"]["_id"] = event.sprintf(@document_id) if !@document_id.nil?

    [ header.to_json, newline, event.to_json, newline ]
  end.flatten
  post(body.join(""))
end

#get_template_jsonObject



174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
# File 'lib/logstash/outputs/elasticsearch_http.rb', line 174

def get_template_json
  if @template.nil?
    if __FILE__ =~ /^(jar:)?file:\/.+!.+/
      begin
        # Running from a jar, assume types.db is at the root.
        jar_path = [__FILE__.split("!").first, "/elasticsearch-template.json"].join("!")
        @template = jar_path
      rescue => ex
        raise "Failed to cache, due to: #{ex}\n#{ex.backtrace}"
      end
    else
      if File.exists?("elasticsearch-template.json")
        @template = "elasticsearch-template.json"
      elsif File.exists?("lib/logstash/outputs/elasticsearch/elasticsearch-template.json")
        @template = "lib/logstash/outputs/elasticsearch/elasticsearch-template.json"
      else
        raise "You must specify 'template => ...' in your elasticsearch_http output"
      end
    end
  end
  @template_json = IO.read(@template).gsub(/\n/,'')
  @logger.info("Using mapping template", :template => @template_json)
end

#post(body) ⇒ Object

def receive_bulk



225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
# File 'lib/logstash/outputs/elasticsearch_http.rb', line 225

def post(body)
  begin
    response = @agent.post!(@bulk_url, :body => body)
  rescue EOFError
    @logger.warn("EOF while writing request or reading response header from elasticsearch",
                 :host => @host, :port => @port)
    return # abort this flush
  end

  # Consume the body for error checking
  # This will also free up the connection for reuse.
  body = ""
  begin
    response.read_body { |chunk| body += chunk }
  rescue EOFError
    @logger.warn("EOF while reading response body from elasticsearch",
                 :host => @host, :port => @port)
    return # abort this flush
  end

  if response.status != 200
    @logger.error("Error writing (bulk) to elasticsearch",
                  :response => response, :response_body => body,
                  :request_body => @queue.join("\n"))
    return
  end
end

#receive(event) ⇒ Object



199
200
201
202
# File 'lib/logstash/outputs/elasticsearch_http.rb', line 199

def receive(event)
  return unless output?(event)
  buffer_receive([event, index, type])
end

#registerObject



96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
# File 'lib/logstash/outputs/elasticsearch_http.rb', line 96

def register
  require "ftw" # gem ftw
  @agent = FTW::Agent.new
  @queue = []

  auth = @user && @password ? "#{@user}:#{@password.value}@" : ""
  @bulk_url = "http://#{auth}#{@host}:#{@port}/_bulk?replication=#{@replication}"
  if @manage_template
    @logger.info("Automatic template management enabled", :manage_template => @manage_template.to_s)
    template_search_url = "http://#{auth}#{@host}:#{@port}/_template/*"
    @template_url = "http://#{auth}#{@host}:#{@port}/_template/#{@template_name}"
    if @template_overwrite
      @logger.info("Template overwrite enabled.  Deleting existing template.", :template_overwrite => @template_overwrite.to_s)
      response = @agent.get!(@template_url)
      template_action('delete') if response.status == 200 #=> Purge the old template if it exists
    end
    @logger.debug("Template Search URL:", :template_search_url => template_search_url)
    has_template = false
    template_idx_name = @index.sub(/%{[^}]+}/,'*')
    alt_template_idx_name = @index.sub(/-%{[^}]+}/,'*')
    # Get the template data
    response = @agent.get!(template_search_url)
    json = ""
    if response.status == 404 #=> This condition can occcur when no template has ever been appended
      @logger.info("No template found in Elasticsearch...")
      get_template_json
      template_action('put')
    elsif response.status == 200
      begin
        response.read_body { |c| json << c }
        results = JSON.parse(json)
      rescue Exception => e
        @logger.error("Error parsing JSON", :json => json, :results => results.to_s, :error => e.to_s)
        raise "Exception in parsing JSON", e
      end
      if !results.any? { |k,v| v["template"] == template_idx_name || v["template"] == alt_template_idx_name }
        @logger.debug("No template found in Elasticsearch", :has_template => has_template, :name => template_idx_name, :alt => alt_template_idx_name)
        get_template_json
        template_action('put')      
      end
    else #=> Some other status code?
      @logger.error("Could not check for existing template.  Check status code.", :status => response.status.to_s)
    end # end if response.status == 200
  end # end if @manage_template
  buffer_initialize(
    :max_items => @flush_size,
    :max_interval => @idle_flush_time,
    :logger => @logger
  )
end

#teardownObject

def post



253
254
255
# File 'lib/logstash/outputs/elasticsearch_http.rb', line 253

def teardown
  buffer_flush(:final => true)
end

#template_action(command) ⇒ Object



148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
# File 'lib/logstash/outputs/elasticsearch_http.rb', line 148

def template_action(command)
  begin
    if command == 'delete'
      response = @agent.delete!(@template_url)
      response.discard_body
    elsif command == 'put'
      response = @agent.put!(@template_url, :body => @template_json)
      response.discard_body
    end
  rescue EOFError
    @logger.warn("EOF while attempting request or reading response header from elasticsearch",
                 :host => @host, :port => @port)
    return # abort this action
  end
  if response.status != 200
    @logger.error("Error acting on elasticsearch mapping template",
                  :response => response, :action => command,
                  :request_url => @template_url)
    return
  end
  @logger.info("Successfully deleted template", :template_url => @template_url) if command == 'delete'
  @logger.info("Successfully applied template", :template_url => @template_url) if command == 'put'
end