Class: Sunspot::IndexQueue::Entry::MongoImpl

Inherits:
Object
  • Object
show all
Includes:
Sunspot::IndexQueue::Entry
Defined in:
lib/sunspot/index_queue/entry/mongo_impl.rb

Overview

Implementation of an indexing queue backed by MongoDB (mongodb.org/). This implementation uses the mongo gem directly and so is independent of any ORM you may be using.

To set it up, you need to set the connection and database that it will use.

Sunspot::IndexQueue::Entry::MongoImpl.connection = 'localhost'
Sunspot::IndexQueue::Entry::MongoImpl.database_name = 'my_database'
# or
Sunspot::IndexQueue::Entry::MongoImpl.connection = Mongo::Connection.new('localhost', 27017)
Sunspot::IndexQueue::Entry::MongoImpl.database_name = 'my_database'

Instance Attribute Summary collapse

Attributes included from Sunspot::IndexQueue::Entry

#processed

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Sunspot::IndexQueue::Entry

enqueue, implementation, implementation=, load_all_records, #processed?, #record

Constructor Details

#initialize(attributes = {}) ⇒ MongoImpl

Create a new entry from a document hash.



153
154
155
156
157
158
159
160
# File 'lib/sunspot/index_queue/entry/mongo_impl.rb', line 153

def initialize(attributes = {})
  @doc = {}
  attributes.each do |key, value|
    @doc[key.to_s] = value
  end
  @doc['priority'] = 0 unless doc['priority']
  @doc['attempts'] = 0 unless doc['attempts']
end

Instance Attribute Details

#docObject (readonly)

Returns the value of attribute doc.



150
151
152
# File 'lib/sunspot/index_queue/entry/mongo_impl.rb', line 150

def doc
  @doc
end

Class Method Details

.add(klass, id, delete, priority) ⇒ Object

Implementation of the add method.



135
136
137
138
139
140
141
142
# File 'lib/sunspot/index_queue/entry/mongo_impl.rb', line 135

def add(klass, id, delete, priority)
  queue_entry_key = {:record_id => id, :record_class_name => klass.name, :lock => nil}
  queue_entry = find_one(queue_entry_key) || new(queue_entry_key.merge(:priority => priority))
  queue_entry.is_delete = delete
  queue_entry.priority = priority if priority > queue_entry.priority
  queue_entry.run_at = Time.now.utc
  queue_entry.save
end

.collectionObject

Get the collection used to store the queue.



38
39
40
41
42
43
44
45
# File 'lib/sunspot/index_queue/entry/mongo_impl.rb', line 38

def collection
  unless @collection
    @collection = connection.db(@database_name)["sunspot_index_queue_entries"]
    @collection.create_index([[:record_class_name, Mongo::ASCENDING], [:record_id, Mongo::ASCENDING]])
    @collection.create_index([[:run_at, Mongo::ASCENDING], [:record_class_name, Mongo::ASCENDING], [:priority, Mongo::DESCENDING]])
  end
  @collection
end

.connectionObject

Get the connection currently in use.



27
28
29
# File 'lib/sunspot/index_queue/entry/mongo_impl.rb', line 27

def connection
  @connection
end

.connection=(*args) ⇒ Object

Set the connection to MongoDB. The args can either be a Mongo::Connection object, or the args that can be used to create a new Mongo::Connection.



22
23
24
# File 'lib/sunspot/index_queue/entry/mongo_impl.rb', line 22

def connection=(*args)
  @connection = args.first.is_a?(Mongo::Connection) ? args.first : Mongo::Connection.new(*args)
end

.create(attributes) ⇒ Object

Create a new entry.



48
49
50
51
52
# File 'lib/sunspot/index_queue/entry/mongo_impl.rb', line 48

def create(attributes)
  entry = new(attributes)
  entry.save
  entry
end

.database_name=(name) ⇒ Object

Set the name of the database which will contain the queue collection.



32
33
34
35
# File 'lib/sunspot/index_queue/entry/mongo_impl.rb', line 32

def database_name=(name)
  @collection = nil
  @database_name = name
end

.delete_entries(entries) ⇒ Object

Implementation of the delete_entries method.



145
146
147
# File 'lib/sunspot/index_queue/entry/mongo_impl.rb', line 145

def delete_entries(entries)
  collection.remove(:_id => {'$in' => entries.map(&:id)})
end

.error_count(queue) ⇒ Object

Implementation of the error_count method.



91
92
93
94
95
96
97
# File 'lib/sunspot/index_queue/entry/mongo_impl.rb', line 91

def error_count(queue)
  conditions = {:error => {'$ne' => nil}}
  unless queue.class_names.empty?
    conditions[:record_class_name] = {'$in' => queue.class_names}
  end
  collection.find(conditions).count
end

.errors(queue, limit, offset) ⇒ Object

Implementation of the errors method.



100
101
102
103
104
105
106
# File 'lib/sunspot/index_queue/entry/mongo_impl.rb', line 100

def errors(queue, limit, offset)
  conditions = {:error => {'$ne' => nil}}
  unless queue.class_names.empty?
    conditions[:record_class_name] = {'$in' => queue.class_names}
  end
  find(conditions, :limit => limit, :skip => offset, :sort => :id)
end

.find(selector = {}, opts = {}) ⇒ Object

Find an array of entries given a selector.



61
62
63
# File 'lib/sunspot/index_queue/entry/mongo_impl.rb', line 61

def find(selector={}, opts={})
  collection.find(selector, opts).collect{|doc| new(doc)}
end

.find_one(spec_or_object_id = nil, opts = {}) ⇒ Object

Find one entry given a selector or object id.



55
56
57
58
# File 'lib/sunspot/index_queue/entry/mongo_impl.rb', line 55

def find_one(spec_or_object_id=nil, opts={})
  doc = collection.find_one(spec_or_object_id, opts)
  doc ? new(doc) : nil
end

.loggerObject

Logger used to log errors.



66
67
68
# File 'lib/sunspot/index_queue/entry/mongo_impl.rb', line 66

def logger
  @logger
end

.logger=(logger) ⇒ Object

Set the logger used to log errors.



71
72
73
# File 'lib/sunspot/index_queue/entry/mongo_impl.rb', line 71

def logger=(logger)
  @logger = logger
end

.next_batch!(queue) ⇒ Object

Implementation of the next_batch! method.



115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/sunspot/index_queue/entry/mongo_impl.rb', line 115

def next_batch!(queue)
  conditions = {:run_at => {'$lte' => Time.now.utc}}
  unless queue.class_names.empty?
    conditions[:record_class_name] = {'$in' => queue.class_names}
  end
  entries = []
  while entries.size < queue.batch_size
    begin
      lock = rand(0x7FFFFFFF)
      doc = collection.find_and_modify(:update => {"$set" => {:run_at => Time.now.utc + queue.retry_interval, :error => nil, :lock => lock}}, :query => conditions, :limit => queue.batch_size, :sort => [[:priority, Mongo::DESCENDING], [:run_at, Mongo::ASCENDING]])
      break unless doc
      entries << new(doc)
    rescue Mongo::OperationFailure
      break
    end
  end
  entries
end

.ready_count(queue) ⇒ Object

Implementation of the ready_count method.



82
83
84
85
86
87
88
# File 'lib/sunspot/index_queue/entry/mongo_impl.rb', line 82

def ready_count(queue)
  conditions = {:run_at => {'$lte' => Time.now.utc}}
  unless queue.class_names.empty?
    conditions[:record_class_name] = {'$in' => queue.class_names}
  end
  collection.find(conditions).count
end

.reset!(queue) ⇒ Object

Implementation of the reset! method.



109
110
111
112
# File 'lib/sunspot/index_queue/entry/mongo_impl.rb', line 109

def reset!(queue)
  conditions = queue.class_names.empty? ? {} : {:record_class_name => {'$in' => queue.class_names}}
  collection.update(conditions, {"$set" => {:run_at => Time.now.utc, :attempts => 0, :error => nil}}, :multi => true)
end

.total_count(queue) ⇒ Object

Implementation of the total_count method.



76
77
78
79
# File 'lib/sunspot/index_queue/entry/mongo_impl.rb', line 76

def total_count(queue)
  conditions = queue.class_names.empty? ? {} : {:record_class_name => {'$in' => queue.class_names}}
  collection.find(conditions).count
end

Instance Method Details

#==(value) ⇒ Object



271
272
273
# File 'lib/sunspot/index_queue/entry/mongo_impl.rb', line 271

def == (value)
  value.is_a?(self.class) && ((id && id == value.id) || (doc == value.doc))
end

#attemptsObject

Get the entry attempts.



209
210
211
# File 'lib/sunspot/index_queue/entry/mongo_impl.rb', line 209

def attempts
  doc['attempts'] || 0
end

#attempts=(value) ⇒ Object

Set the entry attempts.



214
215
216
# File 'lib/sunspot/index_queue/entry/mongo_impl.rb', line 214

def attempts=(value)
  doc['attempts'] =  value.to_i
end

#errorObject

Get the entry error.



219
220
221
# File 'lib/sunspot/index_queue/entry/mongo_impl.rb', line 219

def error
  doc['error']
end

#error=(value) ⇒ Object

Set the entry error.



224
225
226
# File 'lib/sunspot/index_queue/entry/mongo_impl.rb', line 224

def error=(value)
  doc['error'] =  value.nil? ? nil : value.to_s
end

#idObject

Get the entry id.



163
164
165
# File 'lib/sunspot/index_queue/entry/mongo_impl.rb', line 163

def id
  doc['_id']
end

#is_delete=(value) ⇒ Object

Set the entry delete entry flag.



234
235
236
# File 'lib/sunspot/index_queue/entry/mongo_impl.rb', line 234

def is_delete=(value)
  doc['is_delete'] =  !!value
end

#is_delete?Boolean

Get the entry delete entry flag.

Returns:

  • (Boolean)


229
230
231
# File 'lib/sunspot/index_queue/entry/mongo_impl.rb', line 229

def is_delete?
  doc['is_delete']
end

#priorityObject

Get the entry priority.



199
200
201
# File 'lib/sunspot/index_queue/entry/mongo_impl.rb', line 199

def priority
  doc['priority']
end

#priority=(value) ⇒ Object

Set the entry priority.



204
205
206
# File 'lib/sunspot/index_queue/entry/mongo_impl.rb', line 204

def priority=(value)
  doc['priority'] =  value.to_i
end

#record_class_nameObject

Get the entry id.



168
169
170
# File 'lib/sunspot/index_queue/entry/mongo_impl.rb', line 168

def record_class_name
  doc['record_class_name']
end

#record_class_name=(value) ⇒ Object

Set the entry record_class_name.



173
174
175
# File 'lib/sunspot/index_queue/entry/mongo_impl.rb', line 173

def record_class_name=(value)
  doc['record_class_name'] =  value.nil? ? nil : value.to_s
end

#record_idObject

Get the entry id.



178
179
180
# File 'lib/sunspot/index_queue/entry/mongo_impl.rb', line 178

def record_id
  doc['record_id']
end

#record_id=(value) ⇒ Object

Set the entry record_id.



183
184
185
# File 'lib/sunspot/index_queue/entry/mongo_impl.rb', line 183

def record_id=(value)
  doc['record_id'] =  value
end

#reset!Object

Implementation of the reset! method.



260
261
262
263
264
265
266
267
268
269
# File 'lib/sunspot/index_queue/entry/mongo_impl.rb', line 260

def reset!
  begin
    self.error = nil
    self.attempts = 0
    self.run_at = Time.now.utc
    self.save
  rescue => e
    self.class.logger.warn(e) if self.class.logger
  end
end

#run_atObject

Get the entry run_at time.



188
189
190
# File 'lib/sunspot/index_queue/entry/mongo_impl.rb', line 188

def run_at
  doc['run_at']
end

#run_at=(value) ⇒ Object

Set the entry run_at time.



193
194
195
196
# File 'lib/sunspot/index_queue/entry/mongo_impl.rb', line 193

def run_at=(value)
  value = Time.parse(value.to_s) unless value.nil? || value.is_a?(Time)
  doc['run_at'] =  value.nil? ? nil : value.utc
end

#saveObject

Save the entry to the database.



239
240
241
242
# File 'lib/sunspot/index_queue/entry/mongo_impl.rb', line 239

def save
  id = self.class.collection.save(doc)
  doc['_id'] = id if id
end

#set_error!(error, retry_interval = nil) ⇒ Object

Implementation of the set_error! method.



245
246
247
248
249
250
251
252
253
254
255
256
257
# File 'lib/sunspot/index_queue/entry/mongo_impl.rb', line 245

def set_error!(error, retry_interval = nil)
  self.attempts += 1
  self.run_at = (retry_interval * attempts).from_now.utc if retry_interval
  self.error = "#{error.class.name}: #{error.message}\n#{error.backtrace.join("\n")[0, 4000]}"
  begin
    save
  rescue => e
    if self.class.logger
      self.class.logger.warn(error)
      self.class.logger.warn(e)
    end
  end
end