Class: AtlasEngine::Elasticsearch::Repository

Inherits:
Object
  • Object
show all
Extended by:
T::Helpers, T::Sig
Includes:
RepositoryInterface
Defined in:
app/models/atlas_engine/elasticsearch/repository.rb

Constant Summary collapse

INITIAL_INDEX_VERSION =
0

Constants included from RepositoryInterface

AtlasEngine::Elasticsearch::RepositoryInterface::PostAddressData

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from RepositoryInterface

#active_alias, #archived_alias, #new_alias

Constructor Details

#initialize(index_base_name:, index_settings:, index_mappings:, mapper_callable: nil) ⇒ Repository

Returns a new instance of Repository.



36
37
38
39
40
41
42
43
44
45
# File 'app/models/atlas_engine/elasticsearch/repository.rb', line 36

# Sets up the client, index naming/configuration and record mapper used by
# this repository.
#
# @param index_base_name [String] stored as-is in @index_base_name
# @param index_settings [Hash{Symbol => Object}] stored as-is in @index_settings
# @param index_mappings [Hash{String => Object}, nil] stored in @index_mappings;
#   a nil/false value falls back to default_mapping (defined outside this excerpt)
# @param mapper_callable [Proc, nil] one-argument callable that converts a record
#   before indexing; when omitted, the record's #to_hash is used
def initialize(index_base_name:, index_settings:, index_mappings:, mapper_callable: nil)
  # A new Client is always built here; callers cannot inject their own.
  @client = T.let(Client.new, ClientInterface)
  @index_base_name = T.let(index_base_name, String)
  @index_mappings = T.let(index_mappings || default_mapping, T::Hash[String, T.untyped])
  @index_settings = T.let(index_settings, T::Hash[Symbol, T.untyped])
  @mapper_callable = T.let(
    # Default mapper: hand the record's own hash representation to the index.
    mapper_callable || ->(record) { record.to_hash },
    T.proc.params(arg0: T.untyped).returns(T.untyped),
  )
end

Instance Attribute Details

#client ⇒ Object (readonly)

Returns the value of attribute client.



14
15
16
# File 'app/models/atlas_engine/elasticsearch/repository.rb', line 14

# Reader for the client held in @client.
#
# @return [ClientInterface] the client instance created by the constructor
def client
  @client
end

#index_base_name ⇒ Object (readonly)

Returns the value of attribute index_base_name.



17
18
19
# File 'app/models/atlas_engine/elasticsearch/repository.rb', line 17

# Reader for the base name held in @index_base_name.
#
# @return [String] the index_base_name given to the constructor
def index_base_name
  @index_base_name
end

#index_mappings ⇒ Object (readonly)

Returns the value of attribute index_mappings.



20
21
22
# File 'app/models/atlas_engine/elasticsearch/repository.rb', line 20

# Reader for the mappings held in @index_mappings.
#
# @return [Hash{String => Object}] the mappings given to the constructor, or
#   the constructor's default_mapping fallback when none were given
def index_mappings
  @index_mappings
end

#index_settings ⇒ Object (readonly)

Returns the value of attribute index_settings.



23
24
25
# File 'app/models/atlas_engine/elasticsearch/repository.rb', line 23

# Reader for the settings held in @index_settings.
#
# @return [Hash{Symbol => Object}] the index_settings given to the constructor
def index_settings
  @index_settings
end

#mapper_callable ⇒ Object (readonly)

Returns the value of attribute mapper_callable.



26
27
28
# File 'app/models/atlas_engine/elasticsearch/repository.rb', line 26

# Reader for the record mapper held in @mapper_callable.
#
# @return [Proc] one-argument callable; the constructor defaults it to a
#   lambda that calls #to_hash on the record
def mapper_callable
  @mapper_callable
end

Instance Method Details

#analyze(query) ⇒ Object



141
142
143
144
145
146
147
148
# File 'app/models/atlas_engine/elasticsearch/repository.rb', line 141

# Posts +query+ to the `_analyze` endpoint of the active alias.
#
# @param query [Object] request body handed to client.post unchanged
# @return [Object] the body of the client's response
# @raise [Elasticsearch::Error] wrapping any transport error raised by the client
def analyze(query)
  endpoint = "/#{active_alias}/_analyze"
  client.post(endpoint, query, {}).body
rescue ::Elastic::Transport::Transport::Error => transport_error
  raise Elasticsearch::Error.new(transport_error.message, transport_error)
end

#base_alias_name ⇒ Object



48
49
50
51
52
53
54
# File 'app/models/atlas_engine/elasticsearch/repository.rb', line 48

# Lower-cased index base name, prefixed with "test_" when Rails runs in the
# test environment.
#
# @return [String]
def base_alias_name
  lowercase_name = index_base_name.to_s.downcase
  return lowercase_name unless Rails.env.test?

  "test_#{lowercase_name}"
end

#create_next_index(ensure_clean: false, raise_errors: false) ⇒ Object



67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# File 'app/models/atlas_engine/elasticsearch/repository.rb', line 67

# Creates the next versioned index and attaches the new alias to it as the
# write index. Does nothing when the new alias already exists.
#
# The index name is the successor (String#next) of the index currently behind
# the active alias, or "<active_alias>.<INITIAL_INDEX_VERSION>" when there is
# no active index yet.
#
# @param ensure_clean [Boolean] currently unused (see PENDING note below)
# @param raise_errors [Boolean] re-raise failures instead of swallowing them
# @return [Object, nil] the result of client.put, or nil when skipped or when
#   an error was swallowed
def create_next_index(ensure_clean: false, raise_errors: false)
  # PENDING: cleanup next index if ensure_clean = true
  return if client.index_or_alias_exists?(new_alias)

  target_index =
    if client.index_or_alias_exists?(active_alias)
      T.must(client.find_index_by(alias_name: active_alias)).next
    else
      "#{active_alias}.#{INITIAL_INDEX_VERSION}"
    end

  write_alias = { new_alias.to_s => { is_write_index: true } }
  request_body = { aliases: write_alias, settings: index_settings, mappings: index_mappings }

  client.put(target_index, request_body)
rescue StandardError
  # Best-effort by default: failures only surface when the caller opts in.
  raise if raise_errors
end

#find(id) ⇒ Object



161
162
163
164
165
166
167
168
# File 'app/models/atlas_engine/elasticsearch/repository.rb', line 161

# Fetches the document with the given id through the active alias.
#
# @param id [Object] document id, interpolated into the request path as-is
# @return [Object, nil] the "_source" entry of the response body
# @raise [Elasticsearch::Error] wrapping any transport error raised by the client
def find(id)
  document_path = "/#{active_alias}/_doc/#{id}"
  client.get(document_path, nil, {}).body["_source"]
rescue ::Elastic::Transport::Transport::Error => transport_error
  raise Elasticsearch::Error.new(transport_error.message, transport_error)
end

#indices ⇒ Object



176
177
178
179
180
181
# File 'app/models/atlas_engine/elasticsearch/repository.rb', line 176

# Lists indices via the `_cat/indices` endpoint, requested in JSON format.
# Transport errors are not wrapped here; they propagate as raised by the client.
#
# @return [Object] the body of the client's response
def indices
  client.get("/_cat/indices?format=json", nil, {}).body
end

#read_alias_name ⇒ Object



57
58
59
# File 'app/models/atlas_engine/elasticsearch/repository.rb', line 57

# Returns #base_alias_name unchanged.
# NOTE(review): presumably this is the alias read queries should target —
# confirm against callers.
#
# @return [String]
def read_alias_name
  base_alias_name
end

#record_source(post_address) ⇒ Object



171
172
173
# File 'app/models/atlas_engine/elasticsearch/repository.rb', line 171

# Converts a record into the document to index: runs it through
# mapper_callable and drops nil-valued entries from the result.
#
# @param post_address [Object] record passed to mapper_callable
# @return [Object] the mapped result after #compact
def record_source(post_address)
  mapped_fields = mapper_callable.call(post_address)
  mapped_fields.compact
end

#save_records_backfill(records) ⇒ Object



108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# File 'app/models/atlas_engine/elasticsearch/repository.rb', line 108

# Bulk-indexes +records+ into the new alias when it exists, otherwise into the
# active alias.
#
# Every record is converted with #record_source and sent as a `create` action
# in one `_bulk` request.
#
# @param records [Enumerable] records to index; nothing is sent when blank
# @return [Object, nil] the result of client.post, or nil when +records+ is blank
# @raise [RuntimeError] when neither the new alias nor the active alias exists
def save_records_backfill(records)
  return if records.blank?

  alias_name = if client.index_or_alias_exists?(new_alias)
    new_alias
  elsif client.index_or_alias_exists?(active_alias)
    active_alias
  else
    raise "Next or current index must exist to backfill records"
  end

  # Append in place (`<<`) rather than `+=`, which copies the whole payload
  # for every record. Unary plus yields a mutable string even under
  # `frozen_string_literal`.
  body = +""
  records.each do |record|
    # The squiggly heredoc strips source indentation, so each NDJSON line
    # starts at column 0 instead of carrying this method's indentation onto
    # the wire.
    body << <<~NDJSON
      { "create": {} }
      #{record_source(record).to_json}
    NDJSON
  end

  client.post("/#{alias_name}/_bulk", body)
end

#search(query) ⇒ Object



131
132
133
134
135
136
137
138
# File 'app/models/atlas_engine/elasticsearch/repository.rb', line 131

# Posts +query+ to the `_search` endpoint of the active alias.
#
# @param query [Object] request body handed to client.post unchanged
# @return [Object] the body of the client's response
# @raise [Elasticsearch::Error] wrapping any transport error raised by the client
def search(query)
  endpoint = "/#{active_alias}/_search"
  client.post(endpoint, query, {}).body
rescue ::Elastic::Transport::Transport::Error => transport_error
  raise Elasticsearch::Error.new(transport_error.message, transport_error)
end

#switch_to_next_index(raise_errors: false) ⇒ Object



97
98
99
100
101
# File 'app/models/atlas_engine/elasticsearch/repository.rb', line 97

# Calls update_all_aliases_of_index when the new alias exists; otherwise does
# nothing.
#
# @param raise_errors [Boolean] re-raise failures instead of swallowing them
# @return [Object, nil] the result of update_all_aliases_of_index, or nil when
#   skipped or when an error was swallowed
def switch_to_next_index(raise_errors: false)
  return unless client.index_or_alias_exists?(new_alias)

  update_all_aliases_of_index
rescue StandardError
  # Best-effort by default: failures only surface when the caller opts in.
  raise if raise_errors
end

#term_vectors(query) ⇒ Object



151
152
153
154
155
156
157
158
# File 'app/models/atlas_engine/elasticsearch/repository.rb', line 151

# Posts +query+ to the `_mtermvectors` endpoint of the active alias.
#
# @param query [Object] request body handed to client.post unchanged
# @return [Object] the body of the client's response
# @raise [Elasticsearch::Error] wrapping any transport error raised by the client
def term_vectors(query)
  endpoint = "/#{active_alias}/_mtermvectors"
  client.post(endpoint, query, {}).body
rescue ::Elastic::Transport::Transport::Error => transport_error
  raise Elasticsearch::Error.new(transport_error.message, transport_error)
end