Class: RotatingEsLoader
- Inherits:
-
EsClient
show all
- Extended by:
- Memoist
- Defined in:
- lib/rotating_es_loader.rb
Overview
Constant Summary
collapse
- MAX_INDEX_AGE =
indexes with a datestamp newer than this age (in days) will not be wiped
3
- DEFAULT_SLICE_SIZE =
50
Instance Attribute Summary collapse
Instance Method Summary
collapse
Methods inherited from EsClient
#client, #method_missing
Constructor Details
Returns a new instance of RotatingEsLoader.
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
# File 'lib/rotating_es_loader.rb', line 15
# Builds a loader from an options hash.
#
# Reads these keys from +opts+:
# * +:credentials+        - required; forwarded to the superclass constructor
# * +:url+                - required; forwarded to the superclass constructor
# * +:index_definitions+  - required Hash; each entry must carry a
#                           +:datasource+ that responds to +each+
# * +:slice_size+         - optional; falls back to DEFAULT_SLICE_SIZE
# * +:datasources+        - optional; stored as-is in @datasources
#
# After the superclass is set up, the major version is taken from the
# version number reported by +client.info+.
#
# @param opts [Hash] see the key list above
# @raise [RuntimeError] when a required option is missing or a definition
#   has no enumerable datasource
def initialize(opts)
  raise('no credentials provided') unless opts[:credentials]
  raise('no url provided') unless opts[:url]
  raise('no definitions provided') unless opts[:index_definitions].is_a?(Hash)

  # The parsed value is not used; the call is kept because it raises on a
  # URL that cannot be parsed.
  URI.parse(opts[:url])

  super(url: opts[:url], credentials: opts[:credentials])

  @index_definitions = opts[:index_definitions]
  @slice_size = opts[:slice_size] || DEFAULT_SLICE_SIZE
  @logger.debug("index keys: #{index_keys}")
  @datasources = opts[:datasources]

  index_keys.each do |key|
    next if @index_definitions[key][:datasource].respond_to?(:each)

    raise("No datasource for #{key}")
  end

  version_number = client.info['version']['number']
  @es_major_version = version_number.split('.').first.to_i
end
|
Dynamic Method Handling
This class handles dynamic methods through the method_missing method
in the class EsClient
Instance Attribute Details
#es_major_version ⇒ Object
Returns the value of attribute es_major_version.
13
14
15
|
# File 'lib/rotating_es_loader.rb', line 13
# Major version of the Elasticsearch cluster as an Integer; the constructor
# derives it from the version number reported by client.info.
def es_major_version
@es_major_version
end
|
#slice_size ⇒ Object
Returns the value of attribute slice_size.
13
14
15
|
# File 'lib/rotating_es_loader.rb', line 13
# Number of records sent per bulk batch; the constructor takes it from
# opts[:slice_size] and falls back to DEFAULT_SLICE_SIZE.
def slice_size
@slice_size
end
|
Instance Method Details
#create_documents ⇒ Object
73
74
75
76
77
78
79
80
81
|
# File 'lib/rotating_es_loader.rb', line 73
# Bulk-loads every configured datasource into its freshly named index.
#
# For each index key this resolves the index name, the datasource and (where
# applicable) the document type, then delegates to
# +create_documents_for_type+.
#
# The document type is only looked up when the cluster still supports
# mapping types: +document_type_for+ raises on newer versions, so calling it
# unconditionally would abort the whole load there.
#
# @return [Array] the index keys that were iterated
def create_documents
  index_keys.each do |key|
    create_documents_for_type(
      name: get_index_name(key),
      data: datasource_for(key),
      type: multitype_support? ? document_type_for(key) : nil
    )
  end
end
|
#create_documents_for_type(name:, data:, type: nil) ⇒ Object
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
|
# File 'lib/rotating_es_loader.rb', line 83
# Sends +data+ to the index +name+ through the bulk API, @slice_size records
# per request.
#
# Every record contributes two bulk entries: an +index+ action header (with
# the record's +:id+ as +_id+) followed by the record itself. The +_type+
# field is added to the header on every version that still has mapping types
# (major version <= 5), matching the version checks used elsewhere in this
# class.
#
# @param name [String] target index name
# @param data [#each_slice] records; each must answer +[:id]+
# @param type [Object, nil] document type written as +_type+ on ES <= 5
def create_documents_for_type(name:, data:, type: nil)
  @logger.info("Creating documents of in index #{name} in batches of #{@slice_size}")
  include_type = es_major_version <= 5

  # each_slice without a block already streams one slice at a time, so no
  # Enumerator::Lazy is needed. Going through a lazy enumerator is avoided on
  # purpose: its indexed iteration can itself be deferred (Ruby 2.7+), in
  # which case the block below would never run.
  data.each_slice(@slice_size).each_with_index do |slice, slice_num|
    @logger.debug("batch #{slice_num}: #{slice.size} docs")

    body = slice.flat_map do |rec|
      action = { _index: name, _id: rec[:id] }
      action[:_type] = type if include_type
      [{ index: action }, rec]
    end

    result = client.bulk(body: body)
    @logger.warn("ERRORS: #{JSON.pretty_generate(result)}") if result['errors']
  end
end
|
#create_index(name:, key:) ⇒ Object
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
|
# File 'lib/rotating_es_loader.rb', line 188
# Creates the index +name+ using the settings and version-adjusted mappings
# configured for +key+.
#
# The full request is written to the debug log rather than printed to
# stdout, so the loader stays quiet unless debug logging is enabled.
#
# @param name [String] name of the index to create
# @param key [Object] index definition key used to look up settings/mappings
# @return [Object] whatever +client.indices.create+ returns
def create_index(name:, key:)
  @logger.debug("creating index #{name}")
  mappings = mappings_adjusted_for_es_version(key)
  @logger.debug("mappings: #{mappings.to_json}")

  request = {
    index: name,
    body: {
      settings: settings_for(key),
      mappings: mappings
    }
  }
  @logger.debug("create index request: #{JSON.pretty_generate(request)}")
  client.indices.create(request)
end
|
#create_indices ⇒ Object
103
104
105
106
107
|
# File 'lib/rotating_es_loader.rb', line 103
# Creates one index per configured index key, named via +get_index_name+.
#
# @return [Array] the index keys that were iterated
def create_indices
  index_keys.each { |index_key| create_index(name: get_index_name(index_key), key: index_key) }
end
|
#datasource_for(key) ⇒ Object
58
59
60
|
# File 'lib/rotating_es_loader.rb', line 58
# Looks up the +:datasource+ entry of the index definition stored under +key+.
#
# @param key [Object] index definition key
# @return [Object] the configured datasource
def datasource_for(key)
  definition = @index_definitions[key]
  definition[:datasource]
end
|
#delete_old_indices ⇒ Object
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
|
# File 'lib/rotating_es_loader.rb', line 126
# Removes superseded indices for every configured index key.
#
# Only indices named "<key>-<8 digit datestamp>..." are considered, i.e. the
# shape produced by +get_index_name+. A plain substring test would also pick
# up unrelated indices whose name merely contains the key (key "user" vs.
# index "superuser-..."), and those would then be deleted.
#
# Candidates are sorted by name and grouped by their age in days
# (+key_age+). Within a group whose age is <= MAX_INDEX_AGE the last name in
# sort order is kept and the rest are deleted; groups older than that are
# deleted entirely.
#
# @return [Array] the index keys that were iterated
def delete_old_indices
  existing_names = client.indices.get(index: '_all').keys
  @logger.debug("Existing indexes: #{existing_names}")

  index_keys.each do |index|
    owned_by_key = /\A#{Regexp.escape(index.to_s)}-\d{8}(?:-|\z)/
    names_by_age = existing_names.grep(owned_by_key).sort.group_by { |name| key_age(name) }

    keys_to_delete = names_by_age.flat_map do |age, names|
      # Recent groups keep their last (newest by sort order) member.
      age <= MAX_INDEX_AGE ? names[0...-1] : names
    end
    next if keys_to_delete.empty?

    @logger.debug("Deleting indexes #{keys_to_delete.join(', ')}")
    client.indices.delete index: keys_to_delete
  end
end
|
#document_type_for(key) ⇒ Object
40
41
42
43
44
|
# File 'lib/rotating_es_loader.rb', line 40
# Returns the +:type+ entry of the index definition stored under +key+.
#
# @param key [Object] index definition key
# @return [Object] the configured document type
# @raise [RuntimeError] when the cluster's major version is greater than 5
def document_type_for(key)
  types_supported = es_major_version <= 5
  raise "document type not supported for ES #{es_major_version}" unless types_supported

  @index_definitions[key][:type]
end
|
#execute ⇒ Object
62
63
64
65
66
67
|
# File 'lib/rotating_es_loader.rb', line 62
# Runs the complete load in order: create the new indices, load the
# documents into them, point the aliases at them, then delete old indices.
# Returns whatever delete_old_indices returns.
def execute
create_indices
create_documents
swap_aliases
delete_old_indices
end
|
#get_index_name(key) ⇒ Object
118
119
120
121
122
123
|
# File 'lib/rotating_es_loader.rb', line 118
# Builds the physical index name for +key+:
# "<key>-<YYYYMMDD>-<epoch seconds>-<pid>".
#
# NOTE(review): the name embeds the current time, so two un-memoized calls a
# second apart yield different names. The class extends Memoist; confirm this
# method is memoized, since several steps call it independently.
#
# @param key [Object] must be one of +index_keys+
# @return [String] the generated index name
# @raise [RuntimeError] when +key+ is not a configured index key
def get_index_name(key)
  stamp = [Date.today.to_s.delete('-'), Time.now.to_i, Process.pid].join('-')
  raise("provided key #{key} is not a valid index") unless index_keys.include?(key)

  "#{key}-#{stamp}"
end
|
#index_keys ⇒ Object
46
47
48
|
# File 'lib/rotating_es_loader.rb', line 46
# Lists the keys of the configured index definitions.
#
# @return [Array] a new array of definition keys
def index_keys
  @index_definitions.each_key.to_a
end
|
#key_age(key) ⇒ Object
109
110
111
112
113
114
115
116
|
# File 'lib/rotating_es_loader.rb', line 109
# Age in days of the index called +key+, read from the datestamp that forms
# the second dash-separated segment of the name ("<key>-<YYYYMMDD>-...").
#
# Names without such a segment, or whose segment is not a real calendar date,
# count as age 0 instead of raising, so one oddly named index cannot abort
# the cleanup pass.
#
# @param key [String] index name
# @return [Integer] whole days between the datestamp and today, or 0
def key_age(key)
  date_str = key.split('-')[1]
  return 0 unless /\A\d{8}\z/ =~ date_str

  (Date.today - Date.strptime(date_str, '%Y%m%d')).to_i
rescue ArgumentError
  # Eight digits that do not form a valid date (e.g. month 13).
  0
end
|
#mappings_adjusted_for_es_version(key) ⇒ Object
176
177
178
179
180
181
182
183
184
185
186
|
# File 'lib/rotating_es_loader.rb', line 176
# Wraps the mapping configured for +key+ in the structure the cluster's major
# version expects: nested under +key+ (used as the type name) before ES 6,
# flat under +:properties+ from ES 6 on.
#
# When no mapping is configured a warning is logged and an empty mappings
# hash is returned, rather than letting the logger's return value end up as
# the +properties+ payload.
#
# @param key [Object] index definition key
# @return [Hash] mappings section for an index-create request
def mappings_adjusted_for_es_version(key)
  mapping_for_key = mappings_for(key)
  unless mapping_for_key
    @logger.warn("mappings does not contain a mapping for #{key}")
    return {}
  end

  if es_major_version < 6
    { key => { properties: mapping_for_key } }
  else
    { properties: mapping_for_key }
  end
end
|
#mappings_for(key) ⇒ Object
50
51
52
|
# File 'lib/rotating_es_loader.rb', line 50
# Looks up the +:mappings+ entry of the index definition stored under +key+.
#
# @param key [Object] index definition key
# @return [Object, nil] the configured mappings, if any
def mappings_for(key)
  definition = @index_definitions[key]
  definition[:mappings]
end
|
#multitype_support? ⇒ Boolean
69
70
71
|
# File 'lib/rotating_es_loader.rb', line 69
# Whether the cluster's major version is 5 or lower.
#
# @return [Boolean]
def multitype_support?
  es_major_version <= 5
end
|
#settings_for(key) ⇒ Object
54
55
56
|
# File 'lib/rotating_es_loader.rb', line 54
# Looks up the +:settings+ entry of the index definition stored under +key+.
#
# @param key [Object] index definition key
# @return [Object, nil] the configured settings, if any
def settings_for(key)
  definition = @index_definitions[key]
  definition[:settings]
end
|
#swap_aliases ⇒ Object
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
|
# File 'lib/rotating_es_loader.rb', line 149
# Points each alias (one per index key, named after the key) at the index
# returned by +get_index_name+.
#
# For every alias a single +update_aliases+ request is sent whose actions
# first remove the alias from the indices currently attached to it and then
# add it to the new index. Looking up the currently attached indices is best
# effort: any StandardError raised there is logged as a warning and the
# request is sent with the +add+ action only.
#
# @return [Array] the index keys that were iterated
def swap_aliases
  index_keys.each do |alias_name|
    target_index = get_index_name(alias_name)
    @logger.debug("fetching any indices attached to alias #{alias_name}")

    removals =
      begin
        attached = client.indices.get_alias(name: alias_name).keys
        # Reverse order mirrors prepending each removal one at a time.
        attached.reverse.map do |index_to_remove|
          { remove: { index: index_to_remove, alias: alias_name } }
        end
      rescue StandardError => e
        @logger.warn(e)
        []
      end

    actions = removals + [{ add: { index: target_index, alias: alias_name } }]
    @logger.debug('update_aliases actions: ' + actions.to_json)
    client.indices.update_aliases body: { actions: actions }
  end
end
|