Class: Sunspot::IndexQueue::Entry::RedisImpl

Inherits:
Object
  • Object
show all
Includes:
Sunspot::IndexQueue::Entry
Defined in:
lib/sunspot/index_queue/entry/redis_impl.rb

Instance Attribute Summary collapse

Attributes included from Sunspot::IndexQueue::Entry

#processed

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Sunspot::IndexQueue::Entry

enqueue, implementation, implementation=, load_all_records, #processed?, #record

Constructor Details

#initialize(options = {}) ⇒ RedisImpl

Returns a new instance of RedisImpl.



159
160
161
162
163
164
165
166
167
168
# File 'lib/sunspot/index_queue/entry/redis_impl.rb', line 159

# Builds an entry from a hash of attributes.
#
# options - Hash keyed by attribute name. Each attribute may be given under a
#           Symbol key or a String key (the String form is what JSON.parse
#           yields when an entry is loaded back from Redis). A falsy value
#           under the Symbol key falls through to the String key.
#
# Missing counters default to 0 and missing flags default to false.
def initialize(options = {})
  [:record_id, :record_class_name, :is_delete, :run_at, :priority, :error, :attempts, :is_locked, :duplicate].each do |attribute|
    instance_variable_set("@#{attribute}", options[attribute] || options[attribute.to_s])
  end

  # Defaults are applied once, after every attribute has been read, instead of
  # being re-evaluated on each pass through the loop above.
  @attempts ||= 0
  @priority ||= 0
  @is_delete ||= false
  @is_locked ||= false
  @duplicate ||= false
end

Instance Attribute Details

#attempts ⇒ Object

Returns the value of attribute attempts.



14
15
16
# File 'lib/sunspot/index_queue/entry/redis_impl.rb', line 14

# Number of failed processing attempts recorded on this entry. The
# constructor defaults it to 0 and set_error! increments it.
def attempts
  @attempts
end

#duplicate ⇒ Object

Returns the value of attribute duplicate.



14
15
16
# File 'lib/sunspot/index_queue/entry/redis_impl.rb', line 14

# Flag set by add when the primary entry for the record was locked, in which
# case the entry is stored under the "<class>_<id>_dup" field. delete_entries
# reads it to decide which field to remove.
def duplicate
  @duplicate
end

#error ⇒ Object

Returns the value of attribute error.



14
15
16
# File 'lib/sunspot/index_queue/entry/redis_impl.rb', line 14

# Text of the last error recorded by set_error! (exception class, message and
# truncated backtrace), or nil when no error is recorded.
def error
  @error
end

#is_delete ⇒ Object

Returns the value of attribute is_delete.



14
15
16
# File 'lib/sunspot/index_queue/entry/redis_impl.rb', line 14

# Value of the `delete` argument last passed to add for this record; the
# constructor defaults it to false.
# NOTE(review): presumably true means "remove the record from the index" --
# confirm against the code that processes entries.
def is_delete
  @is_delete
end

#is_locked ⇒ Object

Returns the value of attribute is_locked.



14
15
16
# File 'lib/sunspot/index_queue/entry/redis_impl.rb', line 14

# Lock flag. next_batch! sets it to true on every entry it hands out and
# skips entries on which it is already set; the class-level reset! clears it.
def is_locked
  @is_locked
end

#priority ⇒ Object

Returns the value of attribute priority.



14
15
16
# File 'lib/sunspot/index_queue/entry/redis_impl.rb', line 14

# Priority of the entry (constructor default: 0). <=> orders entries with a
# larger priority first, and add only ever raises it, never lowers it.
def priority
  @priority
end

#record_class_nameObject

Returns the value of attribute record_class_name.



14
15
16
# File 'lib/sunspot/index_queue/entry/redis_impl.rb', line 14

# Class name of the queued record (add stores klass.name here). Together with
# record_id it forms the Redis hash field under which the entry is stored.
def record_class_name
  @record_class_name
end

#record_id ⇒ Object

Returns the value of attribute record_id.



14
15
16
# File 'lib/sunspot/index_queue/entry/redis_impl.rb', line 14

# Identifier of the queued record. Together with record_class_name it forms
# the Redis hash field under which the entry is stored.
def record_id
  @record_id
end

#run_at ⇒ Object

Returns the value of attribute run_at.



14
15
16
# File 'lib/sunspot/index_queue/entry/redis_impl.rb', line 14

# Earliest time at which the entry may be processed. add and reset! assign a
# Time (Time.now.utc); next_batch! and ready_count run it through Time.parse,
# so entries loaded back from Redis are expected to carry it as a String.
def run_at
  @run_at
end

Class Method Details

.add(klass, id, delete, priority) ⇒ Object



119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/sunspot/index_queue/entry/redis_impl.rb', line 119

# Queues (or re-queues) a record and writes the resulting entry to Redis.
#
# klass    - class of the record; klass.name becomes part of the hash field.
# id       - identifier of the record.
# delete   - value stored in the entry's is_delete flag.
# priority - priority for the entry; an existing entry keeps its own
#            priority when that is already higher.
#
# Entry selection:
# * the primary entry ("<class>_<id>") is reused when it exists and is not
#   locked;
# * otherwise the pending duplicate ("<class>_<id>_dup") is reused when it
#   exists and is not locked;
# * otherwise a new entry is built.
#
# While the primary entry is locked, the result is stored under the "_dup"
# field and flagged as a duplicate; in every other case it is stored under
# the primary field.
def add(klass, id, delete, priority)
  primary_key = "#{klass.name}_#{id}"
  duplicate_key = "#{primary_key}_dup"

  # Read the primary entry once so that the choice of entry, the target field
  # and the duplicate flag are all derived from the same snapshot.
  primary = find_entry(primary_key)
  primary_locked = primary ? primary.is_locked : false

  redis_object =
    if primary && !primary_locked
      primary
    else
      pending = find_entry(duplicate_key)
      if pending && !pending.is_locked
        pending
      else
        new(:priority => priority, :record_class_name => klass.name, :record_id => id)
      end
    end

  redis_object.is_delete = delete
  redis_object.priority = priority if priority > redis_object.priority
  redis_object.run_at = Time.now.utc
  redis_object.duplicate = primary_locked

  redis_key = primary_locked ? duplicate_key : primary_key
  @connection.hset(@datastore_name, redis_key, redis_object.json_formatted)
end

.collection ⇒ Object



44
45
46
47
48
# File 'lib/sunspot/index_queue/entry/redis_impl.rb', line 44

# Loads every entry stored in the Redis hash and returns them sorted with
# the entries' own <=> ordering.
def collection
  entries = @connection.hvals(@datastore_name).map do |raw|
    new(JSON.parse(raw))
  end
  entries.sort
end

.connection ⇒ Object



24
25
26
# File 'lib/sunspot/index_queue/entry/redis_impl.rb', line 24

# Returns the Redis client created by connection= (nil until one has been
# assigned).
def connection
  @connection
end

.connection=(*args) ⇒ Object



17
18
19
20
21
22
# File 'lib/sunspot/index_queue/entry/redis_impl.rb', line 17

# Opens the Redis connection used by all class-level operations.
#
# Accepts a host, or a host and a port; missing or nil values fall back to
# 'localhost' and 6379.
#
# With assignment syntax (`Impl.connection = 'host', 6380`) Ruby delivers the
# right-hand side as ONE array argument, so the arguments are flattened
# before being split into host and port. An explicit
# `send(:connection=, 'host', 6380)` keeps working as before.
def connection=(*args)
  host, port = args.flatten
  host ||= 'localhost'
  port ||= 6379
  @connection = Redis.new(:host => host, :port => port)
end

.create(attributes) ⇒ Object



139
140
141
142
143
# File 'lib/sunspot/index_queue/entry/redis_impl.rb', line 139

# Builds an entry from +attributes+, stores it under its
# "<record_class_name>_<record_id>" field and returns the entry.
def create(attributes)
  entry = new(attributes)
  field = "#{entry.record_class_name}_#{entry.record_id}"
  @connection.hset(@datastore_name, field, entry.json_formatted)
  entry
end

.datastore_name ⇒ Object



40
41
42
# File 'lib/sunspot/index_queue/entry/redis_impl.rb', line 40

# Name of the Redis hash that holds all queue entries; it is passed as the
# key to every hset/hget/hdel/hvals/hlen call in this class.
def datastore_name
  @datastore_name
end

.datastore_name=(name) ⇒ Object



36
37
38
# File 'lib/sunspot/index_queue/entry/redis_impl.rb', line 36

# Sets the name of the Redis hash in which queue entries are stored.
def datastore_name= (name)
  @datastore_name = name
end

.delete_entries(records) ⇒ Object



145
146
147
148
149
150
151
# File 'lib/sunspot/index_queue/entry/redis_impl.rb', line 145

# Removes the given entries from the Redis hash, one hdel per entry.
# Entries flagged as duplicates are removed from their "_dup" field.
# Returns +records+.
def delete_entries(records)
  records.each do |entry|
    suffix = entry.duplicate ? "_dup" : ""
    field = "#{entry.record_class_name}_#{entry.record_id}#{suffix}"
    @connection.hdel(@datastore_name, field)
  end
end

.error_count(queue) ⇒ Object



67
68
69
70
71
72
73
# File 'lib/sunspot/index_queue/entry/redis_impl.rb', line 67

# Counts the entries that carry an error. When the queue lists class names,
# only entries whose record_class_name is among them are counted.
def error_count(queue)
  wanted = queue.class_names
  collection.count do |entry|
    !entry.error.nil? && (wanted.empty? || wanted.include?(entry.record_class_name))
  end
end

.errors(queue, limit, offset) ⇒ Object



75
76
77
78
79
80
81
# File 'lib/sunspot/index_queue/entry/redis_impl.rb', line 75

# Returns one page of the entries that carry an error.
#
# queue  - object responding to class_names; when that list is non-empty only
#          entries whose record_class_name is in it are considered.
# limit  - maximum number of entries to return (nil: no upper bound).
# offset - number of matching entries to skip first.
#
# Always returns an Array; it is empty when the offset lies past the last
# matching entry.
def errors(queue, limit, offset)
  wanted = queue.class_names
  failed = collection.select do |entry|
    !entry.error.nil? && (wanted.empty? || wanted.include?(entry.record_class_name))
  end

  # `limit` is a count, not an end index, so slice by (start, length) rather
  # than by the range offset..limit.
  page = limit.nil? ? failed[offset..-1] : failed[offset, limit]
  page || []
end

.find_entry(id) ⇒ Object



153
154
155
156
# File 'lib/sunspot/index_queue/entry/redis_impl.rb', line 153

# Looks up a single entry by its hash field name (for example "Post_12" or
# "Post_12_dup").
#
# Returns the entry built from the stored JSON, or nil when the field does
# not exist. The value is fetched with a single hget: a missing field comes
# back as nil, so no separate existence check is needed and the entry cannot
# disappear between the check and the read.
def find_entry(id)
  raw = @connection.hget(@datastore_name, id)
  return nil if raw.nil?

  new(JSON.parse(raw))
end

.logger ⇒ Object



28
29
30
# File 'lib/sunspot/index_queue/entry/redis_impl.rb', line 28

# Returns the logger assigned through logger= (nil when none is set). The
# instance-level reset! and set_error! call warn on it when writing to Redis
# fails.
def logger
  @logger
end

.logger=(logger) ⇒ Object



32
33
34
# File 'lib/sunspot/index_queue/entry/redis_impl.rb', line 32

# Sets the logger that receives warnings about failed Redis writes.
def logger= (logger)
  @logger = logger
end

.next_batch!(queue) ⇒ Object



99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# File 'lib/sunspot/index_queue/entry/redis_impl.rb', line 99

# Selects, locks and returns the next batch of entries to process.
#
# queue - object responding to class_names (optional filter on
#         record_class_name; an empty list means no filter) and batch_size
#         (maximum number of entries to return).
#
# An entry is eligible when its run_at is not in the future and it is not
# locked. Each returned entry has is_locked set to true and is written back
# to the field it is stored under: "<class>_<id>", or "<class>_<id>_dup" for
# entries flagged as duplicates, so that locking a duplicate does not
# overwrite the primary entry. A batch_size of 0 yields an empty batch.
#
# NOTE(review): selecting and locking are separate Redis calls, so two
# concurrent callers can pick the same entry -- confirm whether callers
# serialize access.
def next_batch!(queue)
  wanted = queue.class_names
  candidates =
    if wanted.empty?
      collection
    else
      collection.select { |entry| wanted.include?(entry.record_class_name) }
    end

  now = Time.now.utc
  ready = candidates.select { |entry| Time.parse(entry.run_at) <= now && !entry.is_locked }

  batch = ready.first(queue.batch_size)
  batch.each do |entry|
    entry.is_locked = true
    suffix = entry.duplicate ? "_dup" : ""
    field = "#{entry.record_class_name}_#{entry.record_id}#{suffix}"
    @connection.hset(@datastore_name, field, entry.json_formatted)
  end
  batch
end

.ready_count(queue) ⇒ Object



58
59
60
61
62
63
64
65
# File 'lib/sunspot/index_queue/entry/redis_impl.rb', line 58

# Counts the entries whose run_at lies strictly before the current UTC time.
# When the queue lists class names, only entries whose record_class_name is
# among them are counted.
def ready_count(queue)
  cutoff = Time.now.utc
  wanted = queue.class_names
  collection.count do |entry|
    Time.parse(entry.run_at) < cutoff && (wanted.empty? || wanted.include?(entry.record_class_name))
  end
end

.reset!(queue) ⇒ Object



83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/sunspot/index_queue/entry/redis_impl.rb', line 83

# Resets every entry of the queue so that it is eligible for processing
# again: run_at becomes the current UTC time, attempts 0, error nil and the
# lock is released.
#
# queue - object responding to class_names; when that list is non-empty only
#         entries whose record_class_name is in it are reset.
#
# Each entry is written back to the field it is stored under: entries flagged
# as duplicates live under "<class>_<id>_dup" (the field delete_entries
# removes for them), so resetting one must not overwrite the primary entry.
#
# Returns the entries that were reset.
def reset!(queue)
  wanted = queue.class_names
  collection_to_reset =
    if wanted.empty?
      collection
    else
      collection.select { |entry| wanted.include?(entry.record_class_name) }
    end

  collection_to_reset.each do |entry|
    entry.run_at = Time.now.utc
    entry.attempts = 0
    entry.error = nil
    entry.is_locked = false
    suffix = entry.duplicate ? "_dup" : ""
    field = "#{entry.record_class_name}_#{entry.record_id}#{suffix}"
    @connection.hset(@datastore_name, field, entry.json_formatted)
  end
end

.total_count(queue) ⇒ Object



50
51
52
53
54
55
56
# File 'lib/sunspot/index_queue/entry/redis_impl.rb', line 50

# Number of entries in the queue. Without a class filter this is the length
# of the Redis hash; with one, the entries are loaded and those whose
# record_class_name is listed are counted.
def total_count(queue)
  wanted = queue.class_names
  return @connection.hlen(@datastore_name) if wanted.empty?

  collection.count { |entry| wanted.include?(entry.record_class_name) }
end

Instance Method Details

#<=>(redis_object) ⇒ Object



210
211
212
# File 'lib/sunspot/index_queue/entry/redis_impl.rb', line 210

# Sort order for entries: the entry with the larger priority sorts first.
# Only when this entry has no priority at all are the two run_at values
# compared instead (the later run_at sorts first).
def <=>(redis_object)
  return redis_object.run_at <=> run_at if priority.nil?

  redis_object.priority <=> priority
end

#id ⇒ Object



202
203
204
# File 'lib/sunspot/index_queue/entry/redis_impl.rb', line 202

# Identifier of the entry in the form "<record_class_name>_<record_id>".
# This matches the primary hash field name; it never carries the "_dup"
# suffix, even for entries flagged as duplicates.
def id
  "#{record_class_name}_#{record_id}"
end

#is_delete? ⇒ Boolean

Returns:

  • (Boolean)


206
207
208
# File 'lib/sunspot/index_queue/entry/redis_impl.rb', line 206

# Predicate form of is_delete; returns the stored flag unchanged.
def is_delete?
  is_delete
end

#json_formatted ⇒ Object



170
171
172
173
# File 'lib/sunspot/index_queue/entry/redis_impl.rb', line 170

# Serializes the entry to the JSON string that is stored in Redis. The keys
# are the attribute names as strings, which is the form the constructor
# accepts when the entry is loaded back.
def json_formatted
  payload = {
    "record_id" => record_id,
    "record_class_name" => record_class_name,
    "is_delete" => is_delete,
    "duplicate" => duplicate,
    "run_at" => run_at,
    "priority" => priority,
    "error" => error,
    "attempts" => attempts,
    "is_locked" => is_locked
  }
  JSON.dump(payload)
end

#reset! ⇒ Object



189
190
191
192
193
194
195
196
197
198
199
200
# File 'lib/sunspot/index_queue/entry/redis_impl.rb', line 189

# Makes this entry eligible for processing again and persists it: run_at
# becomes the current UTC time, attempts 0, error nil, and the lock is
# released (next_batch! skips locked entries, so an entry that stayed locked
# would never be handed out again).
#
# The entry is written to the field it is stored under: "<class>_<id>", or
# "<class>_<id>_dup" when it is flagged as a duplicate.
#
# Failures while writing to Redis are logged as warnings on the class-level
# logger (when one is set) and otherwise swallowed.
def reset!
  begin
    self.run_at = Time.now.utc
    self.attempts = 0
    self.error = nil
    self.is_locked = false
    field = "#{record_class_name}_#{record_id}#{duplicate ? '_dup' : ''}"
    self.class.connection.hset(self.class.datastore_name, field, json_formatted)
  rescue => e
    log = self.class.logger
    log.warn(e) if log
  end
end

#set_error!(error, retry_interval = nil) ⇒ Object



175
176
177
178
179
180
181
182
183
184
185
186
187
# File 'lib/sunspot/index_queue/entry/redis_impl.rb', line 175

# Records a failed processing attempt on this entry and persists it.
#
# error          - the exception raised while processing the entry. Its class
#                  name, message and the first 4000 characters of its
#                  backtrace are stored in the error attribute.
# retry_interval - optional number of seconds; when given, run_at is moved to
#                  now + retry_interval * attempts.
#
# The lock is released so that next_batch!, which skips locked entries, can
# hand the entry out again for the retry. The entry is written to the field
# it is stored under: "<class>_<id>", or "<class>_<id>_dup" when it is
# flagged as a duplicate.
#
# Failures while writing to Redis are logged as warnings (both the original
# error and the write failure) when a class-level logger is set, and are
# otherwise swallowed.
def set_error!(error, retry_interval = nil)
  self.attempts += 1
  self.run_at = Time.now.utc + (retry_interval * attempts) if retry_interval
  # An exception that was built but never raised has a nil backtrace.
  trace = Array(error.backtrace).join("\n")[0, 4000]
  self.error = "#{error.class.name}: #{error.message}\n#{trace}"
  self.is_locked = false
  field = "#{record_class_name}_#{record_id}#{duplicate ? '_dup' : ''}"
  begin
    self.class.connection.hset(self.class.datastore_name, field, json_formatted)
  rescue => e
    log = self.class.logger
    if log
      log.warn(error)
      log.warn(e)
    end
  end
end