Class: Sunspot::IndexQueue::Entry::MongoImpl
- Inherits: Object
- Includes: Sunspot::IndexQueue::Entry
- Defined in: lib/sunspot/index_queue/entry/mongo_impl.rb
Overview
Implementation of an indexing queue backed by MongoDB (mongodb.org/). This implementation uses the mongo gem directly and so is independent of any ORM you may be using.
To set it up, you need to set the connection and database that it will use.
Sunspot::IndexQueue::Entry::MongoImpl.connection = 'localhost'
Sunspot::IndexQueue::Entry::MongoImpl.database_name = 'my_database'
# or
Sunspot::IndexQueue::Entry::MongoImpl.connection = Mongo::Connection.new('localhost', 27017)
Sunspot::IndexQueue::Entry::MongoImpl.database_name = 'my_database'
Instance Attribute Summary
- #doc ⇒ Object (readonly)
  Returns the value of attribute doc.
Attributes included from Sunspot::IndexQueue::Entry
Class Method Summary
- .add(klass, id, delete, priority) ⇒ Object
  Implementation of the add method.
- .collection ⇒ Object
  Get the collection used to store the queue.
- .connection ⇒ Object
  Get the connection currently in use.
- .connection=(*args) ⇒ Object
  Set the connection to MongoDB.
- .create(attributes) ⇒ Object
  Create a new entry.
- .database_name=(name) ⇒ Object
  Set the name of the database which will contain the queue collection.
- .delete_entries(entries) ⇒ Object
  Implementation of the delete_entries method.
- .error_count(queue) ⇒ Object
  Implementation of the error_count method.
- .errors(queue, limit, offset) ⇒ Object
  Implementation of the errors method.
- .find(selector = {}, opts = {}) ⇒ Object
  Find an array of entries given a selector.
- .find_one(spec_or_object_id = nil, opts = {}) ⇒ Object
  Find one entry given a selector or object id.
- .logger ⇒ Object
  Logger used to log errors.
- .logger=(logger) ⇒ Object
  Set the logger used to log errors.
- .next_batch!(queue) ⇒ Object
  Implementation of the next_batch! method.
- .ready_count(queue) ⇒ Object
  Implementation of the ready_count method.
- .reset!(queue) ⇒ Object
  Implementation of the reset! method.
- .total_count(queue) ⇒ Object
  Implementation of the total_count method.
Instance Method Summary
- #==(value) ⇒ Object
- #attempts ⇒ Object
  Get the entry attempts.
- #attempts=(value) ⇒ Object
  Set the entry attempts.
- #error ⇒ Object
  Get the entry error.
- #error=(value) ⇒ Object
  Set the entry error.
- #id ⇒ Object
  Get the entry id.
- #initialize(attributes = {}) ⇒ MongoImpl (constructor)
  Create a new entry from a document hash.
- #is_delete=(value) ⇒ Object
  Set the entry is_delete flag.
- #is_delete? ⇒ Boolean
  Get the entry is_delete flag.
- #priority ⇒ Object
  Get the entry priority.
- #priority=(value) ⇒ Object
  Set the entry priority.
- #record_class_name ⇒ Object
  Get the entry record_class_name.
- #record_class_name=(value) ⇒ Object
  Set the entry record_class_name.
- #record_id ⇒ Object
  Get the entry record_id.
- #record_id=(value) ⇒ Object
  Set the entry record_id.
- #reset! ⇒ Object
  Implementation of the reset! method.
- #run_at ⇒ Object
  Get the entry run_at time.
- #run_at=(value) ⇒ Object
  Set the entry run_at time.
- #save ⇒ Object
  Save the entry to the database.
- #set_error!(error, retry_interval = nil) ⇒ Object
  Implementation of the set_error! method.
Methods included from Sunspot::IndexQueue::Entry
enqueue, implementation, implementation=, load_all_records, #processed?, #record
Constructor Details
#initialize(attributes = {}) ⇒ MongoImpl
Create a new entry from a document hash.
# File 'lib/sunspot/index_queue/entry/mongo_impl.rb', line 153

def initialize(attributes = {})
  @doc = {}
  attributes.each do |key, value|
    @doc[key.to_s] = value
  end
  @doc['priority'] = 0 unless doc['priority']
  @doc['attempts'] = 0 unless doc['attempts']
end
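The constructor's normalization can be tried without MongoDB. The following is a minimal sketch, not part of the gem: `normalize_attributes` is a hypothetical helper that mirrors how keys are stringified and how `priority` and `attempts` default to 0.

```ruby
# Hypothetical stand-in for the constructor body; it returns the document
# hash instead of storing it in @doc.
def normalize_attributes(attributes = {})
  doc = {}
  # Symbol keys and string keys both end up as string keys.
  attributes.each { |key, value| doc[key.to_s] = value }
  # Missing (nil or false) counters fall back to 0, as in the constructor.
  doc['priority'] = 0 unless doc['priority']
  doc['attempts'] = 0 unless doc['attempts']
  doc
end

p normalize_attributes(:record_id => 42, 'record_class_name' => 'Post')
```

Because every key is stored as a string, the accessors can read `doc['priority']` regardless of whether the caller passed symbols or strings.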
Instance Attribute Details
#doc ⇒ Object (readonly)
Returns the value of attribute doc.
# File 'lib/sunspot/index_queue/entry/mongo_impl.rb', line 150

def doc
  @doc
end
Class Method Details
.add(klass, id, delete, priority) ⇒ Object
Implementation of the add method.
# File 'lib/sunspot/index_queue/entry/mongo_impl.rb', line 135

def add(klass, id, delete, priority)
  queue_entry_key = {:record_id => id, :record_class_name => klass.name, :lock => nil}
  queue_entry = find_one(queue_entry_key) || new(queue_entry_key.merge(:priority => priority))
  queue_entry.is_delete = delete
  queue_entry.priority = priority if priority > queue_entry.priority
  queue_entry.run_at = Time.now.utc
  queue_entry.save
end
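The effect of `add` is easier to see with an in-memory stand-in for the collection. This is a rough sketch under that assumption; `QueueEntry` and `add_to_queue` are hypothetical names, and an Array replaces the MongoDB collection. It shows that an unlocked entry for the same record is reused, that the priority is only ever raised, and that the delete flag follows the most recent call.

```ruby
# Hypothetical in-memory entry; the gem stores these fields in a MongoDB document.
QueueEntry = Struct.new(:record_class_name, :record_id, :is_delete, :priority, :run_at, :lock)

def add_to_queue(entries, class_name, id, delete, priority, now = Time.now.utc)
  # Reuse an existing unlocked entry for the same record, if there is one.
  entry = entries.find do |e|
    e.record_class_name == class_name && e.record_id == id && e.lock.nil?
  end
  unless entry
    entry = QueueEntry.new(class_name, id, false, priority, nil, nil)
    entries << entry
  end
  entry.is_delete = delete ? true : false
  # The priority is raised when the new one is higher, never lowered.
  entry.priority = priority if priority > entry.priority
  entry.run_at = now
  entry
end

queue = []
add_to_queue(queue, 'Post', 1, false, 0)
add_to_queue(queue, 'Post', 1, true, 5)
add_to_queue(queue, 'Post', 1, false, 2)
puts queue.size           # one entry for Post 1
puts queue.first.priority # highest priority seen so far
```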
.collection ⇒ Object
Get the collection used to store the queue.
# File 'lib/sunspot/index_queue/entry/mongo_impl.rb', line 38

def collection
  unless @collection
    @collection = connection.db(@database_name)["sunspot_index_queue_entries"]
    @collection.create_index([[:record_class_name, Mongo::ASCENDING], [:record_id, Mongo::ASCENDING]])
    @collection.create_index([[:run_at, Mongo::ASCENDING], [:record_class_name, Mongo::ASCENDING], [:priority, Mongo::DESCENDING]])
  end
  @collection
end
.connection ⇒ Object
Get the connection currently in use.
# File 'lib/sunspot/index_queue/entry/mongo_impl.rb', line 27

def connection
  @connection
end
.connection=(*args) ⇒ Object
Set the connection to MongoDB. The args can either be a Mongo::Connection object, or the args that can be used to create a new Mongo::Connection.
# File 'lib/sunspot/index_queue/entry/mongo_impl.rb', line 22

def connection=(*args)
  @connection = args.first.is_a?(Mongo::Connection) ? args.first : Mongo::Connection.new(*args)
end
.create(attributes) ⇒ Object
Create a new entry.
# File 'lib/sunspot/index_queue/entry/mongo_impl.rb', line 48

def create(attributes)
  entry = new(attributes)
  entry.save
  entry
end
.database_name=(name) ⇒ Object
Set the name of the database which will contain the queue collection.
# File 'lib/sunspot/index_queue/entry/mongo_impl.rb', line 32

def database_name=(name)
  @collection = nil
  @database_name = name
end
.delete_entries(entries) ⇒ Object
Implementation of the delete_entries method.
# File 'lib/sunspot/index_queue/entry/mongo_impl.rb', line 145

def delete_entries(entries)
  collection.remove(:_id => {'$in' => entries.map(&:id)})
end
.error_count(queue) ⇒ Object
Implementation of the error_count method.
# File 'lib/sunspot/index_queue/entry/mongo_impl.rb', line 91

def error_count(queue)
  conditions = {:error => {'$ne' => nil}}
  unless queue.class_names.empty?
    conditions[:record_class_name] = {'$in' => queue.class_names}
  end
  collection.find(conditions).count
end
.errors(queue, limit, offset) ⇒ Object
Implementation of the errors method.
# File 'lib/sunspot/index_queue/entry/mongo_impl.rb', line 100

def errors(queue, limit, offset)
  conditions = {:error => {'$ne' => nil}}
  unless queue.class_names.empty?
    conditions[:record_class_name] = {'$in' => queue.class_names}
  end
  find(conditions, :limit => limit, :skip => offset, :sort => :id)
end
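Both error_count and errors build the same selector hash before querying. As an illustration only, the hypothetical helper `error_conditions` below isolates that step so the resulting selector can be inspected without a database.

```ruby
# Hypothetical helper; class_names plays the role of queue.class_names.
def error_conditions(class_names)
  # Match only entries whose error field is set.
  conditions = { :error => { '$ne' => nil } }
  # Restrict to specific record classes only when the queue names any.
  conditions[:record_class_name] = { '$in' => class_names } unless class_names.empty?
  conditions
end

p error_conditions([])
p error_conditions(['Post', 'Comment'])
```

An empty class list leaves the selector unrestricted, so a queue with no class names counts errors across every record class.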
.find(selector = {}, opts = {}) ⇒ Object
Find an array of entries given a selector.
# File 'lib/sunspot/index_queue/entry/mongo_impl.rb', line 61

def find(selector={}, opts={})
  collection.find(selector, opts).collect{|doc| new(doc)}
end
.find_one(spec_or_object_id = nil, opts = {}) ⇒ Object
Find one entry given a selector or object id.
# File 'lib/sunspot/index_queue/entry/mongo_impl.rb', line 55

def find_one(spec_or_object_id=nil, opts={})
  doc = collection.find_one(spec_or_object_id, opts)
  doc ? new(doc) : nil
end
.logger ⇒ Object
Logger used to log errors.
# File 'lib/sunspot/index_queue/entry/mongo_impl.rb', line 66

def logger
  @logger
end
.logger=(logger) ⇒ Object
Set the logger used to log errors.
# File 'lib/sunspot/index_queue/entry/mongo_impl.rb', line 71

def logger=(logger)
  @logger = logger
end
.next_batch!(queue) ⇒ Object
Implementation of the next_batch! method.
# File 'lib/sunspot/index_queue/entry/mongo_impl.rb', line 115

def next_batch!(queue)
  conditions = {:run_at => {'$lte' => Time.now.utc}}
  unless queue.class_names.empty?
    conditions[:record_class_name] = {'$in' => queue.class_names}
  end
  entries = []
  while entries.size < queue.batch_size
    begin
      lock = rand(0x7FFFFFFF)
      doc = collection.find_and_modify(:update => {"$set" => {:run_at => Time.now.utc + queue.retry_interval, :error => nil, :lock => lock}}, :query => conditions, :limit => queue.batch_size, :sort => [[:priority, Mongo::DESCENDING], [:run_at, Mongo::ASCENDING]])
      break unless doc
      entries << new(doc)
    rescue Mongo::OperationFailure
      break
    end
  end
  entries
end
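The claim loop in next_batch! can be approximated in memory to show the ordering and the effect of pushing run_at forward. This is a sketch under stated assumptions: `Item` and `claim_batch` are hypothetical, an Array stands in for the collection, and nothing here is atomic, whereas the real method claims each entry with a single find_and_modify call.

```ruby
# Hypothetical in-memory queue item.
Item = Struct.new(:id, :priority, :run_at, :lock, :error)

def claim_batch(items, batch_size, retry_interval, now = Time.now.utc)
  claimed = []
  while claimed.size < batch_size
    # Only entries whose run_at has passed are eligible.
    ready = items.select { |item| item.run_at <= now }
    # Highest priority first, then the oldest run_at.
    item = ready.min_by { |i| [-i.priority, i.run_at] }
    break unless item
    # Moving run_at into the future keeps the entry out of the next query.
    item.run_at = now + retry_interval
    item.error = nil
    item.lock = rand(0x7FFFFFFF)
    claimed << item
  end
  claimed
end

now = Time.utc(2020, 1, 1, 12, 0, 0)
items = [
  Item.new(1, 0, now - 60, nil, nil),
  Item.new(2, 5, now - 10, nil, 'boom'),
  Item.new(3, 5, now - 30, nil, nil),
  Item.new(4, 9, now + 3600, nil, nil)
]
p claim_batch(items, 2, 60, now).map(&:id)
```

Entry 4 has the highest priority but is not yet due, so it is skipped. Claimed entries become eligible again after retry_interval seconds, which is what lets an abandoned batch be retried later.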
.ready_count(queue) ⇒ Object
Implementation of the ready_count method.
# File 'lib/sunspot/index_queue/entry/mongo_impl.rb', line 82

def ready_count(queue)
  conditions = {:run_at => {'$lte' => Time.now.utc}}
  unless queue.class_names.empty?
    conditions[:record_class_name] = {'$in' => queue.class_names}
  end
  collection.find(conditions).count
end
.reset!(queue) ⇒ Object
Implementation of the reset! method.
# File 'lib/sunspot/index_queue/entry/mongo_impl.rb', line 109

def reset!(queue)
  conditions = queue.class_names.empty? ? {} : {:record_class_name => {'$in' => queue.class_names}}
  collection.update(conditions, {"$set" => {:run_at => Time.now.utc, :attempts => 0, :error => nil}}, :multi => true)
end
.total_count(queue) ⇒ Object
Implementation of the total_count method.
# File 'lib/sunspot/index_queue/entry/mongo_impl.rb', line 76

def total_count(queue)
  conditions = queue.class_names.empty? ? {} : {:record_class_name => {'$in' => queue.class_names}}
  collection.find(conditions).count
end
Instance Method Details
#==(value) ⇒ Object
# File 'lib/sunspot/index_queue/entry/mongo_impl.rb', line 271

def == (value)
  value.is_a?(self.class) && ((id && id == value.id) || (doc == value.doc))
end
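The equality rule is undocumented, so a small sketch may help. `SketchEntry` is a hypothetical class that copies only the `==` expression, `doc`, and `id`; it suggests that two entries compare equal when they share a non-nil `_id`, or otherwise when their documents match.

```ruby
# Hypothetical minimal entry carrying just enough state for the comparison.
class SketchEntry
  attr_reader :doc

  def initialize(doc)
    @doc = doc
  end

  def id
    doc['_id']
  end

  # Same expression as the documented method.
  def ==(value)
    value.is_a?(self.class) && ((id && id == value.id) || (doc == value.doc))
  end
end

saved_a = SketchEntry.new({ '_id' => 1, 'priority' => 0 })
saved_b = SketchEntry.new({ '_id' => 1, 'priority' => 9 })
unsaved = SketchEntry.new({ 'record_id' => 7 })

puts saved_a == saved_b                                 # same _id, different fields
puts unsaved == SketchEntry.new({ 'record_id' => 7 })   # no _id, identical documents
puts saved_a == unsaved                                 # different _id and documents
```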
#attempts ⇒ Object
Get the entry attempts.
# File 'lib/sunspot/index_queue/entry/mongo_impl.rb', line 209

def attempts
  doc['attempts'] || 0
end
#attempts=(value) ⇒ Object
Set the entry attempts.
# File 'lib/sunspot/index_queue/entry/mongo_impl.rb', line 214

def attempts=(value)
  doc['attempts'] = value.to_i
end
#error ⇒ Object
Get the entry error.
# File 'lib/sunspot/index_queue/entry/mongo_impl.rb', line 219

def error
  doc['error']
end
#error=(value) ⇒ Object
Set the entry error.
# File 'lib/sunspot/index_queue/entry/mongo_impl.rb', line 224

def error=(value)
  doc['error'] = value.nil? ? nil : value.to_s
end
#id ⇒ Object
Get the entry id.
# File 'lib/sunspot/index_queue/entry/mongo_impl.rb', line 163

def id
  doc['_id']
end
#is_delete=(value) ⇒ Object
Set the entry is_delete flag.
# File 'lib/sunspot/index_queue/entry/mongo_impl.rb', line 234

def is_delete=(value)
  doc['is_delete'] = !!value
end
#is_delete? ⇒ Boolean
Get the entry is_delete flag.
# File 'lib/sunspot/index_queue/entry/mongo_impl.rb', line 229

def is_delete?
  doc['is_delete']
end
#priority ⇒ Object
Get the entry priority.
# File 'lib/sunspot/index_queue/entry/mongo_impl.rb', line 199

def priority
  doc['priority']
end
#priority=(value) ⇒ Object
Set the entry priority.
# File 'lib/sunspot/index_queue/entry/mongo_impl.rb', line 204

def priority=(value)
  doc['priority'] = value.to_i
end
#record_class_name ⇒ Object
Get the entry record_class_name.
# File 'lib/sunspot/index_queue/entry/mongo_impl.rb', line 168

def record_class_name
  doc['record_class_name']
end
#record_class_name=(value) ⇒ Object
Set the entry record_class_name.
# File 'lib/sunspot/index_queue/entry/mongo_impl.rb', line 173

def record_class_name=(value)
  doc['record_class_name'] = value.nil? ? nil : value.to_s
end
#record_id ⇒ Object
Get the entry record_id.
# File 'lib/sunspot/index_queue/entry/mongo_impl.rb', line 178

def record_id
  doc['record_id']
end
#record_id=(value) ⇒ Object
Set the entry record_id.
# File 'lib/sunspot/index_queue/entry/mongo_impl.rb', line 183

def record_id=(value)
  doc['record_id'] = value
end
#reset! ⇒ Object
Implementation of the reset! method.
# File 'lib/sunspot/index_queue/entry/mongo_impl.rb', line 260

def reset!
  begin
    self.error = nil
    self.attempts = 0
    self.run_at = Time.now.utc
    self.save
  rescue => e
    self.class.logger.warn(e) if self.class.logger
  end
end
#run_at ⇒ Object
Get the entry run_at time.
# File 'lib/sunspot/index_queue/entry/mongo_impl.rb', line 188

def run_at
  doc['run_at']
end
#run_at=(value) ⇒ Object
Set the entry run_at time.
# File 'lib/sunspot/index_queue/entry/mongo_impl.rb', line 193

def run_at=(value)
  value = Time.parse(value.to_s) unless value.nil? || value.is_a?(Time)
  doc['run_at'] = value.nil? ? nil : value.utc
end
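The setter accepts nil, a Time, or anything whose string form Time.parse understands, and stores UTC. A minimal sketch of that coercion follows; `normalize_run_at` is a hypothetical helper that returns the value rather than writing it to the document.

```ruby
require 'time' # Time.parse lives in the time standard library

# Hypothetical helper with the same two steps as the setter.
def normalize_run_at(value)
  # Non-Time input is converted via its string representation.
  value = Time.parse(value.to_s) unless value.nil? || value.is_a?(Time)
  # Times are stored in UTC; nil is passed through.
  value.nil? ? nil : value.utc
end

p normalize_run_at(nil)
p normalize_run_at('2020-01-01 12:00:00 +0200')
p normalize_run_at(Time.new(2020, 1, 1, 12, 0, 0, '+02:00'))
```

Note that `Time#utc` converts the receiver in place, so a Time object passed to the setter is itself switched to UTC.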
#save ⇒ Object
Save the entry to the database.
# File 'lib/sunspot/index_queue/entry/mongo_impl.rb', line 239

def save
  id = self.class.collection.save(doc)
  doc['_id'] = id if id
end
#set_error!(error, retry_interval = nil) ⇒ Object
Implementation of the set_error! method.
# File 'lib/sunspot/index_queue/entry/mongo_impl.rb', line 245

def set_error!(error, retry_interval = nil)
  self.attempts += 1
  self.run_at = (retry_interval * attempts).from_now.utc if retry_interval
  self.error = "#{error.class.name}: #{error.message}\n#{error.backtrace.join("\n")[0, 4000]}"
  begin
    save
  rescue => e
    if self.class.logger
      self.class.logger.warn(error)
      self.class.logger.warn(e)
    end
  end
end
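The retry delay grows linearly with the attempt count, and the stored error text combines the exception class, its message, and a truncated backtrace. The sketch below illustrates both with hypothetical helpers (`next_run_at`, `format_error`). It uses plain Time arithmetic in place of `from_now`, which comes from ActiveSupport, and assumes the message part is read from `error.message`.

```ruby
# Hypothetical helper: the next run time is now + retry_interval * attempts.
def next_run_at(now, retry_interval, attempts)
  now + retry_interval * attempts
end

# Hypothetical helper: only the backtrace is capped at 4000 characters.
def format_error(error)
  backtrace = (error.backtrace || []).join("\n")[0, 4000]
  "#{error.class.name}: #{error.message}\n#{backtrace}"
end

now = Time.utc(2020, 1, 1)
(1..3).each do |attempts|
  puts next_run_at(now, 60, attempts) - now # delay in seconds for each attempt
end

begin
  raise ArgumentError, 'bad id'
rescue => e
  puts format_error(e).lines.first
end
```

With a 60 second interval the delays are 60, 120 and 180 seconds, so a repeatedly failing entry is retried less and less often.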