Class: Forklift::Connection::Elasticsearch
Instance Method Summary
collapse
#client, #exec, #exec!, #exec_ruby, #exec_script, #pipe
Constructor Details
#initialize(config, forklift) ⇒ Elasticsearch
Returns a new instance of Elasticsearch.
7
8
9
10
|
# File 'lib/forklift/transports/elasticsearch.rb', line 7
# Builds the transport object. Nothing is connected here; both arguments
# are only stored on the instance.
#
# @param config [Object] stored in @config; later handed to the
#   Elasticsearch client constructor by #connect
# @param forklift [Object] stored in @forklift; other methods use it for
#   logging and for reading the batch size
def initialize(config, forklift)
@config = config
@forklift = forklift
end
|
Instance Method Details
#config ⇒ Object
12
13
14
|
# File 'lib/forklift/transports/elasticsearch.rb', line 12
# Reader for the connection settings.
#
# @return [Object] the value stored in @config by the constructor
def config
@config
end
|
#connect ⇒ Object
20
21
22
|
# File 'lib/forklift/transports/elasticsearch.rb', line 20
# Instantiates an Elasticsearch client from this connection's settings and
# keeps it in @client.
#
# @return [::Elasticsearch::Client] the newly created client
def connect
  settings = config
  @client = ::Elasticsearch::Client.new(settings)
end
|
#delete_index(index) ⇒ Object
75
76
77
78
|
# File 'lib/forklift/transports/elasticsearch.rb', line 75
# Removes an index, but only if the cluster reports that it exists.
# The attempt is logged at debug level whether or not the index is present.
#
# @param index [Object] name of the index to remove
# @return [Object, nil] whatever the client's delete call returns, or nil
#   when the index does not exist
def delete_index(index)
  forklift.logger.debug " ELASTICSEARCH (delete index): #{index}"

  # Nothing to do when the index is absent.
  return unless client.indices.exists({ index: index })

  client.indices.delete({ index: index })
end
|
#disconnect ⇒ Object
24
25
26
|
# File 'lib/forklift/transports/elasticsearch.rb', line 24
# Drops this object's reference to the client by setting @client to nil.
# No call is made on the client itself.
#
# @return [nil]
def disconnect
@client = nil
end
|
#forklift ⇒ Object
16
17
18
|
# File 'lib/forklift/transports/elasticsearch.rb', line 16
# Reader for the owning forklift object.
#
# @return [Object] the value stored in @forklift by the constructor
def forklift
@forklift
end
|
#read(index, query, looping = true, from = 0, size = forklift.config[:batch_size]) ⇒ Object
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
|
# File 'lib/forklift/transports/elasticsearch.rb', line 28
# Runs a search against an index and returns or yields the `_source` of
# every hit, with keys symbolized, one batch at a time.
#
# With a block, each batch is yielded. When `looping` is exactly `true`,
# batches keep being requested (advancing the offset by `size`) until a
# request returns no hits; that final empty batch is still yielded.
# Without a block, only the first batch is fetched and returned.
#
# The caller's `query` hash is left untouched: paging parameters are
# merged into a per-request copy.
#
# @param index [Object] index to search
# @param query [Hash] search body; `:from` and `:size` are overridden
# @param looping [Boolean] keep paging when exactly `true`
# @param from [Integer] offset of the first document
# @param size [Integer] batch size; defaults to forklift.config[:batch_size]
# @yield [Array<Hash>] one batch of documents
# @return [Array<Hash>, nil] the first batch when no block is given,
#   otherwise nil
def read(index, query, looping=true, from=0, size=forklift.config[:batch_size])
  offset = 0

  loop do
    # Merge into a copy so :from/:size never leak into the caller's hash.
    prepared_query = query.merge(from: from + offset, size: size)
    forklift.logger.debug " ELASTICSEARCH: #{prepared_query.to_json}"

    results = client.search({ index: index, body: prepared_query })
    hits = results["hits"]["hits"]
    data = hits.map { |hit| hit["_source"].symbolize_keys! }

    return data unless block_given?

    yield data

    # Stop after a single pass when not looping, or once a page is empty.
    break unless looping == true && !hits.empty?

    offset += size
  end
end
|
#write(data, index, update = false, type = 'forklift', primary_key = :id) ⇒ Object
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
|
# File 'lib/forklift/transports/elasticsearch.rb', line 58
# Indexes every row of `data` into `index`, one request per row, then
# refreshes the index.
#
# Row keys are symbolized in place before anything is sent. A document id
# is supplied only when `update` is exactly `true` and the row has a
# non-nil value under `primary_key`.
#
# @param data [Array<Hash>] rows to store (their keys are symbolized)
# @param index [Object] target index
# @param update [Boolean] when `true`, reuse the row's primary key as the id
# @param type [String] sent as the `:type` of each index request
# @param primary_key [Symbol] row key holding the document id
# @return [Object] whatever the client's refresh call returns
def write(data, index, update=false, type='forklift', primary_key=:id)
  data.each(&:symbolize_keys!)

  data.each do |row|
    document = { index: index, body: row, type: type }

    row_id = row[primary_key]
    document[:id] = row_id if update == true && !row_id.nil?

    forklift.logger.debug " ELASTICSEARCH (store): #{document.to_json}"
    client.index(document)
  end

  client.indices.refresh({ index: index })
end
|