Class: Sunspot::IndexQueue::Entry::ActiveRecordImpl
- Inherits:
- ActiveRecord::Base
- Inheritance chain: Object, ActiveRecord::Base, Sunspot::IndexQueue::Entry::ActiveRecordImpl
- Includes:
- Sunspot::IndexQueue::Entry
- Defined in:
- lib/sunspot/index_queue/entry/active_record_impl.rb
Overview
Implementation of an indexing queue backed by ActiveRecord.
To create the table, you should have a migration containing the following:
def self.up
  Sunspot::IndexQueue::Entry::ActiveRecordImpl.create_table
end

def self.down
  drop_table Sunspot::IndexQueue::Entry::ActiveRecordImpl.table_name
end
The default setup uses an integer for the record_id column, since that is the most efficient type and works with most data models. If your models use a string as the primary key, add statements to the migration to change the column type.
Instance Attribute Summary
Attributes included from Sunspot::IndexQueue::Entry
Class Method Summary
- .add(klass, id, delete, priority) ⇒ Object
  Implementation of the add method.
- .create_table ⇒ Object
  Create the table used to store the queue in the database.
- .delete_entries(entries) ⇒ Object
  Implementation of the delete_entries method.
- .error_count(queue) ⇒ Object
  Implementation of the error_count method.
- .errors(queue, limit, offset) ⇒ Object
  Implementation of the errors method.
- .next_batch!(queue) ⇒ Object
  Implementation of the next_batch! method.
- .ready_count(queue) ⇒ Object
  Implementation of the ready_count method.
- .reset!(queue) ⇒ Object
  Implementation of the reset! method.
- .total_count(queue) ⇒ Object
  Implementation of the total_count method.
Instance Method Summary
- #reset! ⇒ Object
  Implementation of the reset! method.
- #set_error!(error, retry_interval = nil) ⇒ Object
  Implementation of the set_error! method.
Methods included from Sunspot::IndexQueue::Entry
enqueue, implementation, implementation=, load_all_records, #processed?, #record
Class Method Details
.add(klass, id, delete, priority) ⇒ Object
Implementation of the add method.
# File 'lib/sunspot/index_queue/entry/active_record_impl.rb', line 85
def add(klass, id, delete, priority)
  queue_entry_key = {:record_id => id, :record_class_name => klass.name, :lock => nil}
  queue_entry = first(:conditions => queue_entry_key) || new(queue_entry_key.merge(:priority => priority))
  queue_entry.is_delete = delete
  queue_entry.priority = priority if priority > queue_entry.priority
  queue_entry.run_at = Time.now.utc
  queue_entry.save!
end
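The merge behaviour of add can be hard to see from the ActiveRecord calls alone. The following is a minimal in-memory sketch of the same logic, not the library's code: QueueEntry and add_entry are hypothetical names, and a plain array stands in for the database table.

```ruby
# Hypothetical in-memory stand-in for a row of the queue table.
QueueEntry = Struct.new(:record_class_name, :record_id, :is_delete, :priority, :run_at, :lock)

# Mirrors the logic of .add against an array instead of a table.
def add_entry(table, class_name, id, delete, priority)
  # Reuse an unlocked entry for the same record, or create a new one.
  entry = table.find do |e|
    e.record_class_name == class_name && e.record_id == id && e.lock.nil?
  end
  unless entry
    entry = QueueEntry.new(class_name, id, false, priority, nil, nil)
    table << entry
  end
  entry.is_delete = delete                                # the latest request wins
  entry.priority = priority if priority > entry.priority  # priority is only ever raised
  entry.run_at = Time.now.utc                             # eligible for processing right away
  entry
end

table = []
add_entry(table, "Post", 1, false, 0)
add_entry(table, "Post", 1, true, 5)
add_entry(table, "Post", 1, false, 2)
```

After the three calls the array holds a single entry for Post 1 whose priority is the highest value seen and whose is_delete flag reflects the last call. An entry that already carries a lock is not reused, so a record queued again while it is being processed gets a second row.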
.create_table ⇒ Object
Create the table used to store the queue in the database.
# File 'lib/sunspot/index_queue/entry/active_record_impl.rb', line 100
def create_table
  connection.create_table table_name do |t|
    t.string :record_class_name, :null => false
    t.integer :record_id, :null => false
    t.boolean :is_delete, :null => false, :default => false
    t.datetime :run_at, :null => false
    t.integer :priority, :null => false, :default => 0
    t.integer :lock, :null => true
    t.string :error, :null => true, :limit => 4000
    t.integer :attempts, :null => false, :default => 0
  end

  connection.add_index table_name, :record_id
  connection.add_index table_name, [:run_at, :record_class_name, :priority], :name => "#{table_name}_run_at"
end
.delete_entries(entries) ⇒ Object
Implementation of the delete_entries method.
# File 'lib/sunspot/index_queue/entry/active_record_impl.rb', line 95
def delete_entries(entries)
  delete_all(:id => entries)
end
.error_count(queue) ⇒ Object
Implementation of the error_count method.
# File 'lib/sunspot/index_queue/entry/active_record_impl.rb', line 44
def error_count(queue)
  conditions = ["#{connection.quote_column_name('error')} IS NOT NULL"]
  unless queue.class_names.empty?
    conditions.first << " AND #{connection.quote_column_name('record_class_name')} IN (?)"
    conditions << queue.class_names
  end
  count(:conditions => conditions)
end
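The counting and listing methods all build an ActiveRecord conditions array the same way: a SQL fragment first, followed by the values bound to its placeholders. The sketch below reproduces that construction in plain Ruby so the resulting array can be inspected. error_conditions is a hypothetical helper, and the double-quote quoting is an assumption standing in for connection.quote_column_name, whose output depends on the database adapter.

```ruby
# Hypothetical helper that builds the same conditions array as error_count.
# The default quoting lambda is an assumption; real adapters differ
# (MySQL, for example, quotes identifiers with backticks).
def error_conditions(class_names, quote = ->(name) { "\"#{name}\"" })
  conditions = ["#{quote.call('error')} IS NOT NULL"]
  unless class_names.empty?
    # Extend the SQL fragment in place and append the bound value.
    conditions.first << " AND #{quote.call('record_class_name')} IN (?)"
    conditions << class_names
  end
  conditions
end

unfiltered = error_conditions([])
filtered = error_conditions(["Post", "Comment"])
```

With no class names the array contains only the SQL fragment, so every entry with an error is matched. With class names, the fragment gains an IN (?) clause and the list of names becomes the second element, which ActiveRecord expands into the placeholder.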
.errors(queue, limit, offset) ⇒ Object
Implementation of the errors method.
# File 'lib/sunspot/index_queue/entry/active_record_impl.rb', line 54
def errors(queue, limit, offset)
  conditions = ["#{connection.quote_column_name('error')} IS NOT NULL"]
  unless queue.class_names.empty?
    conditions.first << " AND #{connection.quote_column_name('record_class_name')} IN (?)"
    conditions << queue.class_names
  end
  all(:conditions => conditions, :limit => limit, :offset => offset, :order => :id)
end
.next_batch!(queue) ⇒ Object
Implementation of the next_batch! method.
# File 'lib/sunspot/index_queue/entry/active_record_impl.rb', line 70
def next_batch!(queue)
  conditions = ["#{connection.quote_column_name('run_at')} <= ?", Time.now.utc]
  unless queue.class_names.empty?
    conditions.first << " AND #{connection.quote_column_name('record_class_name')} IN (?)"
    conditions << queue.class_names
  end
  batch_entries = all(:select => "id", :conditions => conditions, :limit => queue.batch_size, :order => 'priority DESC, run_at')
  queue_entry_ids = batch_entries.collect{|entry| entry.id}
  return [] if queue_entry_ids.empty?
  lock = rand(0x7FFFFFFF)
  update_all({:run_at => queue.retry_interval.from_now.utc, :lock => lock, :error => nil}, :id => queue_entry_ids)
  all(:conditions => {:id => queue_entry_ids, :lock => lock})
end
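next_batch! claims entries in three steps: select the ids of ready entries, stamp them with a random lock value while pushing run_at forward, then read back only the rows that still carry that lock. The following in-memory sketch illustrates the idea under simplifying assumptions. Row and claim_batch are hypothetical names, an array replaces the table, and the optional block exists only to simulate a competing worker between the update and the re-read.

```ruby
# Hypothetical in-memory row with just the columns the claim logic touches.
Row = Struct.new(:id, :priority, :run_at, :lock)

def claim_batch(rows, batch_size, retry_interval, now = Time.now.utc)
  # Step 1: ids of ready rows, highest priority first, then oldest run_at.
  ready = rows.select { |r| r.run_at <= now }
  ids = ready.sort_by { |r| [-r.priority, r.run_at] }.first(batch_size).map(&:id)
  return [] if ids.empty?

  # Step 2: stamp the rows with a random lock token and defer their run_at.
  lock = rand(0x7FFFFFFF)
  rows.each do |r|
    next unless ids.include?(r.id)
    r.lock = lock
    r.run_at = now + retry_interval
  end

  yield rows if block_given? # simulation hook for a competing worker

  # Step 3: keep only the rows that still carry this worker's token.
  rows.select { |r| ids.include?(r.id) && r.lock == lock }
end

now = Time.now.utc
rows = [
  Row.new(1, 0, now - 30, nil),
  Row.new(2, 5, now - 10, nil),
  Row.new(3, 0, now + 60, nil) # not ready yet
]
batch = claim_batch(rows, 10, 60, now)
```

Here the batch contains rows 1 and 2, and row 3 is left alone because its run_at is in the future. Because run_at is moved forward by the retry interval, a claimed entry that is never completed becomes eligible again once that interval has passed.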
.ready_count(queue) ⇒ Object
Implementation of the ready_count method.
# File 'lib/sunspot/index_queue/entry/active_record_impl.rb', line 34
def ready_count(queue)
  conditions = ["#{connection.quote_column_name('run_at')} <= ?", Time.now.utc]
  unless queue.class_names.empty?
    conditions.first << " AND #{connection.quote_column_name('record_class_name')} IN (?)"
    conditions << queue.class_names
  end
  count(:conditions => conditions)
end
.reset!(queue) ⇒ Object
Implementation of the reset! method.
# File 'lib/sunspot/index_queue/entry/active_record_impl.rb', line 64
def reset!(queue)
  conditions = queue.class_names.empty? ? {} : {:record_class_name => queue.class_names}
  update_all({:run_at => Time.now.utc, :attempts => 0, :error => nil, :lock => nil}, conditions)
end
.total_count(queue) ⇒ Object
Implementation of the total_count method.
# File 'lib/sunspot/index_queue/entry/active_record_impl.rb', line 28
def total_count(queue)
  conditions = queue.class_names.empty? ? {} : {:record_class_name => queue.class_names}
  count(:conditions => conditions)
end
Instance Method Details
#reset! ⇒ Object
Implementation of the reset! method.
# File 'lib/sunspot/index_queue/entry/active_record_impl.rb', line 134
def reset!
  begin
    update_attributes!(:attempts => 0, :error => nil, :lock => nil, :run_at => Time.now.utc)
  rescue => e
    logger.warn(e)
  end
end
#set_error!(error, retry_interval = nil) ⇒ Object
Implementation of the set_error! method.
# File 'lib/sunspot/index_queue/entry/active_record_impl.rb', line 118
def set_error!(error, retry_interval = nil)
  self.attempts += 1
  self.run_at = (retry_interval * attempts).from_now.utc if retry_interval
  self.error = "#{error.class.name}: #{error.message}\n#{error.backtrace.join("\n")[0, 4000]}"
  self.lock = nil
  begin
    save!
  rescue => e
    if logger
      logger.warn(error)
      logger.warn(e)
    end
  end
end
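set_error! delays the next attempt by the retry interval multiplied by the number of attempts so far, so the wait grows linearly with each failure. It also stores the error class, message, and a truncated backtrace. The sketch below works through both pieces in plain Ruby. retry_delay and format_error are hypothetical helpers, plain integers (seconds) stand in for the duration objects the library receives, and the nil guard on the backtrace is an addition for the sketch that the method itself does not have.

```ruby
# Hypothetical helper: seconds to wait before the next attempt.
def retry_delay(retry_interval, attempts)
  retry_interval * attempts
end

# Hypothetical helper: builds the stored error text.
# The backtrace portion is cut to 4000 characters, as in set_error!.
# The `|| []` guard is an assumption added for errors that were never raised.
def format_error(error)
  backtrace = (error.backtrace || []).join("\n")[0, 4000]
  "#{error.class.name}: #{error.message}\n#{backtrace}"
end

# Delays for the first four failures with a 60 second retry interval.
delays = (1..4).map { |attempt| retry_delay(60, attempt) }

error = begin
  raise ArgumentError, "bad id"
rescue => e
  e
end
text = format_error(error)
```

With a 60 second interval the delays come out as 60, 120, 180, and 240 seconds. Only the backtrace is truncated, so the full stored string can exceed the 4000 character limit of the error column when the class name and message are added.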