Class: Polipus::Storage::ElasticSearchStore
- Inherits:
-
Base
- Object
- Base
- Polipus::Storage::ElasticSearchStore
- Defined in:
- lib/polipus-elasticsearch/storage/elasticsearch_store.rb
Constant Summary collapse
- BINARY_FIELDS =
%w(body headers user_data)
- DEFAULT_INDEX =
Polipus::ElasticSearch::Page
Instance Attribute Summary collapse
-
#compress ⇒ Object
Returns the value of attribute compress.
-
#except ⇒ Object
Returns the value of attribute except.
-
#index ⇒ Object
Returns the value of attribute index.
-
#index_name ⇒ Object
Returns the value of attribute index_name.
-
#refresh ⇒ Object
Returns the value of attribute refresh.
-
#semaphore ⇒ Object
Returns the value of attribute semaphore.
Instance Method Summary collapse
- #add(page) ⇒ Object
- #clear ⇒ Object
- #count ⇒ Object
- #drop ⇒ Object
- #each ⇒ Object
- #exists?(page) ⇒ Boolean
- #get(page) ⇒ Object
-
#initialize(client, options = {}) ⇒ ElasticSearchStore
constructor
A new instance of ElasticSearchStore.
- #load_page(data) ⇒ Object
- #remove(page) ⇒ Object
Constructor Details
#initialize(client, options = {}) ⇒ ElasticSearchStore
Returns a new instance of ElasticSearchStore.
15 16 17 18 19 20 21 22 23 24 |
# File 'lib/polipus-elasticsearch/storage/elasticsearch_store.rb', line 15 def initialize(client, = {}) @index = [:index] || ['index'] || DEFAULT_INDEX @index_name = [:index_name] || ['index_name'] @except = [:except] || ['except'] || [] @compress = [:compress] || ['compress'] @semaphore = Mutex.new @refresh = [:refresh] || ['refresh'] || true index.setup(client, index_name) index.create_index!(index_name) unless index.index_exists? end |
Instance Attribute Details
#compress ⇒ Object
Returns the value of attribute compress.
13 14 15 |
# File 'lib/polipus-elasticsearch/storage/elasticsearch_store.rb', line 13 def compress @compress end |
#except ⇒ Object
Returns the value of attribute except.
13 14 15 |
# File 'lib/polipus-elasticsearch/storage/elasticsearch_store.rb', line 13 def except @except end |
#index ⇒ Object
Returns the value of attribute index.
13 14 15 |
# File 'lib/polipus-elasticsearch/storage/elasticsearch_store.rb', line 13 def index @index end |
#index_name ⇒ Object
Returns the value of attribute index_name.
13 14 15 |
# File 'lib/polipus-elasticsearch/storage/elasticsearch_store.rb', line 13 def index_name @index_name end |
#refresh ⇒ Object
Returns the value of attribute refresh.
13 14 15 |
# File 'lib/polipus-elasticsearch/storage/elasticsearch_store.rb', line 13 def refresh @refresh end |
#semaphore ⇒ Object
Returns the value of attribute semaphore.
13 14 15 |
# File 'lib/polipus-elasticsearch/storage/elasticsearch_store.rb', line 13 def semaphore @semaphore end |
Instance Method Details
#add(page) ⇒ Object
26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
# File 'lib/polipus-elasticsearch/storage/elasticsearch_store.rb', line 26 def add(page) semaphore.synchronize do obj = page.to_hash Array(except).each { |field| obj.delete(field.to_s) } BINARY_FIELDS.each do |field| next if obj[field.to_s].nil? || obj[field.to_s].empty? obj[field.to_s] = MultiJson.encode(obj[field.to_s]) if field.to_s == 'user_data' obj[field.to_s] = Base64.encode64(obj[field.to_s]) end obj['id'] = uuid(page) obj['fetched_at'] = obj['fetched_at'].to_i begin index.store(obj, refresh) rescue Elasticsearch::Transport::Transport::Errors::Conflict => ex # you're trying to store an old version. end end end |
#clear ⇒ Object
45 46 47 |
# File 'lib/polipus-elasticsearch/storage/elasticsearch_store.rb', line 45 def clear index.clear_index! if index.index_exists? end |
#count ⇒ Object
49 50 51 |
# File 'lib/polipus-elasticsearch/storage/elasticsearch_store.rb', line 49 def count index.count end |
#drop ⇒ Object
53 54 55 |
# File 'lib/polipus-elasticsearch/storage/elasticsearch_store.rb', line 53 def drop index.delete_index! if index.index_exists? end |
#each ⇒ Object
57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 |
# File 'lib/polipus-elasticsearch/storage/elasticsearch_store.rb', line 57 def each # This method is implemented only for testing purposes response = index.client.search( index: index_name, body: { query: { match_all: {} }, from: 0, size: 25 } ) response['hits']['hits'].each do |data| page = load_page(data['_source']) yield uuid(page), page end end |
#exists?(page) ⇒ Boolean
73 74 75 76 77 |
# File 'lib/polipus-elasticsearch/storage/elasticsearch_store.rb', line 73 def exists?(page) @semaphore.synchronize do index.exists?(uuid(page)) end end |
#get(page) ⇒ Object
79 80 81 82 83 |
# File 'lib/polipus-elasticsearch/storage/elasticsearch_store.rb', line 79 def get(page) @semaphore.synchronize do load_page(index.get(uuid(page))) end end |
#load_page(data) ⇒ Object
91 92 93 94 95 96 97 98 99 100 101 |
# File 'lib/polipus-elasticsearch/storage/elasticsearch_store.rb', line 91 def load_page(data) return nil if data.nil? BINARY_FIELDS.each do |field| next if data[field.to_s].nil? || data[field.to_s].empty? data[field.to_s] = Base64.decode64(data[field.to_s]) data[field.to_s] = MultiJson.decode(data[field.to_s]) if field.to_s == 'user_data' end page = Page.from_hash(data) page.fetched_at ||= 0 page end |
#remove(page) ⇒ Object
85 86 87 88 89 |
# File 'lib/polipus-elasticsearch/storage/elasticsearch_store.rb', line 85 def remove(page) @semaphore.synchronize do index.remove(uuid(page), refresh) end end |