Class: Sunspot::IndexQueue
- Inherits:
-
Object
- Object
- Sunspot::IndexQueue
- Defined in:
- lib/sunspot/index_queue.rb,
lib/sunspot/index_queue/batch.rb,
lib/sunspot/index_queue/entry.rb,
lib/sunspot/index_queue/session_proxy.rb,
lib/sunspot/index_queue/entry/mongo_impl.rb,
lib/sunspot/index_queue/entry/redis_impl.rb,
lib/sunspot/index_queue/entry/data_mapper_impl.rb,
lib/sunspot/index_queue/entry/active_record_impl.rb
Overview
Implementation of an asynchronous queue for indexing records with Solr. Entries are added to the queue defining which records should be indexed or removed. The queue will then process those entries and send them to the Solr server in batches. This has two advantages over just updating in place. First, problems with Solr will not cause your application to stop functioning. Second, batching the commits to Solr is more efficient and it should be able to handle more throughput when you have a lot of records to index.
Defined Under Namespace
Modules: Entry Classes: Batch, SessionProxy, SolrNotResponding
Instance Attribute Summary collapse
-
#batch_size ⇒ Object
Returns the value of attribute batch_size.
-
#class_names ⇒ Object
readonly
Returns the value of attribute class_names.
-
#retry_interval ⇒ Object
Returns the value of attribute retry_interval.
-
#session ⇒ Object
readonly
Returns the value of attribute session.
Class Method Summary collapse
-
.default_priority ⇒ Object
Get the default indexing priority.
-
.set_priority(priority, &block) ⇒ Object
Set the default priority for indexing items within a block.
Instance Method Summary collapse
-
#batch_handler(&block) ⇒ Object
Provide a block that will handle submitting batches of records.
-
#error_count ⇒ Object
Get the number of entries that have errors in the queue.
-
#errors(options = {}) ⇒ Object
Get the entries in the queue that have errors.
-
#index(record_or_hash, options = {}) ⇒ Object
Add a record to be indexed to the queue.
-
#index_all(klass, ids, options = {}) ⇒ Object
Add a list of records to be indexed to the queue.
-
#initialize(options = {}) ⇒ IndexQueue
constructor
Create a new IndexQueue.
-
#process ⇒ Object
Process the queue.
-
#ready_count ⇒ Object
Get the number of entries in the queue that are ready to be processed.
-
#remove(record_or_hash, options = {}) ⇒ Object
Add a record to be removed to the queue.
-
#remove_all(klass, ids, options = {}) ⇒ Object
Add a list of records to be removed to the queue.
-
#reset! ⇒ Object
Reset all entries in the queue to clear errors and set them to be indexed immediately.
-
#total_count ⇒ Object
Get the number of entries to be processed in the queue.
Constructor Details
#initialize(options = {}) ⇒ IndexQueue
Create a new IndexQueue. Available options:
:retry_interval
- The number of seconds to wait between to retry indexing when an attempt fails (defaults to 1 minute). If an entry fails multiple times, it will be delayed for the interval times the number of failures. For example, if the interval is 1 minute and it has failed twice, the record won’t be attempted again for 2 minutes.
:batch_size
- The maximum number of records to try submitting to solr at one time (defaults to 100).
:class_names
- A list of class names that the queue will process. This can be used to have different queues process different classes of records when they need to different configurations.
:session
- The Sunspot::Session object to use for communicating with Solr (defaults to a session with the default config).
51 52 53 54 55 56 57 58 59 60 61 62 |
# File 'lib/sunspot/index_queue.rb', line 51 def initialize( = {}) @retry_interval = [:retry_interval] || 60 @batch_size = [:batch_size] || 100 @batch_handler = nil @class_names = [] if [:class_names].is_a?(Array) @class_names.concat([:class_names].collect{|name| name.to_s}) elsif [:class_names] @class_names << [:class_names].to_s end @session = [:session] || Sunspot::Session.new end |
Instance Attribute Details
#batch_size ⇒ Object
Returns the value of attribute batch_size.
17 18 19 |
# File 'lib/sunspot/index_queue.rb', line 17 def batch_size @batch_size end |
#class_names ⇒ Object (readonly)
Returns the value of attribute class_names.
18 19 20 |
# File 'lib/sunspot/index_queue.rb', line 18 def class_names @class_names end |
#retry_interval ⇒ Object
Returns the value of attribute retry_interval.
17 18 19 |
# File 'lib/sunspot/index_queue.rb', line 17 def retry_interval @retry_interval end |
#session ⇒ Object (readonly)
Returns the value of attribute session.
18 19 20 |
# File 'lib/sunspot/index_queue.rb', line 18 def session @session end |
Class Method Details
.default_priority ⇒ Object
Get the default indexing priority. Defaults to zero.
33 34 35 |
# File 'lib/sunspot/index_queue.rb', line 33 def default_priority Thread.current[:sunspot_index_queue_priority] || 0 end |
.set_priority(priority, &block) ⇒ Object
Set the default priority for indexing items within a block. Higher priority items will be processed first.
22 23 24 25 26 27 28 29 30 |
# File 'lib/sunspot/index_queue.rb', line 22 def set_priority(priority, &block) save_val = Thread.current[:sunspot_index_queue_priority] begin Thread.current[:sunspot_index_queue_priority] = priority.to_i yield ensure Thread.current[:sunspot_index_queue_priority] = save_val end end |
Instance Method Details
#batch_handler(&block) ⇒ Object
Provide a block that will handle submitting batches of records. The block will take a Batch object and must call submit!
on it. This can be useful for doing things such as providing an identity map for records in the batch. Example:
# Use the ActiveRecord identity map for each batch submitted to reduce database activity.
queue.batch_handler do
ActiveRecord::Base.cache do |batch|
batch.submit!
end
end
74 75 76 |
# File 'lib/sunspot/index_queue.rb', line 74 def batch_handler(&block) @batch_handler = block end |
#error_count ⇒ Object
Get the number of entries that have errors in the queue.
117 118 119 |
# File 'lib/sunspot/index_queue.rb', line 117 def error_count Entry.error_count(self) end |
#errors(options = {}) ⇒ Object
Get the entries in the queue that have errors. Supported options are :limit
(default 50) and :offset
(default 0).
122 123 124 125 |
# File 'lib/sunspot/index_queue.rb', line 122 def errors( = {}) limit = [:limit] ? [:limit].to_i : 50 Entry.errors(self, limit, [:offset].to_i) end |
#index(record_or_hash, options = {}) ⇒ Object
Add a record to be indexed to the queue. The record can be specified as either an indexable object or as as hash with :class and :id keys. The priority to be indexed can be passed in the options as :priority
(defaults to 0).
81 82 83 84 |
# File 'lib/sunspot/index_queue.rb', line 81 def index(record_or_hash, = {}) klass, id = class_and_id(record_or_hash) Entry.enqueue(self, klass, id, false, [:priority] || self.class.default_priority) end |
#index_all(klass, ids, options = {}) ⇒ Object
Add a list of records to be indexed to the queue. The priority to be indexed can be passed in the options as :priority
(defaults to 0).
96 97 98 |
# File 'lib/sunspot/index_queue.rb', line 96 def index_all(klass, ids, = {}) Entry.enqueue(self, klass, ids, false, [:priority] || self.class.default_priority) end |
#process ⇒ Object
Process the queue. Exits when there are no more entries to process at the current time. Returns the number of entries processed.
If any errors are encountered while processing the queue, they will be logged with the errors so they can be fixed and tried again later. However, if Solr is refusing connections, the processing is stopped right away and a Sunspot::IndexQueue::SolrNotResponding exception is raised.
138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 |
# File 'lib/sunspot/index_queue.rb', line 138 def process count = 0 loop do entries = Entry.next_batch!(self) if entries.nil? || entries.empty? break if Entry.ready_count(self) == 0 else batch = Batch.new(self, entries) if defined?(@batch_handler) && @batch_handler @batch_handler.call(batch) else batch.submit! end count += entries.select{|e| e.processed? }.size end end count end |
#ready_count ⇒ Object
Get the number of entries in the queue that are ready to be processed.
112 113 114 |
# File 'lib/sunspot/index_queue.rb', line 112 def ready_count Entry.ready_count(self) end |
#remove(record_or_hash, options = {}) ⇒ Object
Add a record to be removed to the queue. The record can be specified as either an indexable object or as as hash with :class and :id keys. The priority to be indexed can be passed in the options as :priority
(defaults to 0).
89 90 91 92 |
# File 'lib/sunspot/index_queue.rb', line 89 def remove(record_or_hash, = {}) klass, id = class_and_id(record_or_hash) Entry.enqueue(self, klass, id, true, [:priority] || self.class.default_priority) end |
#remove_all(klass, ids, options = {}) ⇒ Object
Add a list of records to be removed to the queue. The priority to be indexed can be passed in the options as :priority
(defaults to 0).
102 103 104 |
# File 'lib/sunspot/index_queue.rb', line 102 def remove_all(klass, ids, = {}) Entry.enqueue(self, klass, ids, true, [:priority] || self.class.default_priority) end |
#reset! ⇒ Object
Reset all entries in the queue to clear errors and set them to be indexed immediately.
128 129 130 |
# File 'lib/sunspot/index_queue.rb', line 128 def reset! Entry.reset!(self) end |
#total_count ⇒ Object
Get the number of entries to be processed in the queue.
107 108 109 |
# File 'lib/sunspot/index_queue.rb', line 107 def total_count Entry.total_count(self) end |