Class: Esse::Import::Bulk

Inherits:
Object
Defined in:
lib/esse/import/bulk.rb

Instance Method Summary

Constructor Details

#initialize(type: nil, index: nil, delete: nil, create: nil, update: nil) ⇒ Bulk

Returns a new instance of Bulk.



# File 'lib/esse/import/bulk.rb', line 4

def initialize(type: nil, index: nil, delete: nil, create: nil, update: nil)
  # Each operation list is normalized the same way: keep only valid documents,
  # drop the ones flagged to be skipped, and serialize the rest to bulk payloads.
  @index = Array(index).select(&method(:valid_doc?)).reject(&:ignore_on_index?).map do |doc|
    value = doc.to_bulk
    value[:_type] ||= type if type
    { index: value }
  end
  @create = Array(create).select(&method(:valid_doc?)).reject(&:ignore_on_index?).map do |doc|
    value = doc.to_bulk
    value[:_type] ||= type if type
    { create: value }
  end
  @update = Array(update).select(&method(:valid_doc?)).reject(&:ignore_on_index?).map do |doc|
    value = doc.to_bulk(operation: :update)
    value[:_type] ||= type if type
    { update: value }
  end
  @delete = Array(delete).select(&method(:valid_doc?)).reject(&:ignore_on_delete?).map do |doc|
    value = doc.to_bulk(data: false) # delete entries carry only metadata, no document source
    value[:_type] ||= type if type
    { delete: value }
  end
end
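
For context, a minimal construction sketch. ProductDoc is a hypothetical Esse::Document subclass introduced only for illustration; the constructor filters input through valid_doc?, which is assumed here to accept Esse::Document instances. to_bulk, ignore_on_index? and ignore_on_delete? are inherited from the base class.

require 'esse'

# Hypothetical document class for illustration; any Esse::Document subclass
# that responds to #id works here.
class ProductDoc < Esse::Document
  def id
    object[:id]
  end

  def source
    { name: object[:name] }
  end
end

bulk = Esse::Import::Bulk.new(
  index: [ProductDoc.new({ id: 1, name: 'Plug adapter' })],
  delete: [ProductDoc.new({ id: 2, name: 'Discontinued item' })]
)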

Instance Method Details

#each_request(max_retries: 4, last_retry_in_small_chunks: true) {|RequestBody| ... } ⇒ Object

Builds the bulk payloads and yields each one as a RequestBody instance.

On timeout errors, it retries with an exponential backoff computed as:

wait_interval = (retry_count**4) + 15 + (rand(10) * (retry_count + 1)) seconds

It retries up to max_retries times (default: 4).

Bulk requests that are too large are split into multiple smaller requests, which are attempted only once.
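
For a sense of scale, the snippet below evaluates the same formula outside the class; rand(10) contributes 0 to 9 seconds of jitter per (retry_count + 1). Note that with the default max_retries of 4 the error is re-raised once retry_count reaches 4, so only the first three intervals are actually slept.

# Reproduces the backoff formula; ranges show min..max with jitter included.
(1..3).each do |retry_count|
  base = (retry_count**4) + 15
  max_jitter = 9 * (retry_count + 1)
  puts "retry #{retry_count}: #{base}..#{base + max_jitter}s"
end
# retry 1: 16..34s
# retry 2: 31..58s
# retry 3: 96..132s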

Yields: (RequestBody)



# File 'lib/esse/import/bulk.rb', line 35

def each_request(max_retries: 4, last_retry_in_small_chunks: true)
  # @TODO create the indexes by checking all the index suffixes (if the mapping is not empty)
  requests = [optimistic_request]
  retry_count = 0

  begin
    requests.each do |request|
      next unless request.body?
      resp = yield request
      # A successful HTTP status may still include per-item failures; collect
      # the failed items and raise them as the error message.
      if resp&.[]('errors')
        raise resp&.fetch('items', [])&.select { |item| item.values.first['error'] }&.join("\n")
      end
    end
  rescue Faraday::TimeoutError, Esse::Transport::RequestTimeoutError => e
    retry_count += 1
    raise Esse::Transport::RequestTimeoutError.new(e.message) if retry_count >= max_retries
    # Timeout error may be caused by a too large request, so we split the requests in small chunks as a last attempt
    requests = requests_in_small_chunks if last_retry_in_small_chunks && max_retries > 2 && retry_count == max_retries - 2
    wait_interval = (retry_count**4) + 15 + (rand(10) * (retry_count + 1))
    Esse.logger.warn "Timeout error, retrying in #{wait_interval} seconds"
    sleep(wait_interval)
    retry
  rescue Esse::Transport::RequestEntityTooLargeError => e
    retry_count += 1
    raise e if retry_count > 1 # only retry once on this error
    requests = balance_requests_size(e)
    Esse.logger.warn <<~MSG
      Request entity too large, retrying with a bulk with: #{requests.map(&:bytesize).join(' + ')}.
      Note that this causes performance degradation; consider adjusting the batch_size of the index or increasing the bulk size.
    MSG
    retry
  end
end
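
A hedged usage sketch: the transport object and its #bulk signature below are assumptions for illustration and are not part of this class. The essential contract is that the block receives a RequestBody and returns the parsed response hash, so that per-item 'errors' can be detected and retried as described above.

bulk.each_request(max_retries: 4) do |request_body|
  # `transport.bulk` and the #body accessor are assumptions for illustration.
  # Whatever client you use, return the parsed response hash from the block.
  transport.bulk(index: 'products', body: request_body.body)
end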