Class: Polipus::Storage::ElasticSearchStore

Inherits:
Base
  • Object
show all
Defined in:
lib/polipus-elasticsearch/storage/elasticsearch_store.rb

Constant Summary collapse

BINARY_FIELDS =
%w(body headers user_data)
DEFAULT_INDEX =
Polipus::ElasticSearch::Page

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(client, options = {}) ⇒ ElasticSearchStore

Returns a new instance of ElasticSearchStore.



15
16
17
18
19
20
21
22
23
24
# File 'lib/polipus-elasticsearch/storage/elasticsearch_store.rb', line 15

# Builds a store bound to an Elasticsearch index and makes sure that index
# exists.
#
# Every option may be given under a Symbol or a String key.
#
# @param client the client object handed to +index.setup+
# @param options [Hash]
# @option options :index       index model to use (defaults to DEFAULT_INDEX)
# @option options :index_name  name passed to +index.setup+ / +index.create_index!+
# @option options :except      field name(s) removed from a page hash before storing
# @option options :compress    stored as-is and exposed through +compress+
# @option options :refresh     refresh flag forwarded to +index.store+ / +index.remove+
#                              (defaults to true; an explicit false is honoured)
def initialize(client, options = {})
  @index = options[:index] || options['index'] || DEFAULT_INDEX
  @index_name = options[:index_name] || options['index_name']
  @except = options[:except] || options['except'] || []
  @compress = options[:compress] || options['compress']
  @semaphore = Mutex.new

  # `a || b || true` can never yield false, so distinguish "not given" (nil)
  # from an explicit false instead of chaining with `||`.
  refresh_option = options[:refresh]
  refresh_option = options['refresh'] if refresh_option.nil?
  @refresh = refresh_option.nil? ? true : refresh_option

  index.setup(client, index_name)
  index.create_index!(index_name) unless index.index_exists?
end

Instance Attribute Details

#compress ⇒ Object

Returns the value of attribute compress.



13
14
15
# File 'lib/polipus-elasticsearch/storage/elasticsearch_store.rb', line 13

# Reader for the +compress+ option captured by the constructor
# (options[:compress] or options['compress']); nil when neither was given.
def compress
  @compress
end

#except ⇒ Object

Returns the value of attribute except.



13
14
15
# File 'lib/polipus-elasticsearch/storage/elasticsearch_store.rb', line 13

# Reader for the +except+ option captured by the constructor (defaults to []).
# #add deletes each of these field names from the page hash before storing it.
def except
  @except
end

#index ⇒ Object

Returns the value of attribute index.



13
14
15
# File 'lib/polipus-elasticsearch/storage/elasticsearch_store.rb', line 13

# Reader for the index object chosen by the constructor (the +index+ option,
# or DEFAULT_INDEX when none was given). All storage calls go through it.
def index
  @index
end

#index_name ⇒ Object

Returns the value of attribute index_name.



13
14
15
# File 'lib/polipus-elasticsearch/storage/elasticsearch_store.rb', line 13

# Reader for the +index_name+ option captured by the constructor; nil when
# the option was not supplied.
def index_name
  @index_name
end

#refresh ⇒ Object

Returns the value of attribute refresh.



13
14
15
# File 'lib/polipus-elasticsearch/storage/elasticsearch_store.rb', line 13

# Reader for the refresh setting assigned by the constructor. #add and #remove
# pass it as the second argument to +index.store+ / +index.remove+.
def refresh
  @refresh
end

#semaphore ⇒ Object

Returns the value of attribute semaphore.



13
14
15
# File 'lib/polipus-elasticsearch/storage/elasticsearch_store.rb', line 13

# Reader for the Mutex created by the constructor. #add, #exists?, #get and
# #remove run their index calls inside its +synchronize+ block.
def semaphore
  @semaphore
end

Instance Method Details

#add(page) ⇒ Object



26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/polipus-elasticsearch/storage/elasticsearch_store.rb', line 26

def add(page)
  semaphore.synchronize do
    obj = page.to_hash
    Array(except).each { |field| obj.delete(field.to_s) }
    BINARY_FIELDS.each do |field|
      next if obj[field.to_s].nil? || obj[field.to_s].empty?
      obj[field.to_s] = MultiJson.encode(obj[field.to_s]) if field.to_s == 'user_data'
      obj[field.to_s] = Base64.encode64(obj[field.to_s])
    end
    obj['id'] = uuid(page)
    obj['fetched_at'] = obj['fetched_at'].to_i
    begin
      index.store(obj, refresh)
    rescue Elasticsearch::Transport::Transport::Errors::Conflict => ex
      # you're trying to store an old version.
    end
  end
end

#clear ⇒ Object



45
46
47
# File 'lib/polipus-elasticsearch/storage/elasticsearch_store.rb', line 45

# Calls +index.clear_index!+, but only when the index reports that it exists.
#
# @return the result of +index.clear_index!+, or nil when the index is absent
def clear
  return unless index.index_exists?

  index.clear_index!
end

#count ⇒ Object



49
50
51
# File 'lib/polipus-elasticsearch/storage/elasticsearch_store.rb', line 49

# Returns whatever +index.count+ reports. This is a direct delegation; unlike
# #add, #get, #exists? and #remove it does not take the semaphore.
def count
  index.count
end

#drop ⇒ Object



53
54
55
# File 'lib/polipus-elasticsearch/storage/elasticsearch_store.rb', line 53

# Calls +index.delete_index!+, but only when the index reports that it exists.
#
# @return the result of +index.delete_index!+, or nil when the index is absent
def drop
  return unless index.index_exists?

  index.delete_index!
end

#each ⇒ Object



57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/polipus-elasticsearch/storage/elasticsearch_store.rb', line 57

# Yields +uuid(page), page+ for stored pages.
#
# This method is implemented only for testing purposes: it issues a single
# match_all search with from 0 / size 25, so at most the first 25 hits are
# visited.
#
# @yield [id, page] the value of +uuid(page)+ and the page built by #load_page
# @return [Enumerator] when called without a block
def each
  # Without a block, hand back an Enumerator rather than failing with
  # LocalJumpError after the search has already been executed.
  return enum_for(:each) unless block_given?

  response = index.client.search(
    index: index_name,
    body: {
      query: { match_all: {} },
      from: 0,
      size: 25
    }
  )

  response['hits']['hits'].each do |hit|
    page = load_page(hit['_source'])
    yield uuid(page), page
  end
end

#exists?(page) ⇒ Boolean

Returns:

  • (Boolean)


73
74
75
76
77
# File 'lib/polipus-elasticsearch/storage/elasticsearch_store.rb', line 73

# Asks the index whether a document with +uuid(page)+ is present, holding the
# semaphore for the duration of the call.
#
# @param page the page whose id is looked up
# @return the result of +index.exists?+
def exists?(page)
  @semaphore.synchronize { index.exists?(uuid(page)) }
end

#get(page) ⇒ Object



79
80
81
82
83
# File 'lib/polipus-elasticsearch/storage/elasticsearch_store.rb', line 79

def get(page)
  @semaphore.synchronize do
    load_page(index.get(uuid(page)))
  end
end

#load_page(data) ⇒ Object



91
92
93
94
95
96
97
98
99
100
101
# File 'lib/polipus-elasticsearch/storage/elasticsearch_store.rb', line 91

# Rebuilds a page from a stored document hash.
#
# Each non-empty BINARY_FIELDS entry is Base64-decoded in place ('user_data'
# is additionally JSON-decoded), so +data+ itself is modified. The page comes
# from +Page.from_hash+ and its +fetched_at+ falls back to 0 when unset.
#
# @param data [Hash, nil] stored document keyed by String field names
# @return the page, or nil when +data+ is nil
def load_page(data)
  return if data.nil?

  BINARY_FIELDS.each do |name|
    key = name.to_s
    encoded = data[key]
    next if encoded.nil? || encoded.empty?

    decoded = Base64.decode64(encoded)
    data[key] = key == 'user_data' ? MultiJson.decode(decoded) : decoded
  end

  Page.from_hash(data).tap { |page| page.fetched_at ||= 0 }
end

#remove(page) ⇒ Object



85
86
87
88
89
# File 'lib/polipus-elasticsearch/storage/elasticsearch_store.rb', line 85

# Removes the document stored under +uuid(page)+, forwarding the store's
# refresh setting, while holding the semaphore.
#
# @param page the page whose stored copy is removed
# @return the result of +index.remove+
def remove(page)
  @semaphore.synchronize { index.remove(uuid(page), refresh) }
end