Class: Valkyrie::Indexing::RedisQueue::IndexingAdapter

Inherits:
Object
  • Object
show all
Defined in:
lib/valkyrie/indexing/redis_queue/indexing_adapter.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(connection: nil, index_queue_name: 'toindex', delete_queue_name: 'todelete') ⇒ IndexingAdapter

Returns a new instance of IndexingAdapter.

Parameters:

  • connection (RSolr::Client) (defaults to: nil)

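    The connection that holds the queues. Every method below calls Redis sorted-set commands on it (zadd, zpopmin, zrange, del), so despite the RSolr::Client type documented here it needs to be a Redis client; the Solr adapter is looked up separately when a queue is processed.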



14
15
16
17
18
19
20
# File 'lib/valkyrie/indexing/redis_queue/indexing_adapter.rb', line 14

# Stores the connection and the names of the four queues this adapter uses.
#
# Each error queue name is derived from its work queue name by appending
# "-error".
#
# @param connection [Object, nil] stored as-is in @connection
# @param index_queue_name [String] name of the queue of IDs waiting to be indexed
# @param delete_queue_name [String] name of the queue of IDs waiting to be deleted
def initialize(connection: nil, index_queue_name: 'toindex', delete_queue_name: 'todelete')
  error_suffix = "-error"

  @connection = connection
  @index_queue_name = index_queue_name
  @index_error_name = index_queue_name + error_suffix
  @delete_queue_name = delete_queue_name
  @delete_error_name = delete_queue_name + error_suffix
end

Instance Attribute Details

#connection ⇒ RSolr::Client

Returns:

  • (RSolr::Client)


9
# File 'lib/valkyrie/indexing/redis_queue/indexing_adapter.rb', line 9

# Declares only the setter (connection=). The `connection` reader that the
# other methods call is not generated by this macro.
attr_writer :connection

#delete_error_name ⇒ Object

Returns the value of attribute delete_error_name.



10
11
12
# File 'lib/valkyrie/indexing/redis_queue/indexing_adapter.rb', line 10

# Reader for @delete_error_name, which the constructor sets to the delete
# queue name with "-error" appended.
def delete_error_name
  @delete_error_name
end

#delete_queue_name ⇒ Object

Returns the value of attribute delete_queue_name.



10
11
12
# File 'lib/valkyrie/indexing/redis_queue/indexing_adapter.rb', line 10

# Reader for @delete_queue_name, the queue name given to the constructor
# ('todelete' by default).
def delete_queue_name
  @delete_queue_name
end

#index_error_name ⇒ Object

Returns the value of attribute index_error_name.



10
11
12
# File 'lib/valkyrie/indexing/redis_queue/indexing_adapter.rb', line 10

# Reader for @index_error_name, which the constructor sets to the index
# queue name with "-error" appended.
def index_error_name
  @index_error_name
end

#index_queue_name ⇒ Object

Returns the value of attribute index_queue_name.



10
11
12
# File 'lib/valkyrie/indexing/redis_queue/indexing_adapter.rb', line 10

# Reader for @index_queue_name, the queue name given to the constructor
# ('toindex' by default).
def index_queue_name
  @index_queue_name
end

Instance Method Details

#delete(resource:) ⇒ Array<Valkyrie::Resource>

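Queues the resource's ID for deletion from Solr: the ID is added to the delete queue with zadd, scored by the current time, rather than being removed from Solr immediately.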

Returns:

  • (Array<Valkyrie::Resource>)

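    NOTE: the code shown below returns the result of the zadd call, not a list of resources, and nothing has been deleted from Solr when this method returns; the document is removed later, when the delete queue is processed.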



36
37
38
# File 'lib/valkyrie/indexing/redis_queue/indexing_adapter.rb', line 36

# Enqueues a resource for later deletion instead of deleting it right away.
#
# The stringified resource ID is added to the delete queue with the current
# time (in whole seconds) as its score.
#
# @param resource [#id] the resource whose ID should be queued
# @return [Object] whatever the connection's zadd call returns
def delete(resource:)
  redis = connection
  queue_key = delete_queue_name
  queued_at = Time.current.to_i
  redis.zadd(queue_key, queued_at, resource.id.to_s)
end

#delete_error_queue(size: 200) ⇒ Object

If a batch fails, try running them one at a time to get down to just records that really fail



100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/valkyrie/indexing/redis_queue/indexing_adapter.rb', line 100

# Retries failed deletions one ID at a time, so that only IDs which really
# cannot be deleted stay in the delete-error queue.
#
# Up to +size+ entries are popped, lowest score first. Each ID is deleted
# from the adapter registered as :solr_index and committed. When anything in
# an iteration raises, the ID popped in that iteration (if any) is put back
# on the delete-error queue with the current time as its score, and the loop
# carries on; errors are deliberately not re-raised.
#
# @param size [Integer] maximum number of entries to try
# @return [Array, Integer] [] when the queue runs dry before +size+ entries
#   were popped, otherwise +size+ (the value of Integer#times)
def delete_error_queue(size: 200)
  size.times do
    # Fresh per iteration, so a failure in zpopmin itself can never requeue
    # an ID left over from an earlier iteration.
    id = nil
    popped = connection.zpopmin(delete_error_name, 1)
    return [] if popped.blank?

    # redis-rb hands back a bare [member, score] pair when count is 1;
    # flattening also copes with a one-element list of pairs.
    id, _score = Array(popped).flatten
    solr_indexer = Valkyrie::IndexingAdapter.find(:solr_index)
    solr_indexer.connection.delete_by_id id.to_s, { softCommit: true }
    solr_indexer.connection.commit
  rescue
    # if anything goes wrong, try to requeue the item (only the ID, never its score)
    connection.zadd(delete_error_name, Time.now.to_i, id) unless id.nil?
  end
end

#delete_queue(size: 200) ⇒ Object

We reach in to solr directly here to prevent needing to load the objects unnecessarily



85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/valkyrie/indexing/redis_queue/indexing_adapter.rb', line 85

# Processes a batch of queued deletions.
#
# Pops up to +size+ entries (lowest score first) from the delete queue and
# deletes each ID straight from the adapter registered as :solr_index, then
# commits. If anything raises, every popped entry is moved to the
# delete-error queue with its original score and the error is re-raised.
#
# @param size [Integer] maximum number of entries to pop
# @return [Array, Object] [] when the queue is empty, otherwise the result
#   of the Solr commit
# @raise [StandardError] whatever the connection or Solr raised
def delete_queue(size: 200)
  popped = connection.zpopmin(delete_queue_name, size)
  return [] if popped.blank?

  # redis-rb returns a bare [member, score] pair instead of a list of pairs
  # when count is 1; normalise so the iteration below always sees pairs.
  set = popped.first.is_a?(Array) ? popped : [popped]
  solr_indexer = Valkyrie::IndexingAdapter.find(:solr_index)
  set.each do |id, _time|
    solr_indexer.connection.delete_by_id id.to_s, { softCommit: true }
  end
  solr_indexer.connection.commit
rescue
  # if anything goes wrong, try to requeue the items; `set` is still nil when
  # zpopmin itself failed, and then there is nothing to requeue
  Array(set).each { |id, time| connection.zadd(delete_error_name, time, id) }
  raise
end

#index_error_queue(size: 200) ⇒ Object

If a batch fails, try running them one at a time to get down to just records that really fail



67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/valkyrie/indexing/redis_queue/indexing_adapter.rb', line 67

# Retries failed index operations one ID at a time, so that only IDs which
# really cannot be indexed are set aside.
#
# Up to +size+ entries are popped, lowest score first, from the index-error
# queue on the same connection the other queue methods use. Each ID is loaded
# through Hyrax.query_service and saved with the adapter registered as
# :solr_index. When an iteration raises, the ID popped in that iteration (if
# any) is added to the "<index_error_name>-twice" queue with the current time
# as its score and the loop carries on; errors are not re-raised. Solr is
# committed once at the end, including when the queue ran dry part-way.
#
# @param size [Integer] maximum number of entries to try
# @return [Array, Object] [] when the queue ran dry before +size+ entries
#   were popped, otherwise the result of the Solr commit
def index_error_queue(size: 200)
  solr_indexer = Valkyrie::IndexingAdapter.find(:solr_index)
  attempted = 0
  drained = false

  size.times do
    # Fresh per iteration, so a failure in zpopmin itself can never requeue
    # an ID left over from an earlier iteration.
    id = nil
    popped = connection.zpopmin(index_error_name, 1)
    if popped.blank?
      drained = true
      break
    end

    attempted += 1
    # redis-rb hands back a bare [member, score] pair when count is 1;
    # flattening also copes with a one-element list of pairs.
    id, _score = Array(popped).flatten
    # we have to load these one at a time because find_all_by_id gets duplicates during wings transition
    resource = Hyrax.query_service.find_by(id: id)
    solr_indexer.save(resource: resource)
  rescue
    # if anything goes wrong, set the item aside (only the ID, never its score)
    connection.zadd(index_error_name + "-twice", Time.now.to_i, id) unless id.nil?
  end

  # Nothing was popped at all: no work to commit.
  return [] if drained && attempted.zero?

  committed = solr_indexer.connection.commit
  drained ? [] : committed
end

#index_queue(size: 200) ⇒ Object



52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/valkyrie/indexing/redis_queue/indexing_adapter.rb', line 52

# Processes a batch of queued index operations.
#
# Pops up to +size+ entries (lowest score first) from the index queue, loads
# each resource through Hyrax.query_service, saves them all with the adapter
# registered as :solr_index and commits. If anything raises, every popped
# entry is moved to the index-error queue with its original score and the
# error is re-raised.
#
# @param size [Integer] maximum number of entries to pop
# @return [Array, Object] [] when the queue is empty, otherwise the result
#   of the Solr commit
# @raise [StandardError] whatever the connection, query service or Solr raised
def index_queue(size: 200)
  popped = connection.zpopmin(index_queue_name, size)
  return [] if popped.blank?

  # redis-rb returns a bare [member, score] pair instead of a list of pairs
  # when count is 1; normalise so the iteration below always sees pairs.
  set = popped.first.is_a?(Array) ? popped : [popped]
  # we have to load these one at a time because find_all_by_id gets duplicates during wings transition
  resources = set.map { |id, _time| Hyrax.query_service.find_by(id: id) }
  solr_indexer = Valkyrie::IndexingAdapter.find(:solr_index)
  solr_indexer.save_all(resources: resources)
  solr_indexer.connection.commit
rescue
  # if anything goes wrong, try to requeue the items; `set` is still nil when
  # zpopmin itself failed, and then there is nothing to requeue
  Array(set).each { |id, time| connection.zadd(index_error_name, time, id) }
  raise
end

#list_delete ⇒ Object



118
119
120
# File 'lib/valkyrie/indexing/redis_queue/indexing_adapter.rb', line 118

# Lists everything currently waiting in the delete queue.
#
# @return [Object] the connection's zrange result for the full range
#   (0 to -1) of the delete queue, requested with scores
def list_delete
  redis = connection
  queue_key = delete_queue_name
  redis.zrange(queue_key, 0, -1, with_scores: true)
end

#list_delete_errors ⇒ Object



126
127
128
# File 'lib/valkyrie/indexing/redis_queue/indexing_adapter.rb', line 126

# Lists everything currently sitting in the delete-error queue.
#
# @return [Object] the connection's zrange result for the full range
#   (0 to -1) of the delete-error queue, requested with scores
def list_delete_errors
  redis = connection
  queue_key = delete_error_name
  redis.zrange(queue_key, 0, -1, with_scores: true)
end

#list_index ⇒ Object



114
115
116
# File 'lib/valkyrie/indexing/redis_queue/indexing_adapter.rb', line 114

# Lists everything currently waiting in the index queue.
#
# @return [Object] the connection's zrange result for the full range
#   (0 to -1) of the index queue, requested with scores
def list_index
  redis = connection
  queue_key = index_queue_name
  redis.zrange(queue_key, 0, -1, with_scores: true)
end

#list_index_errors ⇒ Object



122
123
124
# File 'lib/valkyrie/indexing/redis_queue/indexing_adapter.rb', line 122

# Lists everything currently sitting in the index-error queue.
#
# @return [Object] the connection's zrange result for the full range
#   (0 to -1) of the index-error queue, requested with scores
def list_index_errors
  redis = connection
  queue_key = index_error_name
  redis.zrange(queue_key, 0, -1, with_scores: true)
end

#reset! ⇒ Object



48
49
50
# File 'lib/valkyrie/indexing/redis_queue/indexing_adapter.rb', line 48

# Replaces the current connection with the one default_connection returns.
#
# @return [Object] the newly assigned connection
def reset!
  fresh_connection = default_connection
  self.connection = fresh_connection
end

#save(resource:) ⇒ Object



26
27
28
# File 'lib/valkyrie/indexing/redis_queue/indexing_adapter.rb', line 26

# Hands a single resource to #persist, wrapped in a one-element array.
#
# @param resource [Object] the resource to pass on
# @return [Object] whatever #persist returns
def save(resource:)
  batch = [resource]
  persist(batch)
end

#save_all(resources:) ⇒ Object



30
31
32
# File 'lib/valkyrie/indexing/redis_queue/indexing_adapter.rb', line 30

# Hands the given resources to #persist unchanged and returns its result.
#
# @param resources [Object] the collection passed straight to #persist
# @return [Object] whatever #persist returns
def save_all(resources:)
  persist(resources)
end

#wipe! ⇒ Object

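Deletes the four queue keys (index, index-error, delete and delete-error) from the connection; the code shown does not touch the Solr index itself.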



41
42
43
44
45
46
# File 'lib/valkyrie/indexing/redis_queue/indexing_adapter.rb', line 41

# Removes the index, index-error, delete and delete-error queues by issuing
# one del call per queue key, in that order.
#
# @return [Object] the result of the last del call (the delete-error queue)
def wipe!
  queue_keys = [index_queue_name, index_error_name, delete_queue_name, delete_error_name]
  queue_keys.map { |queue_key| connection.del(queue_key) }.last
end