Class: LogStash::Outputs::AES::HttpClient

Inherits:
Object
  • Object
show all
Defined in:
lib/logstash/outputs/amazon_es/http_client.rb

Constant Summary collapse

# Baseline client options; caller-supplied options are merged over these.
# Frozen so the shared default cannot be modified in place by accident;
# Hash#merge still hands each instance its own unfrozen copy.
DEFAULT_OPTIONS =
{
  :port => 80
}.freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ HttpClient

Returns a new instance of HttpClient.



15
16
17
18
19
# File 'lib/logstash/outputs/amazon_es/http_client.rb', line 15

# Sets up the logger, resolves the effective options and builds the
# underlying client from them.
#
# @param options [Hash] overrides applied on top of DEFAULT_OPTIONS
def initialize(options = {})
  @logger = Cabin::Channel.get

  # Caller-supplied keys win over the defaults.
  effective = DEFAULT_OPTIONS.merge(options)
  @options = effective

  # Built last, after @logger and @options are already in place.
  @client = build_client(effective)
end

Instance Attribute Details

#client ⇒ Object (readonly)

Returns the value of attribute client.



10
11
12
# File 'lib/logstash/outputs/amazon_es/http_client.rb', line 10

# Reader for @client, which the constructor sets to the result of
# build_client(@options).
def client
  @client
end

#client_options ⇒ Object (readonly)

Returns the value of attribute client_options.



10
11
12
# File 'lib/logstash/outputs/amazon_es/http_client.rb', line 10

# Reader for @client_options.
# NOTE(review): none of the methods shown on this page assign
# @client_options, so this returns nil unless it is set elsewhere
# (possibly inside build_client) — confirm against the full source.
def client_options
  @client_options
end

#options ⇒ Object (readonly)

Returns the value of attribute options.



10
11
12
# File 'lib/logstash/outputs/amazon_es/http_client.rb', line 10

# Reader for @options: the options passed to the constructor merged over
# DEFAULT_OPTIONS.
def options
  @options
end

Instance Method Details

#bulk(actions) ⇒ Object



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/logstash/outputs/amazon_es/http_client.rb', line 29

# Sends a batch of actions through the client's bulk call.
#
# Each action is an [action, args, source] triple. It becomes a header
# entry ({ action => args }) followed, when source is present, by the
# source entry. For 'update' actions the source is wrapped as
# { 'doc' => source } and gets either 'doc_as_upsert' (when the
# :doc_as_upsert option is set) or an 'upsert' document taken from
# args[:_upsert].
#
# The caller's args hashes are left untouched, so the same actions can be
# submitted again (e.g. on retry) without losing their :_upsert document.
#
# @param actions [Array<Array>] list of [action, args, source] triples
# @return [Object] whatever self.class.normalize_bulk_response returns for
#   the client's bulk response
# @raise [LogStash::ConfigurationError] if an 'update' action has no :_id
def bulk(actions)
  bulk_body = actions.flat_map do |action, args, source|
    # :_upsert only feeds the update body; it must not appear in the header.
    # Filter into a copy instead of deleting from the caller's hash.
    header_args = args.reject { |key, _| key == :_upsert }

    if action == 'update'
      unless args[:_id]
        raise(LogStash::ConfigurationError, "Specifying action => 'update' without a document '_id' is not supported.")
      end

      source = { 'doc' => source }
      if @options[:doc_as_upsert]
        source['doc_as_upsert'] = true
      elsif args[:_upsert]
        source['upsert'] = args[:_upsert]
      end
    end

    header = { action => header_args }
    source ? [header, source] : [header]
  end

  bulk_response = @client.bulk(:body => bulk_body)

  self.class.normalize_bulk_response(bulk_response)
end

#template_install(name, template, force = false) ⇒ Object



21
22
23
24
25
26
27
# File 'lib/logstash/outputs/amazon_es/http_client.rb', line 21

# Installs a template under the given name unless one already exists.
#
# With force set the template is always written and the existence check is
# skipped altogether, since its result could not change the outcome.
#
# @param name [Object] template name handed to template_exists? and template_put
# @param template [Object] template body handed to template_put
# @param force [Boolean] write the template even if one already exists
# @return [Object, nil] nil when an existing template is kept, otherwise
#   the result of template_put
def template_install(name, template, force=false)
  # Test force first so a forced install does not pay for the lookup.
  if !force && template_exists?(name)
    @logger.debug("Found existing Elasticsearch template. Skipping template management", :name => name)
    return
  end
  template_put(name, template)
end