Class: Sunspot::IndexQueue::Entry::DataMapperImpl
- Inherits: Object
  - Object
  - Sunspot::IndexQueue::Entry::DataMapperImpl
- Includes: DataMapper::Resource, Sunspot::IndexQueue::Entry
- Defined in: lib/sunspot/index_queue/entry/data_mapper_impl.rb
Overview
Implementation of an indexing queue backed by DataMapper.
To create the table, you can use dm-migrations and run auto_migrate! on this class.
This implementation assumes the primary key of the records being indexed is an integer, since that works with most data models and is very efficient. If this is not the case, you can subclass this class and change the data type of the record_id property.
Instance Attribute Summary
Attributes included from Sunspot::IndexQueue::Entry
Class Method Summary
- .add(klass, id, delete, priority) ⇒ Object
  Implementation of the add method.
- .delete_entries(entries) ⇒ Object
  Implementation of the delete_entries method.
- .error_count(queue) ⇒ Object
  Implementation of the error_count method.
- .errors(queue, limit, offset) ⇒ Object
  Implementation of the errors method.
- .next_batch!(queue) ⇒ Object
  Implementation of the next_batch! method.
- .ready_count(queue) ⇒ Object
  Implementation of the ready_count method.
- .reset!(queue) ⇒ Object
  Implementation of the reset! method.
- .total_count(queue) ⇒ Object
  Implementation of the total_count method.
Instance Method Summary
- #reset! ⇒ Object
  Implementation of the reset! method.
- #set_error!(error, retry_interval = nil) ⇒ Object
  Implementation of the set_error! method.
Methods included from Sunspot::IndexQueue::Entry
enqueue, implementation, implementation=, load_all_records, #processed?, #record
Class Method Details
.add(klass, id, delete, priority) ⇒ Object
Implementation of the add method.
# File 'lib/sunspot/index_queue/entry/data_mapper_impl.rb', line 76

def add(klass, id, delete, priority)
  queue_entry_key = {:record_id => id, :record_class_name => klass.name, :lock => nil}
  queue_entry = first(:conditions => queue_entry_key) || new(queue_entry_key.merge(:priority => priority))
  queue_entry.is_delete = delete
  queue_entry.priority = priority if priority > queue_entry.priority
  queue_entry.run_at = Time.now.utc
  queue_entry.save!
end
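The listing above reuses an existing unlocked entry for the same record, only ever raises its priority, and resets its run time. A minimal in-memory sketch of that behavior follows; `QueueEntry` and `InMemoryQueue` are hypothetical stand-ins for the DataMapper-backed table and are not part of the library.

```ruby
# Hypothetical in-memory stand-in for the queue table (illustration only).
QueueEntry = Struct.new(:record_class_name, :record_id, :lock, :priority, :is_delete, :run_at)

class InMemoryQueue
  attr_reader :entries

  def initialize
    @entries = []
  end

  # Follows the shape of .add: find an unlocked entry for the same record
  # or create one, overwrite the delete flag, raise (never lower) the
  # priority, and make the entry ready to run now.
  def add(class_name, id, delete, priority)
    entry = @entries.find do |e|
      e.record_class_name == class_name && e.record_id == id && e.lock.nil?
    end
    unless entry
      entry = QueueEntry.new(class_name, id, nil, priority)
      @entries << entry
    end
    entry.is_delete = delete
    entry.priority = priority if priority > entry.priority
    entry.run_at = Time.now.utc
    entry
  end
end

queue = InMemoryQueue.new
queue.add("Post", 1, false, 0)
queue.add("Post", 1, true, 5)   # same record: entry is reused, priority raised
queue.add("Post", 1, false, 2)  # a lower priority does not lower the stored one
```

Because the lookup key includes a nil lock, an entry that a worker has already claimed is not reused; adding the same record again while it is locked creates a second entry.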
.delete_entries(entries) ⇒ Object
Implementation of the delete_entries method.
# File 'lib/sunspot/index_queue/entry/data_mapper_impl.rb', line 86

def delete_entries(entries)
  all(:id => entries.map(&:id)).destroy!
end
.error_count(queue) ⇒ Object
Implementation of the error_count method.
# File 'lib/sunspot/index_queue/entry/data_mapper_impl.rb', line 44

def error_count(queue)
  conditions = {:error.not => nil}
  conditions[:record_class_name] = queue.class_names unless queue.class_names.empty?
  count(conditions)
end
.errors(queue, limit, offset) ⇒ Object
Implementation of the errors method.
# File 'lib/sunspot/index_queue/entry/data_mapper_impl.rb', line 51

def errors(queue, limit, offset)
  conditions = {:error.not => nil}
  conditions[:record_class_name] = queue.class_names unless queue.class_names.empty?
  all(conditions.merge(:limit => limit, :offset => offset, :order => :id))
end
.next_batch!(queue) ⇒ Object
Implementation of the next_batch! method.
# File 'lib/sunspot/index_queue/entry/data_mapper_impl.rb', line 64

def next_batch!(queue)
  conditions = {:run_at.lte => Time.now.utc}
  conditions[:record_class_name] = queue.class_names unless queue.class_names.empty?
  batch_entries = all(conditions.merge(:fields => [:id], :limit => queue.batch_size, :order => [:priority.desc, :run_at]))
  queue_entry_ids = batch_entries.collect{|entry| entry.id}
  return [] if queue_entry_ids.empty?
  lock = rand(0x7FFFFFFF)
  all(:id => queue_entry_ids).update!(:run_at => Time.now.utc + queue.retry_interval, :lock => lock, :error => nil)
  all(:id => queue_entry_ids, :lock => lock)
end
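The claim step in the listing above can be read as: select ready entries by priority, stamp them with a random lock token while pushing their run time forward, then re-read only the rows that carry that token. The sketch below models this with plain Ruby objects; `BatchEntry` and `claim_batch` are hypothetical names used for illustration, and an array stands in for the database table.

```ruby
# Hypothetical in-memory row; the real entries are DataMapper resources.
BatchEntry = Struct.new(:id, :priority, :run_at, :lock, :error)

# Claim up to batch_size ready entries, highest priority first,
# then oldest run_at, following the shape of next_batch!.
def claim_batch(entries, batch_size, retry_interval, now = Time.now.utc)
  ready = entries.select { |e| e.run_at <= now }
  ids = ready.sort_by { |e| [-e.priority, e.run_at] }.first(batch_size).map(&:id)
  return [] if ids.empty?

  lock = rand(0x7FFFFFFF)
  entries.each do |e|
    next unless ids.include?(e.id)
    e.run_at = now + retry_interval  # hidden from other workers until this time
    e.lock = lock
    e.error = nil
  end

  # Re-read by id and lock token, so only rows stamped with this
  # worker's token are returned.
  entries.select { |e| ids.include?(e.id) && e.lock == lock }
end

now = Time.utc(2024, 1, 1, 12, 0, 0)
table = [
  BatchEntry.new(1, 0, now - 30, nil, nil),
  BatchEntry.new(2, 5, now - 10, nil, "old error"),
  BatchEntry.new(3, 0, now - 60, nil, nil),
  BatchEntry.new(4, 9, now + 600, nil, nil)  # not ready yet
]
claimed = claim_batch(table, 2, 60, now)
```

Moving run_at forward by the retry interval means a claimed entry becomes eligible again if the worker never finishes it, which acts as a simple timeout on the lock.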
.ready_count(queue) ⇒ Object
Implementation of the ready_count method.
# File 'lib/sunspot/index_queue/entry/data_mapper_impl.rb', line 37

def ready_count(queue)
  conditions = {:run_at.lte => Time.now.utc}
  conditions[:record_class_name] = queue.class_names unless queue.class_names.empty?
  count(conditions)
end
.reset!(queue) ⇒ Object
Implementation of the reset! method.
# File 'lib/sunspot/index_queue/entry/data_mapper_impl.rb', line 58

def reset!(queue)
  conditions = queue.class_names.empty? ? {} : {:record_class_name => queue.class_names}
  all(conditions).update!(:run_at => Time.now.utc, :attempts => 0, :error => nil, :lock => nil)
end
.total_count(queue) ⇒ Object
Implementation of the total_count method.
# File 'lib/sunspot/index_queue/entry/data_mapper_impl.rb', line 31

def total_count(queue)
  conditions = queue.class_names.empty? ? {} : {:record_class_name => queue.class_names}
  count(conditions)
end
Instance Method Details
#reset! ⇒ Object
Implementation of the reset! method.
# File 'lib/sunspot/index_queue/entry/data_mapper_impl.rb', line 108

def reset!
  begin
    update!(:attempts => 0, :error => nil, :lock => nil, :run_at => Time.now.utc)
  rescue => e
    DataMapper.logger.warn(e)
  end
end
#set_error!(error, retry_interval = nil) ⇒ Object
Implementation of the set_error! method.
# File 'lib/sunspot/index_queue/entry/data_mapper_impl.rb', line 92

def set_error!(error, retry_interval = nil)
  self.attempts += 1
  self.run_at = Time.now.utc + (retry_interval * attempts) if retry_interval
  self.error = "#{error.class.name}: #{error.message}\n#{error.backtrace.join("\n")[0, 4000]}"
  self.lock = nil
  begin
    save!
  rescue => e
    if DataMapper.logger
      DataMapper.logger.warn(error)
      DataMapper.logger.warn(e)
    end
  end
end
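The retry schedule in the listing above is linear: each failure increments attempts and, when a retry interval is given, sets the next run time to now plus the interval multiplied by the attempt count. A small sketch of that arithmetic follows; `FailedEntry` and `record_failure` are hypothetical names, and persistence and logging are left out.

```ruby
# Hypothetical in-memory row holding only the fields the schedule touches.
FailedEntry = Struct.new(:attempts, :run_at, :lock)

# Follows the scheduling part of set_error!: bump attempts, delay the next
# run by retry_interval * attempts when an interval is given, release the lock.
def record_failure(entry, retry_interval = nil, now = Time.now.utc)
  entry.attempts += 1
  entry.run_at = now + (retry_interval * entry.attempts) if retry_interval
  entry.lock = nil
  entry
end

now = Time.utc(2024, 1, 1, 0, 0, 0)
entry = FailedEntry.new(0, now, 42)

# Collect the delay, in seconds, scheduled after each of three failures.
delays = 3.times.map do
  record_failure(entry, 60, now)
  (entry.run_at - now).to_i
end
```

With a 60 second interval the delays after the first three failures are 60, 120, and 180 seconds. When no interval is passed, run_at is left unchanged.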