Module: ElasticsearchRecord::Relation::ResultMethods

Defined in:
lib/elasticsearch_record/relation/result_methods.rb

Instance Method Summary collapse

Instance Method Details

#agg_pluck(*column_names) ⇒ Hash

Aggregate-plucks the provided columns. Returns a hash of distinct values for each provided column.

Examples:

Person.agg_pluck(:name)
#> {"name" => ['David', 'Jeremy', 'Jose']}

Person.agg_pluck(:id, :name)
#> {"id" => ['11', '2', '5'], "name" => ['David', 'Jeremy', 'Jose']}

Parameters:

  • column_names (Array)

Returns:

  • (Hash)


16
17
18
19
20
21
22
23
24
25
26
27
# File 'lib/elasticsearch_record/relation/result_methods.rb', line 16

# Builds a +terms+ aggregation per requested column and maps each column
# name to the distinct values found in its buckets.
#
# @param column_names [Array]
# @return [Hash] column name (String) => Array of distinct values
def agg_pluck(*column_names)
  relation = self.spawn

  # register one terms-aggregation per column (size falls back to 10)
  column_names.each do |name|
    relation.aggregate!(name, { terms: { field: name, size: limit_value || 10 } })
  end

  relation.aggregations.each_with_object({}) do |(name, data), plucked|
    plucked[name.to_s] = data[:buckets].map { |bucket| bucket[:key] }
  end
end

#aggregationsHash

returns the RAW aggregations for the current query

Returns:

  • (Hash)


183
184
185
# File 'lib/elasticsearch_record/relation/result_methods.rb', line 183

# Returns the RAW aggregations of the executed query.
#
# @return [Hash]
def aggregations
  relation = spawn.aggs_only!
  relation.resolve('Aggregations').aggregations
end

#aggs_only!Object

sets query as "aggs"-only query (drops the size & sort options - so no hits will return)



218
219
220
221
222
# File 'lib/elasticsearch_record/relation/result_methods.rb', line 218

# Restricts the query to aggregations only - size, from, sort & _source
# are dropped so no hits will be returned.
#
# @return [self]
def aggs_only!
  tap { configure!({ size: 0, from: nil, sort: nil, _source: false }) }
end

#bucketsActiveSupport::HashWithIndifferentAccess, Hash

Returns the response aggregations and resolves the buckets as a key->value hash.

Returns:

  • (ActiveSupport::HashWithIndifferentAccess, Hash)


189
190
191
# File 'lib/elasticsearch_record/relation/result_methods.rb', line 189

# Resolves the response aggregations into their bucket representation.
#
# @return [ActiveSupport::HashWithIndifferentAccess, Hash]
def buckets
  relation = spawn.aggs_only!
  relation.resolve('Buckets').buckets
end

#composite(*column_names) ⇒ Hash

A multi-bucket aggregation that creates composite buckets from different sources. PLEASE NOTE: The composite aggregation is expensive. Load test your application before deploying a composite aggregation in production!

For a single column_name a hash with the distinct key and the +doc_count+ as value is returned. For multiple column_names a hash with the distinct keys (as hash) and the +doc_count+ as value is returned.

Examples:

Person.composite(:name)
#> {"David" => 10, "Jeremy" => 1, "Jose" => 24}

Person.composite(:name, :age)
#> {
    {name: "David", age: "16"} => 3,
    {name: "David", age: "18"} => 6,
    {name: "David", age: "20"} => 1,
    {name: "Jeremy", age: "20"} => 1,
    {name: "Jose", age: "6"} => 2,
    ...
 }

Parameters:

  • column_names (Array)

Returns:

  • (Hash)


51
52
53
54
55
56
57
58
59
60
61
# File 'lib/elasticsearch_record/relation/result_methods.rb', line 51

# Runs a composite aggregation over the given columns.
# For a single column, returns { distinct_value => doc_count };
# for multiple columns, returns { {col => value, ...} => doc_count }.
# NOTE: composite aggregations are expensive - load test before production use!
#
# @param column_names [Array]
# @return [Hash]
def composite(*column_names)
  sources = column_names.map { |name| { name => { terms: { field: name } } } }

  scope = self.spawn
  scope.aggregate!(:composite_bucket, { composite: { size: limit_value || 10, sources: sources } })

  buckets = scope.aggregations[:composite_bucket][:buckets]

  if column_names.size == 1
    only = column_names.first
    buckets.each_with_object({}) { |bucket, memo| memo[bucket[:key][only]] = bucket[:doc_count] }
  else
    buckets.each_with_object({}) { |bucket, memo| memo[bucket[:key]] = bucket[:doc_count] }
  end
end

#hitsArray

returns the RAW hits for the current query

Returns:

  • (Array)


195
196
197
# File 'lib/elasticsearch_record/relation/result_methods.rb', line 195

# Returns the RAW hits of the executed query.
#
# @return [Array]
def hits
  relation = spawn.hits_only!
  relation.resolve('Hits').hits
end

#hits_only!Object

sets query as "hits"-only query (drops the aggs from the query)



211
212
213
214
215
# File 'lib/elasticsearch_record/relation/result_methods.rb', line 211

# Restricts the query to hits only - any aggs are dropped from the query.
#
# @return [self]
def hits_only!
  tap { configure!({ aggs: nil }) }
end

#pit_results(keep_alive: '1m', batch_size: 1000) ⇒ Object Also known as: total_results

Executes the current query in a +point_in_time+ scope, which makes it possible to resolve more than +max_result_window+ (default: 10000) hits. Resolves the results (hits->hits) from the search, but uses the PIT query to page past the 10000-entry limit.

If a block was provided it'll yield the results array per batch size.

Parameters:

  • keep_alive (String) (defaults to: '1m')
    • how long to keep alive (for each single request) - default: '1m'
  • batch_size (Integer) (defaults to: 1000)
    • how many results per query (default: 1000 - this means at least 10 queries before reaching the +max_result_window+)

Raises:

  • (ArgumentError)


93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
# File 'lib/elasticsearch_record/relation/result_methods.rb', line 93

# Executes the current query within a +point_in_time+ scope so more than the
# index's +max_result_window+ (default: 10000) hits can be resolved.
# Honors any +limit+ / +offset+ set on the relation. If a block is given,
# each batch is yielded instead of being accumulated in the return array.
#
# @param keep_alive [String] PIT lifetime per request (default: '1m')
# @param batch_size [Integer] results per query (default: 1000)
# @raise [ArgumentError] if +batch_size+ exceeds the klass' max_result_window
# @return [Array] accumulated results (stays empty when a block was provided)
def pit_results(keep_alive: '1m', batch_size: 1000)
  raise ArgumentError, "Batch size cannot be above the 'max_result_window' (#{klass.max_result_window}) !" if batch_size > klass.max_result_window

  # check if a limit or offset values was provided
  results_limit  = limit_value ? limit_value : Float::INFINITY
  results_offset = offset_value ? offset_value : 0

  # search_after requires a order - we resolve a order either from provided value or by default ...
  relation = ordered_relation

  # clear limit & offset - paging is done manually via search_after below
  relation.offset!(nil).limit!(nil)

  # remove the 'index' from the query arguments (pit doesn't like that)
  relation.configure!(:__claim__, { index: nil })

  # we store the results in this array
  results       = []
  results_total = 0

  # resolve a new pit and auto-close after we finished
  point_in_time(keep_alive: keep_alive) do |pit_id|
    current_pit_hash = { pit: { id: pit_id, keep_alive: keep_alive } }

    # resolve new data until we got all we need
    loop do
      # change pit settings & limit (spawn is required, since a +resolve+ will make the relation immutable)
      current_response = relation.spawn.configure!(current_pit_hash).limit!(batch_size).resolve('Pit').response

      # resolve only data from hits->hits[{_source}], keeping the document '_id'
      current_results        = current_response['hits']['hits'].map { |result| result['_source'].merge('_id' => result['_id']) }
      current_results_length = current_results.length

      # check if we reached the required offset
      if results_offset < current_results_length
        # check for parts
        # (maybe a offset 6300 was provided but the batch size is 1000 - so we need to skip a part ...)
        results_from = results_offset > 0 ? results_offset : 0
        results_to   = (results_total + current_results_length - results_from) > results_limit ? results_limit - results_total + results_from - 1 : -1

        ranged_results = current_results[results_from..results_to]

        if block_given?
          yield ranged_results
        else
          # NOTE(review): `|=` performs an Array union, so duplicate hits would
          # be silently dropped - confirm this deduplication is intended
          results |= ranged_results
        end

        # add to total
        results_total += ranged_results.length
      end

      # -------- BREAK conditions --------

      # we reached our maximum value
      break if results_total >= results_limit

      # we ran out of data
      break if current_results_length < batch_size

      # additional security - required?
      # break if current_pit_hash[:search_after] == current_response['hits']['hits'][-1]['sort']

      # -------- NEXT LOOP changes --------

      # reduce the offset (batches already consumed count against it)
      results_offset -= current_results_length

      # assign new pit (search_after cursor = sort values of the last hit)
      current_pit_hash = { search_after: current_response['hits']['hits'][-1]['sort'], pit: { id: current_response['pit_id'], keep_alive: keep_alive } }

      # we need to justify the +batch_size+ if the query will reach over the limit
      batch_size       = results_limit - results_total if results_offset < batch_size && (results_total + batch_size) > results_limit
    end
  end

  # return results array
  results
end

#point_in_time(keep_alive: '1m') {|initial_pit_id| ... } ⇒ nil, String Also known as: pit

creates and returns a new point in time id. optionally yields the provided block and closes the pit afterwards.

Parameters:

  • keep_alive (String) (defaults to: '1m')

    (default: '1m')

Yields:

  • (initial_pit_id)

Returns:

  • (nil, String)
    • either returns the pit_id (no block given) or nil


67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/elasticsearch_record/relation/result_methods.rb', line 67

# Creates and returns a new point-in-time id. If a block is given, yields the
# pit id and closes the PIT afterwards - even when the block raises, so the
# PIT is never leaked on the cluster.
#
# @param keep_alive [String] PIT lifetime (default: '1m')
# @yield [initial_pit_id]
# @return [nil, String] the pit_id (no block given) or nil
def point_in_time(keep_alive: '1m')
  # resolve a initial PIT id
  initial_pit_id = klass.connection.api(:core, :open_point_in_time, { index: klass.table_name, keep_alive: keep_alive }, "#{klass} Open Pit").dig('id')

  return initial_pit_id unless block_given?

  begin
    # block provided, so yield with id
    yield initial_pit_id
  ensure
    # close PIT - must also run when the block raises, otherwise the
    # point-in-time would stay open until its keep_alive expires
    klass.connection.api(:core, :close_point_in_time, { body: { id: initial_pit_id } }, "#{klass} Close Pit")
  end

  # return nil if everything was ok
  nil
end

#responseArray

returns the RAW response for the current query

Returns:

  • (Array)


177
178
179
# File 'lib/elasticsearch_record/relation/result_methods.rb', line 177

# Returns the RAW response of the executed query.
#
# @return [Array]
def response
  relation = spawn.hits_only!
  relation.resolve('Response').response
end

#resultsArray

returns the results for the current query

Returns:

  • (Array)


201
202
203
# File 'lib/elasticsearch_record/relation/result_methods.rb', line 201

# Returns the results of the executed query.
#
# @return [Array]
def results
  relation = spawn.hits_only!
  relation.resolve('Results').results
end

#totalObject

returns the total value



206
207
208
# File 'lib/elasticsearch_record/relation/result_methods.rb', line 206

# Returns the total value - the memoized @total when already loaded,
# otherwise resolved via a total-only query.
def total
  return @total if loaded?

  spawn.total_only!.resolve('Total').total
end

#total_only!Object

sets query as "total"-only query (drops the size, sort & aggs options - so no hits & aggs will be returned)



225
226
227
228
229
# File 'lib/elasticsearch_record/relation/result_methods.rb', line 225

# Restricts the query to the total only - size, from, aggs, sort & _source
# are dropped, so neither hits nor aggs will be returned.
#
# @return [self]
def total_only!
  tap { configure!({ size: 0, from: nil, aggs: nil, sort: nil, _source: false }) }
end