Class: Valkyrie::Indexing::RedisQueue::IndexingAdapter
- Inherits:
-
Object
- Object
- Valkyrie::Indexing::RedisQueue::IndexingAdapter
- Defined in:
- lib/valkyrie/indexing/redis_queue/indexing_adapter.rb
Instance Attribute Summary collapse
- #connection ⇒ RSolr::Client
-
#delete_error_name ⇒ Object
Returns the value of attribute delete_error_name.
-
#delete_queue_name ⇒ Object
Returns the value of attribute delete_queue_name.
-
#index_error_name ⇒ Object
Returns the value of attribute index_error_name.
-
#index_queue_name ⇒ Object
Returns the value of attribute index_queue_name.
Instance Method Summary collapse
-
#delete(resource:) ⇒ Array<Valkyrie::Resource>
Deletes a Solr Document using the ID.
-
#delete_error_queue(size: 200) ⇒ Object
If a batch fails, try running them one at a time to get down to just records that really fail.
-
#delete_queue(size: 200) ⇒ Object
We reach in to solr directly here to prevent needing to load the objects unnecessarily.
-
#index_error_queue(size: 200) ⇒ Object
If a batch fails, try running them one at a time to get down to just records that really fail.
- #index_queue(size: 200) ⇒ Object
-
#initialize(connection: nil, index_queue_name: 'toindex', delete_queue_name: 'todelete') ⇒ IndexingAdapter
constructor
A new instance of IndexingAdapter.
- #list_delete ⇒ Object
- #list_delete_errors ⇒ Object
- #list_index ⇒ Object
- #list_index_errors ⇒ Object
- #reset! ⇒ Object
- #save(resource:) ⇒ Object
- #save_all(resources:) ⇒ Object
-
#wipe! ⇒ Object
Delete the Solr index of all Documents.
Constructor Details
#initialize(connection: nil, index_queue_name: 'toindex', delete_queue_name: 'todelete') ⇒ IndexingAdapter
Returns a new instance of IndexingAdapter.
14 15 16 17 18 19 20 |
# File 'lib/valkyrie/indexing/redis_queue/indexing_adapter.rb', line 14 def initialize(connection: nil, index_queue_name: 'toindex', delete_queue_name: 'todelete') @connection = connection @index_queue_name = index_queue_name @delete_queue_name = delete_queue_name @index_error_name = index_queue_name + "-error" @delete_error_name = delete_queue_name + "-error" end |
Instance Attribute Details
#connection ⇒ RSolr::Client
9 |
# File 'lib/valkyrie/indexing/redis_queue/indexing_adapter.rb', line 9 attr_writer :connection |
#delete_error_name ⇒ Object
Returns the value of attribute delete_error_name.
10 11 12 |
# File 'lib/valkyrie/indexing/redis_queue/indexing_adapter.rb', line 10 def delete_error_name @delete_error_name end |
#delete_queue_name ⇒ Object
Returns the value of attribute delete_queue_name.
10 11 12 |
# File 'lib/valkyrie/indexing/redis_queue/indexing_adapter.rb', line 10 def delete_queue_name @delete_queue_name end |
#index_error_name ⇒ Object
Returns the value of attribute index_error_name.
10 11 12 |
# File 'lib/valkyrie/indexing/redis_queue/indexing_adapter.rb', line 10 def index_error_name @index_error_name end |
#index_queue_name ⇒ Object
Returns the value of attribute index_queue_name.
10 11 12 |
# File 'lib/valkyrie/indexing/redis_queue/indexing_adapter.rb', line 10 def index_queue_name @index_queue_name end |
Instance Method Details
#delete(resource:) ⇒ Array<Valkyrie::Resource>
Deletes a Solr Document using the ID
36 37 38 |
# File 'lib/valkyrie/indexing/redis_queue/indexing_adapter.rb', line 36 def delete(resource:) connection.zadd(delete_queue_name, Time.current.to_i, resource.id.to_s) end |
#delete_error_queue(size: 200) ⇒ Object
If a batch fails, try running them one at a time to get down to just records that really fail
100 101 102 103 104 105 106 107 108 109 110 111 112 |
# File 'lib/valkyrie/indexing/redis_queue/indexing_adapter.rb', line 100 def delete_error_queue(size: 200) @set = [] size.times do @set = connection.zpopmin(delete_error_name, 1) return [] if @set.blank? solr_indexer = Valkyrie::IndexingAdapter.find(:solr_index) solr_indexer.connection.delete_by_id @set[0].to_s, { softCommit: true } solr_indexer.connection.commit rescue # if anything goes wrong, try to requeue the items @set.each { |id, _time| connection.zadd(delete_error_name, Time.now.to_i, id) } end end |
#delete_queue(size: 200) ⇒ Object
We reach in to solr directly here to prevent needing to load the objects unnecessarily
85 86 87 88 89 90 91 92 93 94 95 96 97 |
# File 'lib/valkyrie/indexing/redis_queue/indexing_adapter.rb', line 85 def delete_queue(size: 200) set = connection.zpopmin(delete_queue_name, size) return [] if set.blank? solr_indexer = Valkyrie::IndexingAdapter.find(:solr_index) set.each do |id, _time| solr_indexer.connection.delete_by_id id.to_s, { softCommit: true } end solr_indexer.connection.commit rescue # if anything goes wrong, try to requeue the items set.each { |id, time| connection.zadd(delete_error_name, time, id) } raise end |
#index_error_queue(size: 200) ⇒ Object
If a batch fails, try running them one at a time to get down to just records that really fail
67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 |
# File 'lib/valkyrie/indexing/redis_queue/indexing_adapter.rb', line 67 def index_error_queue(size: 200) @set = [] solr_indexer = Valkyrie::IndexingAdapter.find(:solr_index) size.times do @set = queue.connection.zpopmin(index_error_name, 1) return [] if @set.blank? # we have to load these one at a time because find_all_by_id gets duplicates during wings transition resource = Hyrax.query_service.find_by(id: @set[0]) solr_indexer.save(resource: resource) rescue # if anything goes wrong, try to requeue the items @set.each { |id, _time| queue.connection.zadd(index_error_name + "-twice", Time.now.to_i, id) } end solr_indexer.connection.commit end |
#index_queue(size: 200) ⇒ Object
52 53 54 55 56 57 58 59 60 61 62 63 64 |
# File 'lib/valkyrie/indexing/redis_queue/indexing_adapter.rb', line 52 def index_queue(size: 200) set = connection.zpopmin(index_queue_name, size) return [] if set.blank? # we have to load these one at a time because find_all_by_id gets duplicates during wings transition resources = set.map { |id, _time| Hyrax.query_service.find_by(id: id) } solr_indexer = Valkyrie::IndexingAdapter.find(:solr_index) solr_indexer.save_all(resources: resources) solr_indexer.connection.commit rescue # if anything goes wrong, try to requeue the items set.each { |id, time| connection.zadd(index_error_name, time, id) } raise end |
#list_delete ⇒ Object
118 119 120 |
# File 'lib/valkyrie/indexing/redis_queue/indexing_adapter.rb', line 118 def list_delete connection.zrange(delete_queue_name, 0, -1, with_scores: true) end |
#list_delete_errors ⇒ Object
126 127 128 |
# File 'lib/valkyrie/indexing/redis_queue/indexing_adapter.rb', line 126 def list_delete_errors connection.zrange(delete_error_name, 0, -1, with_scores: true) end |
#list_index ⇒ Object
114 115 116 |
# File 'lib/valkyrie/indexing/redis_queue/indexing_adapter.rb', line 114 def list_index connection.zrange(index_queue_name, 0, -1, with_scores: true) end |
#list_index_errors ⇒ Object
122 123 124 |
# File 'lib/valkyrie/indexing/redis_queue/indexing_adapter.rb', line 122 def list_index_errors connection.zrange(index_error_name, 0, -1, with_scores: true) end |
#reset! ⇒ Object
48 49 50 |
# File 'lib/valkyrie/indexing/redis_queue/indexing_adapter.rb', line 48 def reset! self.connection = default_connection end |
#save(resource:) ⇒ Object
26 27 28 |
# File 'lib/valkyrie/indexing/redis_queue/indexing_adapter.rb', line 26 def save(resource:) persist([resource]) end |
#save_all(resources:) ⇒ Object
30 31 32 |
# File 'lib/valkyrie/indexing/redis_queue/indexing_adapter.rb', line 30 def save_all(resources:) persist(resources) end |
#wipe! ⇒ Object
Delete the Solr index of all Documents
41 42 43 44 45 46 |
# File 'lib/valkyrie/indexing/redis_queue/indexing_adapter.rb', line 41 def wipe! connection.del(index_queue_name) connection.del(index_error_name) connection.del(delete_queue_name) connection.del(delete_error_name) end |