Module: ElasticsearchRecord::Relation::ResultMethods

Defined in:
lib/elasticsearch_record/relation/result_methods.rb

Instance Method Summary collapse

Instance Method Details

#agg_pluck(*column_names) ⇒ Hash

aggregate pluck provided columns. returns a hash of values for each provided column

Examples:

Person.agg_pluck(:name)
#> {"name" => ['David', 'Jeremy', 'Jose']}

Person.agg_pluck(:id, :name)
#> {"id" => ['11', '2', '5'], "name" => ['David', 'Jeremy', 'Jose']}

Parameters:

  • column_names (Array)

Returns:

  • (Hash)


16
17
18
19
20
21
22
23
24
25
26
27
# File 'lib/elasticsearch_record/relation/result_methods.rb', line 16

def agg_pluck(*column_names)
  scope = self.spawn

  column_names.each do |column_name|
    scope.aggregate!(column_name, { terms: { field: column_name, size: limit_value || 10 } })
  end

  scope.aggregations.reduce({}) { |m, (k, v)|
    m[k.to_s] = v[:buckets].map { |bucket| bucket[:key] }
    m
  }
end

#aggregationsHash

returns the RAW aggregations for the current query

Returns:

  • (Hash)


224
225
226
# File 'lib/elasticsearch_record/relation/result_methods.rb', line 224

def aggregations
  spawn.aggs_only!.resolve('Aggregations').aggregations
end

#aggs_only!Object

sets query as "aggs"-only query (drops the size & sort options - so no hits will return)



259
260
261
262
263
# File 'lib/elasticsearch_record/relation/result_methods.rb', line 259

def aggs_only!
  configure!({ size: 0, from: nil, sort: nil, _source: false })

  self
end

#bucketsActiveSupport::HashWithIndifferentAccess, Hash

returns the response aggregations and resolve the buckets as key->value hash.

Returns:

  • (ActiveSupport::HashWithIndifferentAccess, Hash)


230
231
232
# File 'lib/elasticsearch_record/relation/result_methods.rb', line 230

def buckets
  spawn.aggs_only!.resolve('Buckets').buckets
end

#composite(*column_names) ⇒ Hash

A multi-bucket aggregation that creates composite buckets from different sources. PLEASE NOTE: The composite aggregation is expensive. Load test your application before deploying a composite aggregation in production!

For a single column_name a hash with the distinct key and the +doc_count+ as value is returned. For multiple column_names a hash with the distinct keys (as hash) and the +doc_count+ as value is returned.

Examples:

Person.composite(:name)
#> {"David" => 10, "Jeremy" => 1, "Jose" => 24}

Person.composite(:name, :age)
#> {
    {name: "David", age: "16"} => 3,
    {name: "David", age: "18"} => 6,
    {name: "David", age: "20"} => 1,
    {name: "Jeremy", age: "20"} => 1,
    {name: "Jose", age: "6"} => 2,
    ...
 }

Parameters:

  • column_names (Array)

Returns:

  • (Hash)


51
52
53
54
55
56
57
58
59
60
61
# File 'lib/elasticsearch_record/relation/result_methods.rb', line 51

def composite(*column_names)
  scope = self.spawn
  scope.aggregate!(:composite_bucket, { composite: { size: limit_value || 10, sources: column_names.map { |column_name| { column_name => { terms: { field: column_name } } } } } })

  if column_names.size == 1
    column_name = column_names[0]
    scope.aggregations[:composite_bucket][:buckets].reduce({}) { |m, bucket| m[bucket[:key][column_name]] = bucket[:doc_count]; m }
  else
    scope.aggregations[:composite_bucket][:buckets].reduce({}) { |m, bucket| m[bucket[:key]] = bucket[:doc_count]; m }
  end
end

#hitsArray

returns the RAW hits for the current query

Returns:

  • (Array)


236
237
238
# File 'lib/elasticsearch_record/relation/result_methods.rb', line 236

def hits
  spawn.hits_only!.resolve('Hits').hits
end

#hits_only!Object

sets query as "hits"-only query (drops the aggs from the query)



252
253
254
255
256
# File 'lib/elasticsearch_record/relation/result_methods.rb', line 252

def hits_only!
  configure!({ aggs: nil })

  self
end

#pit_delete(keep_alive: '1m', batch_size: 1000, refresh: true) ⇒ Integer

executes a delete query in a +point_in_time+ scope. this will provide the possibility to delete more than the +max_result_window+ (default: 10000) docs in a batched process.

Parameters:

  • keep_alive (String) (defaults to: '1m')
  • batch_size (Integer) (defaults to: 1000)
  • refresh (Boolean) (defaults to: true)

    index after delete finished (default: true)

Returns:

  • (Integer)

    total amount of deleted docs



200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
# File 'lib/elasticsearch_record/relation/result_methods.rb', line 200

def pit_delete(keep_alive: '1m', batch_size: 1000, refresh: true)
  delete_count = select('_id').pit_results(keep_alive: keep_alive, batch_size: batch_size, ids_only: true) do |ids|
    # skip empty results
    next unless ids.any?

    # delete all IDs, but do not refresh index, yet
    klass.connection.api(:core, :bulk, { index: klass.table_name, body: ids.map { |id| { delete: { _id: id } } }, refresh: false }, "#{klass} Pit Delete")
  end

  # refresh index
  klass.connection.refresh_table(klass.table_name) if refresh

  # return total count
  delete_count
end

#pit_results(keep_alive: '1m', batch_size: 1000, ids_only: false) ⇒ Integer, Array Also known as: total_results

executes the current query in a +point_in_time+ scope. this will provide the possibility to resolve more than the +max_result_window+ (default: 10000) hits. resolves results (hits->hits) from the search but uses the pit query instead to resolve more than 10000 entries.

If a block was provided it'll yield the results array per batch size.

Parameters:

  • keep_alive (String) (defaults to: '1m')
    • how long to keep alive (for each single request) - default: '1m'
  • batch_size (Integer) (defaults to: 1000)
    • how many results per query (default: 1000 - this means at least 10 queries before reaching the +max_result_window+)
  • ids_only (Boolean) (defaults to: false)
    • resolve ids only from results

Returns:

  • (Integer, Array)

    either returns the results-array (no block provided) or the total amount of results

Raises:

  • (ArgumentError)


95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
# File 'lib/elasticsearch_record/relation/result_methods.rb', line 95

def pit_results(keep_alive: '1m', batch_size: 1000, ids_only: false)
  raise(ArgumentError, "Batch size cannot be above the 'max_result_window' (#{klass.max_result_window}) !") if batch_size > klass.max_result_window

  # check if limit or offset values where provided
  results_limit  = limit_value ? limit_value : Float::INFINITY
  results_offset = offset_value ? offset_value : 0

  # search_after requires a order - we resolve a order either from provided value or by default ...
  relation = ordered_relation

  # FALLBACK (without any order) for restricted access to the '_id' field.
  # with PIT a order by '_shard_doc' can also be used
  # see @ https://www.elastic.co/guide/en/elasticsearch/reference/current/paginate-search-results.html
  relation.order!(_shard_doc: :asc) if relation.order_values.empty? && klass.connection.access_shard_doc?

  # resolve ids only
  relation.reselect!('_id') if ids_only

  # clear limit & offset
  relation.offset!(nil).limit!(nil)

  # remove the 'index' from the query arguments (pit doesn't like that)
  relation.configure!(:__query__, { index: nil })

  # we store the results in this array
  results       = []
  results_total = 0

  # resolve a new pit and auto-close after we finished
  point_in_time(keep_alive: keep_alive) do |pit_id|
    current_pit_hash = { pit: { id: pit_id, keep_alive: keep_alive } }

    # resolve new data until we got all we need
    loop do
      # change pit settings & limit (spawn is required, since a +resolve+ will make the relation immutable)
      current_response = relation.spawn.configure!(current_pit_hash).limit!(batch_size).resolve('Pit Results').response

      # resolve only data from hits->hits[{_source}]
      current_results        = if ids_only
                                 current_response['hits']['hits'].map { |result| result['_id'] }
                               else
                                 current_response['hits']['hits'].map { |result| result['_source'].merge('_id' => result['_id']) }
                               end

      current_results_length = current_results.length

      # check if we reached the required offset
      if results_offset < current_results_length
        # check for parts
        # (maybe a offset 6300 was provided but the batch size is 1000 - so we need to skip a part ...)
        results_from = results_offset > 0 ? results_offset : 0
        results_to   = (results_total + current_results_length - results_from) > results_limit ? results_limit - results_total + results_from - 1 : -1

        ranged_results = current_results[results_from..results_to]

        if block_given?
          yield ranged_results
        else
          results += ranged_results
        end

        # add to total
        results_total += ranged_results.length
      end

      # -------- BREAK conditions --------

      # we reached our maximum value
      break if results_total >= results_limit

      # we ran out of data
      break if current_results_length < batch_size

      # additional security - prevents infinite loops
      raise(::ActiveRecord::StatementInvalid, "'pit_results' aborted due an infinite loop error (invalid or missing order)") if current_pit_hash[:search_after] == current_response['hits']['hits'][-1]['sort'] && current_pit_hash[:pit][:id] == current_response['pit_id']

      # -------- NEXT LOOP changes --------

      # reduce the offset
      results_offset -= current_results_length

      # assign new pit
      current_pit_hash = { search_after: current_response['hits']['hits'][-1]['sort'], pit: { id: current_response['pit_id'], keep_alive: keep_alive } }

      # we need to justify the +batch_size+ if the query will reach over the limit
      batch_size       = results_limit - results_total if results_offset < batch_size && (results_total + batch_size) > results_limit
    end
  end

  # return results array or total value
  if block_given?
    results_total
  else
    results
  end
end

#point_in_time(keep_alive: '1m') {|initial_pit_id| ... } ⇒ nil, String Also known as: pit

creates and returns a new point in time id. optionally yields the provided block and closes the pit afterwards.

Parameters:

  • keep_alive (String) (defaults to: '1m')

    (default: '1m')

Yields:

  • (initial_pit_id)

Returns:

  • (nil, String)
    • either returns the pit_id (no block given) or nil


67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/elasticsearch_record/relation/result_methods.rb', line 67

def point_in_time(keep_alive: '1m')
  # resolve a initial PIT id
  initial_pit_id = klass.connection.api(:core, :open_point_in_time, { index: klass.table_name, keep_alive: keep_alive }, "#{klass} Open Pit").dig('id')

  return initial_pit_id unless block_given?

  # block provided, so yield with id
  yield initial_pit_id

  # close PIT
  klass.connection.api(:core, :close_point_in_time, { body: { id: initial_pit_id } }, "#{klass} Close Pit")

  # return nil if everything was ok
  nil
end

#responseArray

returns the RAW response for the current query

Returns:

  • (Array)


218
219
220
# File 'lib/elasticsearch_record/relation/result_methods.rb', line 218

def response
  spawn.hits_only!.resolve('Response').response
end

#resultsArray

returns the results for the current query

Returns:

  • (Array)


242
243
244
# File 'lib/elasticsearch_record/relation/result_methods.rb', line 242

def results
  spawn.hits_only!.resolve('Results').results
end

#totalObject

returns the total value



247
248
249
# File 'lib/elasticsearch_record/relation/result_methods.rb', line 247

def total
  loaded? ? @total : spawn.total_only!.resolve('Total').total
end

#total_only!Object

sets query as "total"-only query (drops the size, sort & aggs options - so no hits & aggs will be returned)



266
267
268
269
270
# File 'lib/elasticsearch_record/relation/result_methods.rb', line 266

def total_only!
  configure!({ size: 0, from: nil, aggs: nil, sort: nil, _source: false })

  self
end