Class: AtlasEngine::Elasticsearch::Repository
- Inherits:
-
Object
- Object
- AtlasEngine::Elasticsearch::Repository
show all
- Extended by:
- T::Helpers, T::Sig
- Includes:
- RepositoryInterface
- Defined in:
- app/models/atlas_engine/elasticsearch/repository.rb
Constant Summary
collapse
- INITIAL_INDEX_VERSION =
0
AtlasEngine::Elasticsearch::RepositoryInterface::PostAddressData
Instance Attribute Summary collapse
Instance Method Summary
collapse
#active_alias, #archived_alias, #new_alias
Constructor Details
#initialize(index_base_name:, index_settings:, index_mappings:, mapper_callable: nil) ⇒ Repository
Returns a new instance of Repository.
36
37
38
39
40
41
42
43
44
45
|
# File 'app/models/atlas_engine/elasticsearch/repository.rb', line 36
def initialize(index_base_name:, index_settings:, index_mappings:, mapper_callable: nil)
@client = T.let(Client.new, ClientInterface)
@index_base_name = T.let(index_base_name, String)
@index_mappings = T.let(index_mappings || default_mapping, T::Hash[String, T.untyped])
@index_settings = T.let(index_settings, T::Hash[Symbol, T.untyped])
@mapper_callable = T.let(
mapper_callable || ->(record) { record.to_hash },
T.proc.params(arg0: T.untyped).returns(T.untyped),
)
end
|
Instance Attribute Details
#client ⇒ Object
Returns the value of attribute client.
14
15
16
|
# File 'app/models/atlas_engine/elasticsearch/repository.rb', line 14
def client
@client
end
|
#index_base_name ⇒ Object
Returns the value of attribute index_base_name.
17
18
19
|
# File 'app/models/atlas_engine/elasticsearch/repository.rb', line 17
def index_base_name
@index_base_name
end
|
#index_mappings ⇒ Object
Returns the value of attribute index_mappings.
20
21
22
|
# File 'app/models/atlas_engine/elasticsearch/repository.rb', line 20
def index_mappings
@index_mappings
end
|
#index_settings ⇒ Object
Returns the value of attribute index_settings.
23
24
25
|
# File 'app/models/atlas_engine/elasticsearch/repository.rb', line 23
def index_settings
@index_settings
end
|
#mapper_callable ⇒ Object
Returns the value of attribute mapper_callable.
26
27
28
|
# File 'app/models/atlas_engine/elasticsearch/repository.rb', line 26
def mapper_callable
@mapper_callable
end
|
Instance Method Details
#analyze(query) ⇒ Object
141
142
143
144
145
146
147
148
|
# File 'app/models/atlas_engine/elasticsearch/repository.rb', line 141
def analyze(query)
path = "/#{active_alias}/_analyze"
response = client.post(path, query, {})
response.body
rescue ::Elastic::Transport::Transport::Error => e
raise Elasticsearch::Error.new(e.message, e)
end
|
#base_alias_name ⇒ Object
48
49
50
51
52
53
54
|
# File 'app/models/atlas_engine/elasticsearch/repository.rb', line 48
def base_alias_name
if Rails.env.test?
"test_#{index_base_name.to_s.downcase}"
else
index_base_name.to_s.downcase
end
end
|
#create_next_index(ensure_clean: false, raise_errors: false) ⇒ Object
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
|
# File 'app/models/atlas_engine/elasticsearch/repository.rb', line 67
def create_next_index(ensure_clean: false, raise_errors: false)
return if client.index_or_alias_exists?(new_alias)
versioned_index_name = if client.index_or_alias_exists?(active_alias)
T.must(client.find_index_by(alias_name: active_alias)).next
else
"#{active_alias}.#{INITIAL_INDEX_VERSION}"
end
body = {
aliases: {
new_alias.to_s => {
is_write_index: true,
},
},
settings: index_settings,
mappings: index_mappings,
}
client.put(versioned_index_name, body)
rescue
raise if raise_errors
end
|
#find(id) ⇒ Object
161
162
163
164
165
166
167
168
|
# File 'app/models/atlas_engine/elasticsearch/repository.rb', line 161
def find(id)
path = "/#{active_alias}/_doc/#{id}"
response = client.get(path, nil, {})
response.body["_source"]
rescue ::Elastic::Transport::Transport::Error => e
raise Elasticsearch::Error.new(e.message, e)
end
|
#indices ⇒ Object
176
177
178
179
180
181
|
# File 'app/models/atlas_engine/elasticsearch/repository.rb', line 176
def indices
path = "/_cat/indices?format=json"
response = client.get(path, nil, {})
response.body
end
|
#read_alias_name ⇒ Object
57
58
59
|
# File 'app/models/atlas_engine/elasticsearch/repository.rb', line 57
def read_alias_name
base_alias_name
end
|
#record_source(post_address) ⇒ Object
171
172
173
|
# File 'app/models/atlas_engine/elasticsearch/repository.rb', line 171
def record_source(post_address)
mapper_callable.call(post_address).compact
end
|
#save_records_backfill(records) ⇒ Object
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
|
# File 'app/models/atlas_engine/elasticsearch/repository.rb', line 108
def save_records_backfill(records)
return if records.blank?
alias_name = if client.index_or_alias_exists?(new_alias)
new_alias
elsif client.index_or_alias_exists?(active_alias)
active_alias
else
raise "Next or current index must exist to backfill records"
end
body = ""
records.each do |record|
body += <<-NDJSON
{ "create": {} }
#{record_source(record).to_json}
NDJSON
end
client.post("/#{alias_name}/_bulk", body)
end
|
#search(query) ⇒ Object
131
132
133
134
135
136
137
138
|
# File 'app/models/atlas_engine/elasticsearch/repository.rb', line 131
def search(query)
path = "/#{active_alias}/_search"
response = client.post(path, query, {})
response.body
rescue ::Elastic::Transport::Transport::Error => e
raise Elasticsearch::Error.new(e.message, e)
end
|
#switch_to_next_index(raise_errors: false) ⇒ Object
97
98
99
100
101
|
# File 'app/models/atlas_engine/elasticsearch/repository.rb', line 97
def switch_to_next_index(raise_errors: false)
update_all_aliases_of_index if client.index_or_alias_exists?(new_alias)
rescue
raise if raise_errors
end
|
#term_vectors(query) ⇒ Object
151
152
153
154
155
156
157
158
|
# File 'app/models/atlas_engine/elasticsearch/repository.rb', line 151
def term_vectors(query)
path = "/#{active_alias}/_mtermvectors"
response = client.post(path, query, {})
response.body
rescue ::Elastic::Transport::Transport::Error => e
raise Elasticsearch::Error.new(e.message, e)
end
|