Class: Anschel::Output::Elasticsearch
- Defined in:
- lib/anschel/output/elasticsearch.rb
Instance Method Summary collapse
-
#initialize(config, stats, log) ⇒ Elasticsearch
constructor
A new instance of Elasticsearch.
Methods inherited from Base
Constructor Details
#initialize(config, stats, log) ⇒ Elasticsearch
Returns a new instance of Elasticsearch.
13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 |
# File 'lib/anschel/output/elasticsearch.rb', line 13 def initialize config, stats, log default_index = config.delete(:default_index) || '.anschel' qsize = config.delete(:queue_size) || 2000 bsize = config.delete(:bulk_size) || 500 timeout = config.delete(:bulk_timeout) || 2.0 slice = timeout / bsize client = ::Elasticsearch::Client.new config client.transport.reload_connections! @queue = SizedQueue.new qsize @thread = Thread.new do loop do events = [] count = 0 start = Time.now.to_f until (Time.now.to_f - start > timeout) || (count > bsize) begin events.push @queue.shift(true) count += 1 rescue # shift returned immediately sleep slice end end next if events.empty? body = events.map do |e| index = e.delete(:_index) routing = e.delete(:_routing) if index.nil? log.error \ event: 'elasticsearch-output-error', reason: 'event was not indexed', remediation: "sending to default index '#{default_index}'", raw_event: e index = default_index end item = { _index: index, _type: e[:type], data: e } item[:_routing] = routing if routing { index: item } end response = client.bulk body: body if response['errors'] log.error \ event: 'elasticsearch-output-error', reason: 'response contained errors', body: body, response: response end stats.inc 'output', body.size end end end |