Class: LogStash::Inputs::Elasticsearch::SearchAfter
Constant Summary
collapse
- PIT_JOB =
"create point in time (PIT)"
- SEARCH_AFTER_JOB =
"search_after paginated search"
Instance Method Summary
collapse
#do_run, #initialize, #retryable
Instance Method Details
#clear(pit_id) ⇒ Object
221
222
223
224
225
226
|
# File 'lib/logstash/inputs/elasticsearch/paginated_search.rb', line 221
# Best-effort close of a point in time (PIT).
#
# Always logs the attempt at info level. The close request is only sent when
# +pit?(pit_id)+ is true. Any StandardError raised along the way is logged at
# debug level and swallowed, so this method never raises one itself.
#
# @param pit_id [Object] candidate PIT id
# @return [Object, nil] the client's close response, nil when +pit_id+ is not
#   a PIT id, or the result of the debug log call when an error was swallowed
def clear(pit_id)
  logger.info("Closing point in time (PIT)")
  return unless pit?(pit_id)

  @client.close_point_in_time(body: { id: pit_id })
rescue StandardError => error
  # Cleanup must not fail the caller; record the failure and move on.
  logger.debug("Ignoring close_point_in_time exception", message: error.message, exception: error.class)
end
|
#create_pit ⇒ Object
129
130
131
132
133
|
# File 'lib/logstash/inputs/elasticsearch/paginated_search.rb', line 129
# Opens a point in time (PIT) on +@index+, using +@scroll+ as the keep_alive
# value, and logs the attempt at info level.
#
# @return [Object] the value stored under 'id' in the client's response
def create_pit
  logger.info("Create point in time (PIT)")
  @client.open_point_in_time(index: @index, keep_alive: @scroll)['id']
end
|
#next_page(pit_id:, search_after: nil, slice_id: nil) ⇒ Object
157
158
159
160
161
|
# File 'lib/logstash/inputs/elasticsearch/paginated_search.rb', line 157
# Runs one search request for the given PIT.
#
# The request is built by +search_options+, logged at trace level, and then
# handed to +@client.search+.
#
# @param pit_id [Object] PIT id forwarded to +search_options+
# @param search_after [Object, nil] forwarded to +search_options+
# @param slice_id [Object, nil] forwarded to +search_options+
# @return [Object] whatever +@client.search+ returns
def next_page(pit_id:, search_after: nil, slice_id: nil)
  request = search_options(pit_id: pit_id, search_after: search_after, slice_id: slice_id).tap do |opts|
    logger.trace("search options", opts)
  end
  @client.search(request)
end
|
#pit?(id) ⇒ Boolean
125
126
127
|
# File 'lib/logstash/inputs/elasticsearch/paginated_search.rb', line 125
# Tells whether +id+ can be used as a point in time (PIT) id.
#
# @param id [Object] value to check
# @return [Boolean] true only when +id+ is a String (an empty String counts)
def pit?(id)
  id.is_a?(String)
end
|
#process_page(output_queue) ⇒ Object
163
164
165
166
167
168
169
170
171
|
# File 'lib/logstash/inputs/elasticsearch/paginated_search.rb', line 163
# Consumes one page of search results.
#
# Yields to obtain the response, pushes every hit to +@plugin.push_hit+, and
# reports whether the page had hits plus the 'sort' value of its last hit.
# A warning is logged when the page had hits but the last one has no sort
# value.
#
# @param output_queue [Object] queue handed to +@plugin.push_hit+ with each hit
# @yieldreturn [Hash] response with the hits under ['hits']['hits']
# @return [Array(Boolean, Object)] +[has_hits, search_after]+; +search_after+
#   is nil for an empty page or when the last hit has no 'sort' entry
def process_page(output_queue)
  response = yield
  hits = response['hits']['hits']
  hits.each { |hit| @plugin.push_hit(hit, output_queue) }

  has_hits = hits.any?
  # An empty page has no last hit. Check for that explicitly so that an
  # unrelated error while reading the hit is not silently turned into nil.
  last_hit = hits.last
  search_after = last_hit['sort'] unless last_hit.nil?

  logger.warn("Query got data but the sort value is empty") if has_hits && search_after.nil?
  [has_hits, search_after]
end
|
#retryable_search(output_queue) ⇒ Object
198
199
200
201
202
203
204
|
# File 'lib/logstash/inputs/elasticsearch/paginated_search.rb', line 198
# Runs a single search inside a point in time.
#
# +with_pit+ supplies the PIT id, and the +search+ call is wrapped in
# +retryable+ under the SEARCH_AFTER_JOB name.
#
# @param output_queue [Object] queue forwarded to +search+
# @return [Object] whatever +with_pit+ returns
def retryable_search(output_queue)
  with_pit do |point_in_time_id|
    retryable(SEARCH_AFTER_JOB) { search(output_queue: output_queue, pit_id: point_in_time_id) }
  end
end
|
#retryable_slice_search(output_queue) ⇒ Object
206
207
208
209
210
211
212
213
214
215
216
217
218
219
|
# File 'lib/logstash/inputs/elasticsearch/paginated_search.rb', line 206
# Runs one search per slice inside a single point in time.
#
# Starts +@slices+ threads. Each thread names itself after the pipeline id
# and its slice number, then runs +search+ for that slice wrapped in
# +retryable+ under the SEARCH_AFTER_JOB name. All threads are joined before
# the +with_pit+ block ends, after which completion is logged at trace level.
#
# @param output_queue [Object] queue forwarded to every +search+ call
# @return [Object] the result of the final trace log call
def retryable_slice_search(output_queue)
  with_pit do |pit_id|
    workers = @slices.times.map do |slice_id|
      Thread.new do
        thread_label = "[#{@pipeline_id}]|input|elasticsearch|slice_#{slice_id}"
        LogStash::Util::set_thread_name(thread_label)
        retryable(SEARCH_AFTER_JOB) { search(output_queue: output_queue, slice_id: slice_id, pit_id: pit_id) }
      end
    end
    # Wait for every slice before with_pit gets the chance to close the PIT.
    workers.map(&:join)
  end
  logger.trace("#{@slices} slices completed")
end
|
#search(output_queue:, slice_id: nil, pit_id:) ⇒ Object
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
|
# File 'lib/logstash/inputs/elasticsearch/paginated_search.rb', line 180
# Pages through the results of one search until a page has no hits or the
# plugin asks to stop.
#
# Each iteration fetches a page with +next_page+, passing the +search_after+
# value returned by the previous +process_page+ call (nil on the first
# iteration). Start and completion are logged at info level and every
# iteration at debug level. The slice id and slice count are attached to the
# log entries when a +slice_id+ is given.
#
# @param output_queue [Object] queue forwarded to +process_page+
# @param slice_id [Object, nil] slice to search, or nil for an unsliced search
# @param pit_id [Object] PIT id forwarded to +next_page+
# @return [Object] the result of the final info log call
def search(output_queue:, slice_id: nil, pit_id:)
  log_details = slice_id.nil? ? {} : { slice_id: slice_id, slices: @slices }
  logger.info("Query start", log_details)

  more_pages = true
  cursor = nil
  until !more_pages || @plugin.stop?
    logger.debug("Query progress", log_details)
    more_pages, cursor = process_page(output_queue) do
      next_page(pit_id: pit_id, search_after: cursor, slice_id: slice_id)
    end
  end

  logger.info("Query completed", log_details)
end
|
#search_options(pit_id:, search_after: nil, slice_id: nil) ⇒ Object
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
|
# File 'lib/logstash/inputs/elasticsearch/paginated_search.rb', line 135
# Builds the arguments for one search request.
#
# The body is a copy of +@query+ with a :pit entry added, so +@query+ itself
# is not modified. A default ascending "_shard_doc" sort is added when the
# query has no "sort" entry. :search_after and :slice are added only when the
# matching argument is given.
#
# @param pit_id [Object] PIT id placed under body[:pit][:id]
# @param search_after [Object, nil] value for body[:search_after]; omitted when nil
# @param slice_id [Object, nil] value for body[:slice][:id]; omitted when nil
# @return [Hash] hash with :size (from +@size+) and :body
def search_options(pit_id:, search_after: nil, slice_id: nil)
  body = @query.merge({ pit: { id: pit_id, keep_alive: @scroll } })

  # The assignments below only touch the merged copy, never @query.
  body[:sort] = { _shard_doc: "asc" } if @query.dig("sort").nil?
  body[:search_after] = search_after unless search_after.nil?
  body[:slice] = { id: slice_id, max: @slices } unless slice_id.nil?

  { size: @size, body: body }
end
|
#with_pit ⇒ Object
173
174
175
176
177
178
|
# File 'lib/logstash/inputs/elasticsearch/paginated_search.rb', line 173
# Creates a point in time (PIT), yields its id, and always cleans up.
#
# The PIT is created through +retryable+ under the PIT_JOB name. The block is
# only called when +pit?+ accepts the id. +clear+ runs in every case, even
# when creation or the block raises; it then receives whatever id was
# obtained, which is nil if creation failed.
#
# @yieldparam pit_id [Object] the id returned by +create_pit+
# @return [Object, nil] the block's result, or nil when the block was not called
def with_pit
  pit_id = retryable(PIT_JOB) { create_pit }
  return unless pit?(pit_id)

  yield(pit_id)
ensure
  clear(pit_id)
end
|