Class: Fluent::Plugin::ElasticsearchErrorHandler
- Inherits:
-
Object
- Object
- Fluent::Plugin::ElasticsearchErrorHandler
- Includes:
- ElasticsearchConstants
- Defined in:
- lib/fluent/plugin/elasticsearch_error_handler.rb
Defined Under Namespace
Classes: ElasticsearchError, ElasticsearchRequestAbortError, ElasticsearchSubmitMismatch, ElasticsearchVersionMismatch
Constant Summary
Constants included from ElasticsearchConstants
Fluent::Plugin::ElasticsearchConstants::BODY_DELIMITER, Fluent::Plugin::ElasticsearchConstants::CREATE_OP, Fluent::Plugin::ElasticsearchConstants::ID_FIELD, Fluent::Plugin::ElasticsearchConstants::INDEX_OP, Fluent::Plugin::ElasticsearchConstants::TIMESTAMP_FIELD, Fluent::Plugin::ElasticsearchConstants::UPDATE_OP, Fluent::Plugin::ElasticsearchConstants::UPSERT_OP
Instance Attribute Summary collapse
-
#bulk_message_count ⇒ Object
Returns the value of attribute bulk_message_count.
Instance Method Summary collapse
- #handle_error(response, tag, chunk, bulk_message_count, extracted_values) ⇒ Object
-
#initialize(plugin) ⇒ ElasticsearchErrorHandler
constructor
A new instance of ElasticsearchErrorHandler.
- #log_es_400_reason(&block) ⇒ Object
- #unrecoverable_error?(type) ⇒ Boolean
- #unrecoverable_error_types ⇒ Object
Constructor Details
#initialize(plugin) ⇒ ElasticsearchErrorHandler
Returns a new instance of ElasticsearchErrorHandler.
14 15 16 |
# File 'lib/fluent/plugin/elasticsearch_error_handler.rb', line 14
#
# Builds an error handler bound to one output plugin instance.
#
# @param plugin [Object] kept in @plugin; the other methods of this class read
#   the write operation, logger, router and error-type settings through it.
def initialize(plugin)
  @plugin = plugin
end
Instance Attribute Details
#bulk_message_count ⇒ Object
Returns the value of attribute bulk_message_count.
8 9 10 |
# File 'lib/fluent/plugin/elasticsearch_error_handler.rb', line 8
#
# Returns the value of attribute bulk_message_count.
#
# @return [Object] whatever is currently stored in @bulk_message_count
#   (nil when it has never been assigned).
def bulk_message_count
  @bulk_message_count
end
Instance Method Details
#handle_error(response, tag, chunk, bulk_message_count, extracted_values) ⇒ Object
34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 |
# File 'lib/fluent/plugin/elasticsearch_error_handler.rb', line 34
#
# Walks a bulk response in step with the chunk that produced it and classifies
# every record: counted as success/duplicate, emitted to the error stream, or
# queued for retry.
#
# NOTE(review): this listing had lost several identifiers (the fourth
# parameter, two locals and two @plugin method names). The parameter name is
# taken from the documented signature and `process_message` from the inline
# comment; `append_record_to_messages` and the local names `meta` /
# `bulk_message` are reconstructed — confirm against the output plugin.
#
# @param response [Hash] bulk response; must carry an Array under 'items'
# @param tag [String] tag passed to the plugin and to emitted error events
# @param chunk [#msgpack_each] yields (time, record) pairs
# @param bulk_message_count [Integer] number of records that were submitted
# @param extracted_values [Object] forwarded untouched to the plugin
# @raise [ElasticsearchVersionMismatch] when 'items' is missing or not an Array
# @raise [ElasticsearchSubmitMismatch] when the item count differs from
#   bulk_message_count
# @raise [ElasticsearchRequestAbortError] when an item reports an error type
#   for which unrecoverable_error? is true
# @raise [Fluent::Plugin::ElasticsearchOutput::RetryStreamError] at the end,
#   when at least one record was queued for retry
def handle_error(response, tag, chunk, bulk_message_count, extracted_values)
  items = response['items']
  if items.nil? || !items.is_a?(Array)
    raise ElasticsearchVersionMismatch, "The response format was unrecognized: #{response}"
  end
  if bulk_message_count != items.length
    raise ElasticsearchSubmitMismatch, "The number of records submitted #{bulk_message_count} do not match the number returned #{items.length}. Unable to process bulk response."
  end

  retry_stream = Fluent::MultiEventStream.new
  stats = Hash.new(0)
  meta = {}
  header = {}

  chunk.msgpack_each do |time, rawrecord|
    bulk_message = ''
    next unless rawrecord.is_a? Hash

    begin
      # we need a deep copy for process_message to alter
      processrecord = Marshal.load(Marshal.dump(rawrecord))
      meta, header, record = @plugin.process_message(tag, meta, header, time, processrecord, extracted_values)
      # Records the plugin declines to append are skipped WITHOUT consuming a
      # response item, which keeps items aligned with the remaining records.
      next unless @plugin.append_record_to_messages(@plugin.write_operation, meta, header, record, bulk_message)
    rescue StandardError
      stats[:bad_chunk_record] += 1
      next
    end

    # Consumes the response items in order (this mutates response['items']).
    item = items.shift
    if item.is_a?(Hash) && item.has_key?(@plugin.write_operation)
      write_operation = @plugin.write_operation
    elsif INDEX_OP == @plugin.write_operation && item.is_a?(Hash) && item.has_key?(CREATE_OP)
      write_operation = CREATE_OP
    elsif UPSERT_OP == @plugin.write_operation && item.is_a?(Hash) && item.has_key?(UPDATE_OP)
      write_operation = UPDATE_OP
    elsif item.nil?
      stats[:errors_nil_resp] += 1
      next
    else
      # When we don't have an expected ops field, something changed in the API
      # expected return values (ES 2.x)
      stats[:errors_bad_resp] += 1
      next
    end

    if item[write_operation].has_key?('status')
      status = item[write_operation]['status']
    else
      # When we don't have a status field, something changed in the API
      # expected return values (ES 2.x)
      stats[:errors_bad_resp] += 1
      next
    end

    case
    when [200, 201].include?(status)
      stats[:successes] += 1
    when CREATE_OP == write_operation && 409 == status
      stats[:duplicates] += 1
    when 400 == status
      stats[:bad_argument] += 1
      reason = ""
      # The detail string is only built when log_es_400_reason runs the block.
      log_es_400_reason do
        if item[write_operation].has_key?('error') && item[write_operation]['error'].has_key?('type')
          reason = " [error type]: #{item[write_operation]['error']['type']}"
        end
        if item[write_operation].has_key?('error') && item[write_operation]['error'].has_key?('reason')
          reason += " [reason]: \'#{item[write_operation]['error']['reason']}\'"
        end
      end
      @plugin.router.emit_error_event(tag, time, rawrecord, ElasticsearchError.new("400 - Rejected by Elasticsearch#{reason}"))
    else
      if item[write_operation]['error'].is_a?(String)
        reason = item[write_operation]['error']
        stats[:errors_block_resp] += 1
        @plugin.router.emit_error_event(tag, time, rawrecord, ElasticsearchError.new("#{status} - #{reason}"))
        next
      elsif item[write_operation].has_key?('error') && item[write_operation]['error'].has_key?('type')
        type = item[write_operation]['error']['type']
        # Counted exactly once per item; a second increment after this
        # conditional used to double every typed-error tally.
        stats[type] += 1
        retry_stream.add(time, rawrecord)
        if unrecoverable_error?(type)
          raise ElasticsearchRequestAbortError, "Rejected Elasticsearch due to #{type}"
        end
      else
        # When we don't have a type field, something changed in the API
        # expected return values (ES 2.x)
        stats[:errors_bad_resp] += 1
        @plugin.router.emit_error_event(tag, time, rawrecord, ElasticsearchError.new("#{status} - No error type provided in the response"))
        next
      end
    end
  end

  @plugin.log.on_debug do
    msg = ["Indexed (op = #{@plugin.write_operation})"]
    stats.each_pair { |key, value| msg << "#{value} #{key}" }
    @plugin.log.debug msg.join(', ')
  end

  raise Fluent::Plugin::ElasticsearchOutput::RetryStreamError.new(retry_stream) unless retry_stream.empty?
end
#log_es_400_reason(&block) ⇒ Object
26 27 28 29 30 31 32 |
# File 'lib/fluent/plugin/elasticsearch_error_handler.rb', line 26
#
# Runs the given block either unconditionally or under the logger's debug
# gate, depending on the plugin's log_es_400_reason setting.
#
# @yield the work to perform (handle_error uses it to build the 400 detail
#   string)
# @return [Object] the block's result when called directly, otherwise
#   whatever @plugin.log.on_debug returns
def log_es_400_reason(&block)
  if @plugin.log_es_400_reason
    block.call
  else
    # Defer to the logger so the block only runs when it decides to yield.
    @plugin.log.on_debug(&block)
  end
end
#unrecoverable_error?(type) ⇒ Boolean
22 23 24 |
# File 'lib/fluent/plugin/elasticsearch_error_handler.rb', line 22
#
# Tells whether an error type reported in a bulk response item is one of the
# configured unrecoverable types.
#
# @param type [Object] the value found under item[...]['error']['type']
# @return [Boolean] true when unrecoverable_error_types includes +type+
def unrecoverable_error?(type)
  unrecoverable_error_types.include?(type)
end
#unrecoverable_error_types ⇒ Object
18 19 20 |
# File 'lib/fluent/plugin/elasticsearch_error_handler.rb', line 18
#
# Reads the list of unrecoverable error types from the plugin on every call,
# so the handler keeps no copy of its own.
#
# @return [Object] whatever @plugin.unrecoverable_error_types returns;
#   unrecoverable_error? calls include? on it
def unrecoverable_error_types
  @plugin.unrecoverable_error_types
end