Class: LogStash::Outputs::ElasticSearch::HttpClient
- Inherits:
-
Object
- Object
- LogStash::Outputs::ElasticSearch::HttpClient
- Defined in:
- lib/logstash/outputs/elasticsearch/http_client.rb,
lib/logstash/outputs/elasticsearch/http_client/pool.rb,
lib/logstash/outputs/elasticsearch/http_client/manticore_adapter.rb
Overview
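20MiB (this appears to be the comment attached to the TARGET_BULK_BYTES constant — the body-size threshold at which #bulk splits a batch into multiple requests — rather than a description of the class itself)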
Defined Under Namespace
Classes: ManticoreAdapter, Pool
Instance Attribute Summary collapse
-
#action_count ⇒ Object
readonly
Returns the value of attribute action_count.
-
#client ⇒ Object
readonly
Returns the value of attribute client.
-
#logger ⇒ Object
readonly
Returns the value of attribute logger.
-
#options ⇒ Object
readonly
Returns the value of attribute options.
-
#pool ⇒ Object
readonly
Returns the value of attribute pool.
-
#recv_count ⇒ Object
readonly
Returns the value of attribute recv_count.
Instance Method Summary collapse
- #bulk(actions) ⇒ Object
- #bulk_send(bulk_body) ⇒ Object
- #close ⇒ Object
- #get_version ⇒ Object
-
#initialize(options = {}) ⇒ HttpClient
constructor
This is here in case we use DEFAULT_OPTIONS in the future, e.g. DEFAULT_OPTIONS = { :setting => value }.
- #join_bulk_responses(bulk_responses) ⇒ Object
- #template_install(name, template, force = false) ⇒ Object
Constructor Details
#initialize(options = {}) ⇒ HttpClient
This is here in case we use DEFAULT_OPTIONS in the future, e.g.
DEFAULT_OPTIONS = { :setting => value }
30 31 32 33 34 35 36 37 38 |
# File 'lib/logstash/outputs/elasticsearch/http_client.rb', line 30 def initialize(options={}) @logger = options[:logger] # Again, in case we use DEFAULT_OPTIONS in the future, uncomment this. # @options = DEFAULT_OPTIONS.merge(options) @options = options @pool = build_pool(@options) # mutex to prevent requests and sniffing to access the # connection pool at the same time end |
Instance Attribute Details
#action_count ⇒ Object (readonly)
Returns the value of attribute action_count.
24 25 26 |
# File 'lib/logstash/outputs/elasticsearch/http_client.rb', line 24 def action_count @action_count end |
#client ⇒ Object (readonly)
Returns the value of attribute client.
24 25 26 |
# File 'lib/logstash/outputs/elasticsearch/http_client.rb', line 24 def client @client end |
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
24 25 26 |
# File 'lib/logstash/outputs/elasticsearch/http_client.rb', line 24 def logger @logger end |
#options ⇒ Object (readonly)
Returns the value of attribute options.
24 25 26 |
# File 'lib/logstash/outputs/elasticsearch/http_client.rb', line 24 def options @options end |
#pool ⇒ Object (readonly)
Returns the value of attribute pool.
24 25 26 |
# File 'lib/logstash/outputs/elasticsearch/http_client.rb', line 24 def pool @pool end |
#recv_count ⇒ Object (readonly)
Returns the value of attribute recv_count.
24 25 26 |
# File 'lib/logstash/outputs/elasticsearch/http_client.rb', line 24 def recv_count @recv_count end |
Instance Method Details
#bulk(actions) ⇒ Object
53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 |
# File 'lib/logstash/outputs/elasticsearch/http_client.rb', line 53 def bulk(actions) @action_count ||= 0 @action_count += actions.size return if actions.empty? bulk_actions = actions.collect do |action, args, source| args, source = update_action_builder(args, source) if action == 'update' if source && action != 'delete' next [ { action => args }, source ] else next { action => args } end end bulk_body = "" bulk_responses = [] bulk_actions.each do |action| as_json = action.is_a?(Array) ? action.map {|line| LogStash::Json.dump(line)}.join("\n") : LogStash::Json.dump(action) as_json << "\n" if (bulk_body.bytesize + as_json.bytesize) > TARGET_BULK_BYTES bulk_responses << bulk_send(bulk_body) bulk_body = as_json else bulk_body << as_json end end bulk_responses << bulk_send(bulk_body) if bulk_body.size > 0 join_bulk_responses(bulk_responses) end |
#bulk_send(bulk_body) ⇒ Object
98 99 100 101 102 |
# File 'lib/logstash/outputs/elasticsearch/http_client.rb', line 98 def bulk_send(bulk_body) # Discard the URL url, response = @pool.post("_bulk", nil, bulk_body) LogStash::Json.load(response.body) end |
#close ⇒ Object
104 105 106 |
# File 'lib/logstash/outputs/elasticsearch/http_client.rb', line 104 def close @pool.close end |
#get_version ⇒ Object
48 49 50 51 |
# File 'lib/logstash/outputs/elasticsearch/http_client.rb', line 48 def get_version url, response = @pool.get("") LogStash::Json.load(response.body)["version"] end |
#join_bulk_responses(bulk_responses) ⇒ Object
91 92 93 94 95 96 |
# File 'lib/logstash/outputs/elasticsearch/http_client.rb', line 91 def join_bulk_responses(bulk_responses) { "errors" => bulk_responses.any? {|r| r["errors"] == true}, "items" => bulk_responses.reduce([]) {|m,r| m.concat(r["items"])} } end |
#template_install(name, template, force = false) ⇒ Object
40 41 42 43 44 45 46 |
# File 'lib/logstash/outputs/elasticsearch/http_client.rb', line 40 def template_install(name, template, force=false) if template_exists?(name) && !force @logger.debug("Found existing Elasticsearch template. Skipping template management", :name => name) return end template_put(name, template) end |