Class: Fluent::Plugin::OpenSearchErrorHandler

Inherits:
Object
  • Object
show all
Includes:
OpenSearchConstants
Defined in:
lib/fluent/plugin/opensearch_error_handler.rb

Defined Under Namespace

Classes: OpenSearchError, OpenSearchRequestAbortError, OpenSearchSubmitMismatch, OpenSearchVersionMismatch

Constant Summary

Constants included from OpenSearchConstants

Fluent::Plugin::OpenSearchConstants::BODY_DELIMITER, Fluent::Plugin::OpenSearchConstants::CREATE_OP, Fluent::Plugin::OpenSearchConstants::ID_FIELD, Fluent::Plugin::OpenSearchConstants::INDEX_OP, Fluent::Plugin::OpenSearchConstants::TIMESTAMP_FIELD, Fluent::Plugin::OpenSearchConstants::UPDATE_OP, Fluent::Plugin::OpenSearchConstants::UPSERT_OP

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(plugin) ⇒ OpenSearchErrorHandler

Returns a new instance of OpenSearchErrorHandler.



40
41
42
# File 'lib/fluent/plugin/opensearch_error_handler.rb', line 40

# Builds an error handler bound to one output plugin instance.
#
# The plugin is kept in @plugin; the other methods of this handler read its
# settings (write_operation, unrecoverable_error_types, ...), its logger and
# its router through that reference.
#
# @param plugin [Object] the output plugin this handler reports for
def initialize(plugin)
  @plugin = plugin
end

Instance Attribute Details

#bulk_message_count ⇒ Object

Returns the value of attribute bulk_message_count.



34
35
36
# File 'lib/fluent/plugin/opensearch_error_handler.rb', line 34

# Reader for the @bulk_message_count instance variable.
#
# NOTE(review): nothing in the visible methods assigns @bulk_message_count
# (handle_error receives the count as an argument instead), so this
# presumably comes from an attribute writer declared elsewhere — confirm.
#
# @return [Object] the current value of @bulk_message_count (nil if never set)
def bulk_message_count
  @bulk_message_count
end

Instance Method Details

#emit_error_label_event? ⇒ Boolean

Returns:

  • (Boolean)


68
69
70
# File 'lib/fluent/plugin/opensearch_error_handler.rb', line 68

# Tells whether rejected records should be emitted as error events.
#
# The plugin's emit_error_label_event setting is collapsed to a strict
# true/false, so a nil or otherwise truthy/falsy setting is handled uniformly.
#
# @return [Boolean] true when the plugin setting is truthy, false otherwise
def emit_error_label_event?
  @plugin.emit_error_label_event ? true : false
end

#handle_error(response, tag, chunk, bulk_message_count, extracted_values) ⇒ Object



72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
# File 'lib/fluent/plugin/opensearch_error_handler.rb', line 72

# Reconciles an OpenSearch bulk response with the chunk that produced it.
#
# Every Hash record of +chunk+ is passed through the plugin's
# process_message / append_record_to_messages again, so that records the
# plugin would not have submitted are skipped. Each remaining record is then
# paired, in order, with the next entry of response['items'] and classified
# by that entry's status:
#
# * 200 / 201                 -> counted as a success
# * 409 on a create operation -> counted as a duplicate
# * 400                       -> counted as bad_argument and, when error
#                                events are enabled, emitted as an error event
# * anything else             -> emitted as an error event, queued for retry,
#                                or turned into an abort, depending on the
#                                error type reported by OpenSearch
#
# Counters are written to the debug log once the whole chunk has been walked.
#
# @param response [Hash] parsed bulk response; must hold an Array under 'items'
# @param tag [Object] tag handed to process_message and to emitted error events
# @param chunk [#msgpack_each] buffer chunk yielding (time, record) pairs
# @param bulk_message_count [Integer] number of records that were submitted
# @param extracted_values [Object] passed through to process_message unchanged
# @return [nil] when no record needs to be retried
# @raise [OpenSearchVersionMismatch] if response['items'] is not an Array
# @raise [OpenSearchSubmitMismatch] if the item count differs from bulk_message_count
# @raise [OpenSearchRequestAbortError] if an item reports an unrecoverable error type
# @raise [Fluent::Plugin::OpenSearchOutput::RetryStreamError] carrying the
#   records that should be retried, when there is at least one
def handle_error(response, tag, chunk, bulk_message_count, extracted_values)
  items = response['items']
  unless items.is_a?(Array)
    raise OpenSearchVersionMismatch, "The response format was unrecognized: #{response}"
  end
  if bulk_message_count != items.length
    raise OpenSearchSubmitMismatch, "The number of records submitted #{bulk_message_count} do not match the number returned #{items.length}. Unable to process bulk response."
  end
  # Items are consumed one by one below; work on a shallow copy so the
  # caller's response object is left intact.
  items = items.dup
  retry_stream = Fluent::MultiEventStream.new
  stats = Hash.new(0)
  meta = {}
  header = {}
  affinity_target_indices = @plugin.get_affinity_target_indices(chunk)
  chunk.msgpack_each do |time, rawrecord|
    bulk_message = ''
    next unless rawrecord.is_a? Hash
    begin
      # we need a deep copy for process_message to alter
      processrecord = Marshal.load(Marshal.dump(rawrecord))
      meta, header, record = @plugin.process_message(tag, meta, header, time, processrecord, affinity_target_indices, extracted_values)
      # A record the plugin would not have sent has no response item; skip it
      # without consuming one.
      next unless @plugin.append_record_to_messages(@plugin.write_operation, meta, header, record, bulk_message)
    rescue => e
      @plugin.log.debug("Exception in error handler during deep copy: #{e}")
      stats[:bad_chunk_record] += 1
      next
    end
    item = items.shift
    if item.is_a?(Hash) && item.has_key?(@plugin.write_operation)
      write_operation = @plugin.write_operation
    elsif INDEX_OP == @plugin.write_operation && item.is_a?(Hash) && item.has_key?(CREATE_OP)
      write_operation = CREATE_OP
    elsif UPSERT_OP == @plugin.write_operation && item.is_a?(Hash) && item.has_key?(UPDATE_OP)
      write_operation = UPDATE_OP
    elsif item.nil?
      stats[:errors_nil_resp] += 1
      next
    else
      # When we don't have an expected ops field, something changed in the API
      # expected return values.
      stats[:errors_bad_resp] += 1
      next
    end
    result = item[write_operation]
    unless result.is_a?(Hash) && result.has_key?('status')
      # When we don't have a status field, something changed in the API
      # expected return values.
      stats[:errors_bad_resp] += 1
      next
    end
    status = result['status']
    error = result['error']
    case
    when [200, 201].include?(status)
      stats[:successes] += 1
    when CREATE_OP == write_operation && 409 == status
      stats[:duplicates] += 1
    when 400 == status
      stats[:bad_argument] += 1
      reason = ""
      log_os_400_reason do
        # The error detail is only inspected when it is structured; a plain
        # String error leaves the reason suffix empty instead of crashing.
        if error.is_a?(Hash)
          if error.has_key?('type')
            reason = " [error type]: #{error['type']}"
          end
          if error.has_key?('reason')
            reason += " [reason]: \'#{error['reason']}\'"
          end
        end
      end
      if emit_error_label_event?
        @plugin.router.emit_error_event(tag, time, rawrecord, OpenSearchError.new("400 - Rejected by OpenSearch#{reason}"))
      end
    else
      if error.is_a?(String)
        stats[:errors_block_resp] += 1
        if emit_error_label_event?
          @plugin.router.emit_error_event(tag, time, rawrecord, OpenSearchError.new("#{status} - #{error}"))
        end
      elsif error.is_a?(Hash) && error.has_key?('type')
        type = error['type']
        # Counted exactly once per item, whichever path it takes below.
        stats[type] += 1
        if unrecoverable_error?(type)
          raise OpenSearchRequestAbortError, "Rejected OpenSearch due to #{type}"
        end
        if unrecoverable_record_error?(type)
          # Retrying cannot help this record; report it instead.
          if emit_error_label_event?
            reason = error['reason']
            @plugin.router.emit_error_event(tag, time, rawrecord, OpenSearchError.new("#{status} - #{type}: #{reason}"))
          end
        else
          retry_stream.add(time, rawrecord)
        end
      else
        # When we don't have a type field, something changed in the API
        # expected return values.
        stats[:errors_bad_resp] += 1
        if emit_error_label_event?
          @plugin.router.emit_error_event(tag, time, rawrecord, OpenSearchError.new("#{status} - No error type provided in the response"))
        end
      end
    end
  end
  @plugin.log.on_debug do
    msg = ["Indexed (op = #{@plugin.write_operation})"]
    stats.each_pair { |key, value| msg << "#{value} #{key}" }
    @plugin.log.debug msg.join(', ')
  end
  raise Fluent::Plugin::OpenSearchOutput::RetryStreamError.new(retry_stream) unless retry_stream.empty?
end

#log_os_400_reason(&block) ⇒ Object



60
61
62
63
64
65
66
# File 'lib/fluent/plugin/opensearch_error_handler.rb', line 60

# Runs +block+ either unconditionally or only through the logger's debug gate.
#
# When the plugin's log_os_400_reason setting is truthy the block is invoked
# directly; otherwise it is handed to the plugin logger's on_debug, which
# decides whether to run it.
#
# @yield the work that collects the detail of a 400 response
# @return [Object] the block's value, or whatever on_debug returns
def log_os_400_reason(&block)
  return block.call if @plugin.log_os_400_reason

  @plugin.log.on_debug(&block)
end

#unrecoverable_error?(type) ⇒ Boolean

Returns:

  • (Boolean)


48
49
50
# File 'lib/fluent/plugin/opensearch_error_handler.rb', line 48

# Checks an OpenSearch error type against the unrecoverable error types.
#
# @param type [Object] the error type reported for a bulk item
# @return [Boolean] true when +type+ is listed in unrecoverable_error_types
def unrecoverable_error?(type)
  fatal_types = unrecoverable_error_types
  fatal_types.include?(type)
end

#unrecoverable_error_types ⇒ Object



44
45
46
# File 'lib/fluent/plugin/opensearch_error_handler.rb', line 44

# Returns the plugin's unrecoverable_error_types setting.
#
# unrecoverable_error? tests membership in this collection with include?.
#
# @return [Object] whatever @plugin.unrecoverable_error_types returns
#   (presumably a list of error type names — confirm against the plugin config)
def unrecoverable_error_types
  @plugin.unrecoverable_error_types
end

#unrecoverable_record_error?(type) ⇒ Boolean

Returns:

  • (Boolean)


56
57
58
# File 'lib/fluent/plugin/opensearch_error_handler.rb', line 56

# Checks an OpenSearch error type against the unrecoverable record types.
#
# @param type [Object] the error type reported for a bulk item
# @return [Boolean] true when +type+ is listed in unrecoverable_record_types
def unrecoverable_record_error?(type)
  hopeless_types = unrecoverable_record_types
  hopeless_types.include?(type)
end

#unrecoverable_record_types ⇒ Object



52
53
54
# File 'lib/fluent/plugin/opensearch_error_handler.rb', line 52

# Returns the plugin's unrecoverable_record_types setting.
#
# unrecoverable_record_error? tests membership in this collection with include?.
#
# @return [Object] whatever @plugin.unrecoverable_record_types returns
#   (presumably a list of error type names — confirm against the plugin config)
def unrecoverable_record_types
  @plugin.unrecoverable_record_types
end