Module: Sunspot::IndexQueue::Entry
- Included in:
- ActiveRecordImpl, DataMapperImpl, MongoImpl, RedisImpl
- Defined in:
- lib/sunspot/index_queue/entry.rb,
lib/sunspot/index_queue/entry/mongo_impl.rb,
lib/sunspot/index_queue/entry/redis_impl.rb,
lib/sunspot/index_queue/entry/data_mapper_impl.rb,
lib/sunspot/index_queue/entry/active_record_impl.rb
Overview
Abstract queue entry interface. All the gory details on actually handling the queue are handled by a specific implementation class. The default implementation will use ActiveRecord as the backing queue.
Implementing classes must define attribute readers for id
, record_class_name
, record_id
, error
, attempts
, and is_delete?
.
Defined Under Namespace
Classes: ActiveRecordImpl, DataMapperImpl, MongoImpl, RedisImpl
Instance Attribute Summary collapse
-
#processed ⇒ Object
writeonly
Sets the attribute processed.
Class Method Summary collapse
-
.add(klass, id, delete, options = {}) ⇒ Object
Add an entry the queue.
-
.delete_entries(entries) ⇒ Object
Delete entries from the queue.
-
.enqueue(queue, klass, ids, delete, priority) ⇒ Object
Add multiple entries to the queue.
-
.error_count(queue) ⇒ Object
Get a count of the error entries for an IndexQueue.
-
.errors(queue, limit, offset) ⇒ Object
Get the specified number of error entries for an IndexQueue.
-
.implementation ⇒ Object
The implementation class used for the queue.
-
.implementation=(klass) ⇒ Object
Set the implementation class to use for the queue.
-
.load_all_records(entries) ⇒ Object
Load all records in an array of entries.
-
.next_batch!(queue) ⇒ Object
Get the next batch of entries to process for IndexQueue.
-
.ready_count(queue) ⇒ Object
Get a count of the entries ready to be processed for an IndexQueue.
-
.reset!(queue) ⇒ Object
Reset the entries in the queue to be excuted again immediately and clear any errors.
-
.total_count(queue) ⇒ Object
Get a count of the queue entries for an IndexQueue.
Instance Method Summary collapse
- #processed? ⇒ Boolean
-
#record ⇒ Object
Get the record represented by this entry.
-
#reset! ⇒ Object
Reset an entry to be executed again immediatel and clear any errors.
-
#set_error!(error, retry_interval = nil) ⇒ Object
Set the error message on an entry.
Instance Attribute Details
#processed=(value) ⇒ Object (writeonly)
Sets the attribute processed
13 14 15 |
# File 'lib/sunspot/index_queue/entry.rb', line 13 def processed=(value) @processed = value end |
Class Method Details
.add(klass, id, delete, options = {}) ⇒ Object
Add an entry the queue. is_delete
will be true if the entry is a delete. Implementations must implement this method.
76 77 78 |
# File 'lib/sunspot/index_queue/entry.rb', line 76 def add(klass, id, delete, = {}) raise NotImplementedError.new("add") end |
.delete_entries(entries) ⇒ Object
Delete entries from the queue. Implementations must implement this method.
97 98 99 |
# File 'lib/sunspot/index_queue/entry.rb', line 97 def delete_entries(entries) implementation.delete_entries(entries) end |
.enqueue(queue, klass, ids, delete, priority) ⇒ Object
Add multiple entries to the queue. delete
will be true if the entry is a delete.
81 82 83 84 85 86 87 88 89 90 91 92 93 94 |
# File 'lib/sunspot/index_queue/entry.rb', line 81 def enqueue(queue, klass, ids, delete, priority) klass = Sunspot::Util.full_const_get(klass.to_s) unless klass.is_a?(Class) unless queue.class_names.empty? || queue.class_names.include?(klass.name) raise ArgumentError.new("Class #{klass.name} is not in the class names allowed for the queue") end priority = priority.to_i if ids.is_a?(Array) ids.each do |id| implementation.add(klass, id, delete, priority) end else implementation.add(klass, ids, delete, priority) end end |
.error_count(queue) ⇒ Object
Get a count of the error entries for an IndexQueue. Implementations must implement this method.
56 57 58 |
# File 'lib/sunspot/index_queue/entry.rb', line 56 def error_count(queue) implementation.error_count(queue) end |
.errors(queue, limit, offset) ⇒ Object
Get the specified number of error entries for an IndexQueue. Implementations must implement this method.
61 62 63 |
# File 'lib/sunspot/index_queue/entry.rb', line 61 def errors(queue, limit, offset) implementation.errors(queue, limit, offset) end |
.implementation ⇒ Object
The implementation class used for the queue.
41 42 43 |
# File 'lib/sunspot/index_queue/entry.rb', line 41 def implementation @implementation ||= ActiveRecordImpl end |
.implementation=(klass) ⇒ Object
Set the implementation class to use for the queue. This can be set as either a class object, full class name, or a symbol representing one of the default implementations.
# These all set the implementation to use the default ActiveRecord queue.
Sunspot::IndexQueue::Entry.implementation = :active_record
Sunspot::IndexQueue::Entry.implementation = "Sunspot::IndexQueue::Entry::ActiveRecordImpl"
Sunspot::IndexQueue::Entry.implementation = Sunspot::IndexQueue::Entry::ActiveRecordImpl
Implementations should support pulling entries in batches by a priority where higher priority entries are processed first. Errors should be automatically retried after an interval specified by the IndexQueue. The batch size set by the IndexQueue should also be honored.
27 28 29 30 31 32 33 34 35 36 37 38 |
# File 'lib/sunspot/index_queue/entry.rb', line 27 def implementation=(klass) unless klass.is_a?(Class) || klass.nil? class_name = klass.to_s class_name = Sunspot::Util.camel_case(class_name).gsub('/', '::') unless class_name.include?('::') if class_name.include?('::') || !const_defined?("#{class_name}Impl") klass = Sunspot::Util.full_const_get(class_name) else klass = const_get("#{class_name}Impl") end end @implementation = klass end |
.load_all_records(entries) ⇒ Object
Load all records in an array of entries. This can be faster than calling load on each DataAccessor depending on the implementation
103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 |
# File 'lib/sunspot/index_queue/entry.rb', line 103 def load_all_records(entries) #First create maps for classes => ids and "Classname id" => entry class_id_map = {} entry_map = {} entries.each do |entry| classname = Sunspot::Util.full_const_get(entry.record_class_name) class_id_map[classname] = [] if class_id_map[classname].nil? class_id_map[classname] << entry.record_id entry_map["#{classname} #{entry.record_id}"] = entry end class_id_map.each do |klass, ids| adapter = Sunspot::Adapters::DataAccessor.create(klass) if klass.respond_to?(:sunspot_options) && klass. && klass.[:include] && adapter.respond_to?(:include=) adapter.include = klass.[:include] end adapter.load_all(ids).each do |record| entry = entry_map["#{klass} #{Sunspot::Adapters::InstanceAdapter.adapt(record).id}"] entry.instance_variable_set(:@record, record) if entry end end end |
.next_batch!(queue) ⇒ Object
Get the next batch of entries to process for IndexQueue. Implementations must implement this method.
66 67 68 |
# File 'lib/sunspot/index_queue/entry.rb', line 66 def next_batch!(queue) implementation.next_batch!(queue) end |
.ready_count(queue) ⇒ Object
Get a count of the entries ready to be processed for an IndexQueue. Implementations must implement this method.
51 52 53 |
# File 'lib/sunspot/index_queue/entry.rb', line 51 def ready_count(queue) implementation.ready_count(queue) end |
.reset!(queue) ⇒ Object
Reset the entries in the queue to be excuted again immediately and clear any errors.
71 72 73 |
# File 'lib/sunspot/index_queue/entry.rb', line 71 def reset!(queue) implementation.reset!(queue) end |
.total_count(queue) ⇒ Object
Get a count of the queue entries for an IndexQueue. Implementations must implement this method.
46 47 48 |
# File 'lib/sunspot/index_queue/entry.rb', line 46 def total_count(queue) implementation.total_count(queue) end |
Instance Method Details
#processed? ⇒ Boolean
127 128 129 130 |
# File 'lib/sunspot/index_queue/entry.rb', line 127 def processed? @processed = false unless defined?(@processed) @processed end |
#record ⇒ Object
Get the record represented by this entry.
133 134 135 |
# File 'lib/sunspot/index_queue/entry.rb', line 133 def record @record ||= Sunspot::Adapters::DataAccessor.create(Sunspot::Util.full_const_get(record_class_name)).load_all([record_id]).first end |
#reset! ⇒ Object
Reset an entry to be executed again immediatel and clear any errors. Implementations must implement this method.
143 144 145 |
# File 'lib/sunspot/index_queue/entry.rb', line 143 def reset! raise NotImplementedError.new("reset!") end |
#set_error!(error, retry_interval = nil) ⇒ Object
Set the error message on an entry. Implementations must implement this method.
138 139 140 |
# File 'lib/sunspot/index_queue/entry.rb', line 138 def set_error!(error, retry_interval = nil) raise NotImplementedError.new("set_error!") end |