Class: Sunspot::IndexQueue::Entry::RedisImpl
Instance Attribute Summary collapse
#processed
Class Method Summary
collapse
Instance Method Summary
collapse
enqueue, implementation, implementation=, load_all_records, #processed?, #record
Constructor Details
#initialize(options = {}) ⇒ RedisImpl
Returns a new instance of RedisImpl.
159
160
161
162
163
164
165
166
167
168
|
# File 'lib/sunspot/index_queue/entry/redis_impl.rb', line 159
# Builds an entry from a hash of attributes.
#
# Every known attribute is read from +options+ under its Symbol key first and
# then under its String key, so hashes produced by JSON.parse (string keys)
# work as well as hand-written ones. Attributes that are absent stay nil,
# except the counters and flags that receive explicit defaults below.
#
# @param options [Hash] attribute values keyed by Symbol or String
def initialize(options = {})
  [:record_id, :record_class_name, :is_delete, :run_at, :priority, :error, :attempts, :is_locked, :duplicate].each do |attribute|
    instance_variable_set("@#{attribute}", options[attribute] || options[attribute.to_s])
  end

  # Defaults are applied once, after all attributes have been assigned,
  # instead of being re-evaluated on every pass of the loop above.
  @attempts ||= 0
  @priority ||= 0
  @is_delete ||= false
  @is_locked ||= false
  @duplicate ||= false
end
|
Instance Attribute Details
#attempts ⇒ Object
Returns the value of attribute attempts.
14
15
16
|
# File 'lib/sunspot/index_queue/entry/redis_impl.rb', line 14
# Reader for @attempts. #initialize falls back to 0 when no value is given;
# #set_error! increments it and both reset! methods put it back to 0.
def attempts
@attempts
end
|
#duplicate ⇒ Object
Returns the value of attribute duplicate.
14
15
16
|
# File 'lib/sunspot/index_queue/entry/redis_impl.rb', line 14
# Reader for @duplicate (false unless supplied to #initialize).
# .add sets it when the primary entry for the record is locked, and
# .delete_entries uses it to decide whether the "_dup" hash field is removed.
def duplicate
@duplicate
end
|
#error ⇒ Object
Returns the value of attribute error.
14
15
16
|
# File 'lib/sunspot/index_queue/entry/redis_impl.rb', line 14
# Reader for @error: nil when no failure is recorded, otherwise the text
# assembled by #set_error! (exception class, message and backtrace).
def error
@error
end
|
#is_delete ⇒ Object
Returns the value of attribute is_delete.
14
15
16
|
# File 'lib/sunspot/index_queue/entry/redis_impl.rb', line 14
# Reader for @is_delete (false unless supplied to #initialize).
# .add overwrites it with the +delete+ argument it receives.
def is_delete
@is_delete
end
|
#is_locked ⇒ Object
Returns the value of attribute is_locked.
14
15
16
|
# File 'lib/sunspot/index_queue/entry/redis_impl.rb', line 14
# Reader for @is_locked (false unless supplied to #initialize).
# .next_batch! sets it to true on the entries it hands out and skips entries
# for which it is already true.
def is_locked
@is_locked
end
|
#priority ⇒ Object
Returns the value of attribute priority.
14
15
16
|
# File 'lib/sunspot/index_queue/entry/redis_impl.rb', line 14
# Reader for @priority (0 unless supplied to #initialize).
# #<=> compares entries on this value.
def priority
@priority
end
|
#record_class_name ⇒ Object
Returns the value of attribute record_class_name.
14
15
16
|
# File 'lib/sunspot/index_queue/entry/redis_impl.rb', line 14
# Reader for @record_class_name. Together with #record_id it forms the
# "<class>_<id>" hash field under which the entry is stored.
def record_class_name
@record_class_name
end
|
#record_id ⇒ Object
Returns the value of attribute record_id.
14
15
16
|
# File 'lib/sunspot/index_queue/entry/redis_impl.rb', line 14
# Reader for @record_id. Together with #record_class_name it forms the
# "<class>_<id>" hash field under which the entry is stored.
def record_id
@record_id
end
|
#run_at ⇒ Object
Returns the value of attribute run_at.
14
15
16
|
# File 'lib/sunspot/index_queue/entry/redis_impl.rb', line 14
# Reader for @run_at.
# NOTE(review): .next_batch! and .ready_count pass this value to Time.parse,
# so entries loaded from Redis presumably carry it as a String, while .add and
# the reset! methods assign a Time in memory -- confirm before comparing.
def run_at
@run_at
end
|
Class Method Details
.add(klass, id, delete, priority) ⇒ Object
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
|
# File 'lib/sunspot/index_queue/entry/redis_impl.rb', line 119
# Queues (or re-queues) an index operation for the record +klass+/+id+.
#
# Entry selection:
# * an existing, unlocked primary entry ("<class>_<id>") is reused;
# * otherwise an existing, unlocked "<class>_<id>_dup" entry is reused;
# * otherwise a fresh entry is built.
#
# The entry is written to the "_dup" field and flagged as a duplicate when the
# primary entry exists and is locked; in every other case it is written to the
# primary field.
#
# Each stored entry is fetched at most once. The previous implementation
# re-queried Redis for the same entry several times per call, so the individual
# checks could observe different states and a concurrent delete between
# hexists and find_entry raised NoMethodError on nil.
#
# @param klass [#name] class of the record
# @param id [Object] identifier of the record
# @param delete [Boolean] whether the record should be removed from the index
# @param priority [Numeric] requested priority; only ever raises the stored one
# @return [Object] whatever the connection's hset returns
def add(klass, id, delete, priority)
  primary_key = "#{klass.name}_#{id}"
  duplicate_key = "#{primary_key}_dup"

  primary = find_entry(primary_key)
  primary_locked = primary ? primary.is_locked : false

  redis_object =
    if primary && !primary_locked
      primary
    else
      # The "_dup" slot is only consulted when the primary is missing or locked.
      spare = find_entry(duplicate_key)
      if spare && !spare.is_locked
        spare
      else
        new(:priority => priority, :record_class_name => klass.name, :record_id => id)
      end
    end

  redis_object.is_delete = delete
  redis_object.priority = priority if priority > redis_object.priority
  redis_object.run_at = Time.now.utc
  redis_object.duplicate = primary_locked

  redis_key = primary_locked ? duplicate_key : primary_key
  @connection.hset(@datastore_name, redis_key, redis_object.json_formatted)
end
|
.collection ⇒ Object
44
45
46
47
48
|
# File 'lib/sunspot/index_queue/entry/redis_impl.rb', line 44
# Loads every value of the datastore hash, turns each JSON payload into an
# entry and returns the entries sorted with their own #<=>.
#
# @return [Array] sorted entries
def collection
  entries = @connection.hvals(@datastore_name).map { |raw| new(JSON.parse(raw)) }
  entries.sort
end
|
.connection ⇒ Object
24
25
26
|
# File 'lib/sunspot/index_queue/entry/redis_impl.rb', line 24
# Returns the object held in @connection -- the Redis client created by
# connection= -- or nil when none has been assigned yet.
def connection
@connection
end
|
.connection=(*args) ⇒ Object
17
18
19
20
21
22
|
# File 'lib/sunspot/index_queue/entry/redis_impl.rb', line 17
# Creates the Redis client used by all class-level operations.
#
# Accepted call forms:
#   self.connection = 'host'            # port defaults to 6379
#   self.connection = 'host', 6380      # arrives as ONE Array argument
#   send(:connection=, 'host', 6380)    # arrives as two arguments
#   self.connection = nil               # localhost:6379
#
# Ruby passes the right-hand side of a multi-value assignment as a single
# Array, so the splat used to yield [[host, port]]: the host became the whole
# Array and the port was silently replaced by the default. Flattening the
# arguments makes both spellings behave the same.
#
# @return [Redis] the new client
def connection=(*args)
  host, port = args.flatten
  host ||= 'localhost'
  port ||= 6379
  @connection = Redis.new(:host => host, :port => port)
end
|
.create(attributes) ⇒ Object
139
140
141
142
143
|
# File 'lib/sunspot/index_queue/entry/redis_impl.rb', line 139
# Instantiates an entry from +attributes+, stores its JSON form in the
# datastore hash under "<record_class_name>_<record_id>" and returns the entry.
#
# @param attributes [Hash] passed straight to .new
# @return [Object] the freshly built entry
def create(attributes)
  new(attributes).tap do |entry|
    field = "#{entry.record_class_name}_#{entry.record_id}"
    @connection.hset(@datastore_name, field, entry.json_formatted)
  end
end
|
.datastore_name ⇒ Object
40
41
42
|
# File 'lib/sunspot/index_queue/entry/redis_impl.rb', line 40
# Returns @datastore_name, the name handed as first argument to every Redis
# hash command (hset, hget, hvals, ...) issued by this class.
def datastore_name
@datastore_name
end
|
.datastore_name=(name) ⇒ Object
36
37
38
|
# File 'lib/sunspot/index_queue/entry/redis_impl.rb', line 36
# Sets the name of the Redis hash that holds the queue entries.
# @param name [Object] value later passed unchanged to the Redis hash commands
def datastore_name= (name)
@datastore_name = name
end
|
.delete_entries(records) ⇒ Object
145
146
147
148
149
150
151
|
# File 'lib/sunspot/index_queue/entry/redis_impl.rb', line 145
# Removes the given entries from the datastore hash, one hdel per entry.
# Entries flagged as duplicates live under the "_dup"-suffixed field.
#
# @param records [Enumerable] entries responding to record_class_name,
#   record_id and duplicate
# @return [Enumerable] +records+ itself
def delete_entries(records)
  records.each do |record|
    suffix = record.duplicate ? "_dup" : ""
    @connection.hdel(@datastore_name, "#{record.record_class_name}_#{record.record_id}#{suffix}")
  end
end
|
.error_count(queue) ⇒ Object
67
68
69
70
71
72
73
|
# File 'lib/sunspot/index_queue/entry/redis_impl.rb', line 67
# Counts the stored entries that carry an error. When the queue restricts
# itself to certain class names, only entries of those classes are counted.
#
# @param queue [#class_names] queue whose class filter applies
# @return [Integer]
def error_count(queue)
  failed = collection.reject { |entry| entry.error.nil? }
  return failed.size if queue.class_names.empty?

  failed.count { |entry| queue.class_names.include?(entry.record_class_name) }
end
|
.errors(queue, limit, offset) ⇒ Object
75
76
77
78
79
80
81
|
# File 'lib/sunspot/index_queue/entry/redis_impl.rb', line 75
# Returns one page of the entries that carry an error, optionally restricted
# to the queue's class names.
#
# The page used to be cut with slice(offset..limit), which treats +limit+ as
# the index of the last element rather than as a page size: offset 20 with
# limit 10 produced an empty page, offset 2 with limit 2 a single element.
# The page is now taken as "+limit+ entries starting at +offset+".
#
# @param queue [#class_names] queue whose class filter applies
# @param limit [Integer, nil] maximum number of entries; nil means no limit
# @param offset [Integer, nil] number of entries to skip; nil means 0
# @return [Array] the selected entries, empty when the offset is past the end
def errors(queue, limit, offset)
  failed = collection.reject { |entry| entry.error.nil? }
  unless queue.class_names.empty?
    failed = failed.select { |entry| queue.class_names.include?(entry.record_class_name) }
  end

  start = offset || 0
  page = limit ? failed.slice(start, limit) : failed.slice(start..-1)
  # Array#slice answers nil when the start lies beyond the array.
  page || []
end
|
.find_entry(id) ⇒ Object
153
154
155
156
|
# File 'lib/sunspot/index_queue/entry/redis_impl.rb', line 153
# Looks up a single entry by its hash field name.
#
# A single hget is issued; it already answers nil for a missing field. The
# earlier hexists + hget pair cost an extra round trip and raised from
# JSON.parse when the field disappeared between the two calls.
#
# @param id [String] hash field, e.g. "Post_1" or "Post_1_dup"
# @return [Object, nil] the entry, or nil when the field does not exist
def find_entry(id)
  raw = @connection.hget(@datastore_name, id)
  raw ? new(JSON.parse(raw)) : nil
end
|
.logger ⇒ Object
28
29
30
|
# File 'lib/sunspot/index_queue/entry/redis_impl.rb', line 28
# Returns the logger assigned through logger= (nil when none was set).
# The instance-level #reset! and #set_error! send it warnings when a Redis
# write raises.
def logger
@logger
end
|
.logger=(logger) ⇒ Object
32
33
34
|
# File 'lib/sunspot/index_queue/entry/redis_impl.rb', line 32
# Sets the logger used for reporting failed Redis writes.
# @param logger [#warn, nil] only #warn is called on it in this file
def logger= (logger)
@logger = logger
end
|
.next_batch!(queue) ⇒ Object
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
|
# File 'lib/sunspot/index_queue/entry/redis_impl.rb', line 99
# Selects the next entries to process, marks them as locked and persists the
# lock.
#
# An entry qualifies when it is not locked and its run_at (parsed with
# Time.parse) is not in the future; the queue's class names, when present,
# narrow the candidates further. At most queue.batch_size entries are taken,
# in the order delivered by .collection.
#
# The lock is written back to the field the entry actually lives in. Entries
# flagged as duplicates are stored under "<class>_<id>_dup" (see .add and
# .delete_entries); writing their lock to the un-suffixed field overwrote the
# primary entry and left the "_dup" field unlocked, so the same duplicate was
# handed out again by the next call.
#
# @param queue [#class_names, #batch_size]
# @return [Array] the locked entries (possibly empty)
def next_batch!(queue)
  candidates = collection
  unless queue.class_names.empty?
    candidates = candidates.select { |entry| queue.class_names.include?(entry.record_class_name) }
  end

  now = Time.now.utc
  ready = candidates.select { |entry| !entry.is_locked && Time.parse(entry.run_at) <= now }

  # Same range-based cut as before, so unusual batch sizes keep their meaning.
  batch = ready.slice(0..(queue.batch_size - 1)) || []

  batch.each do |entry|
    entry.is_locked = true
    field = "#{entry.record_class_name}_#{entry.record_id}"
    field += "_dup" if entry.duplicate
    @connection.hset(@datastore_name, field, entry.json_formatted)
  end
  batch
end
|
.ready_count(queue) ⇒ Object
58
59
60
61
62
63
64
65
|
# File 'lib/sunspot/index_queue/entry/redis_impl.rb', line 58
# Counts the entries whose run_at (parsed with Time.parse) lies strictly
# before the current UTC time, optionally limited to the queue's class names.
# Locked entries are counted as well.
#
# @param queue [#class_names]
# @return [Integer]
def ready_count(queue)
  cutoff = Time.now.utc
  due = collection.select { |entry| Time.parse(entry.run_at) < cutoff }
  return due.size if queue.class_names.empty?

  due.count { |entry| queue.class_names.include?(entry.record_class_name) }
end
|
.reset!(queue) ⇒ Object
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
|
# File 'lib/sunspot/index_queue/entry/redis_impl.rb', line 83
# Makes every entry of the queue (or only those of the queue's class names)
# immediately runnable again: run_at becomes the current UTC time, attempts
# drops to 0, the error is cleared and the lock is released.
#
# Each entry is written back to the field it lives in. Entries flagged as
# duplicates are stored under "<class>_<id>_dup" (see .add and
# .delete_entries); writing them to the un-suffixed field replaced the primary
# entry and left the stored duplicate untouched.
#
# @param queue [#class_names]
# @return [Array] the entries that were reset
def reset!(queue)
  targets = collection
  unless queue.class_names.empty?
    targets = targets.select { |entry| queue.class_names.include?(entry.record_class_name) }
  end

  targets.each do |entry|
    entry.run_at = Time.now.utc
    entry.attempts = 0
    entry.error = nil
    entry.is_locked = false

    field = "#{entry.record_class_name}_#{entry.record_id}"
    field += "_dup" if entry.duplicate
    @connection.hset(@datastore_name, field, entry.json_formatted)
  end
end
|
.total_count(queue) ⇒ Object
50
51
52
53
54
55
56
|
# File 'lib/sunspot/index_queue/entry/redis_impl.rb', line 50
# Number of entries in the queue. Without a class filter this is the length
# of the datastore hash as reported by hlen; with a filter the entries are
# loaded and those of the queue's class names are counted.
#
# @param queue [#class_names]
# @return [Integer]
def total_count(queue)
  return @connection.hlen(@datastore_name) if queue.class_names.empty?

  collection.count { |entry| queue.class_names.include?(entry.record_class_name) }
end
|
Instance Method Details
#<=>(redis_object) ⇒ Object
210
211
212
|
# File 'lib/sunspot/index_queue/entry/redis_impl.rb', line 210
# Ordering used by Array#sort in .collection. The operands are compared in
# reverse (other against self), so larger values sort first. Priority decides
# the order; run_at is only consulted when this entry has no priority at all.
#
# @param redis_object [Object] the entry to compare with
# @return [Integer, nil] result of the underlying <=>
def <=>(redis_object)
  return redis_object.run_at <=> run_at if priority.nil?

  redis_object.priority <=> priority
end
|
#id ⇒ Object
202
203
204
|
# File 'lib/sunspot/index_queue/entry/redis_impl.rb', line 202
# Identifier of the entry in the form "<record_class_name>_<record_id>".
# The same format is used for the hash fields written by this class; note that
# it never carries the "_dup" suffix, even when #duplicate is set.
def id
"#{record_class_name}_#{record_id}"
end
|
#is_delete? ⇒ Boolean
206
207
208
|
# File 'lib/sunspot/index_queue/entry/redis_impl.rb', line 206
# Predicate form of #is_delete; returns the stored value unchanged.
def is_delete?
is_delete
end
|
170
171
172
173
|
# File 'lib/sunspot/index_queue/entry/redis_impl.rb', line 170
# Serialises the entry to the JSON document stored in Redis. Keys are plain
# strings and appear in a fixed order; #initialize accepts the parsed result.
#
# @return [String] JSON text
def json_formatted
  payload = {
    "record_id" => record_id,
    "record_class_name" => record_class_name,
    "is_delete" => is_delete,
    "duplicate" => duplicate,
    "run_at" => run_at,
    "priority" => priority,
    "error" => error,
    "attempts" => attempts,
    "is_locked" => is_locked
  }
  JSON.dump(payload)
end
|
#reset! ⇒ Object
189
190
191
192
193
194
195
196
197
198
199
200
|
# File 'lib/sunspot/index_queue/entry/redis_impl.rb', line 189
# Makes this entry immediately runnable again and persists it: run_at becomes
# the current UTC time, attempts drops to 0, the error is cleared and the lock
# is released.
#
# Releasing the lock mirrors the class-level reset!; .next_batch! skips locked
# entries, so an entry reset while still locked would never be picked up
# again. The entry is written to the field it lives in ("_dup"-suffixed for
# duplicates, as in .delete_entries).
#
# Failures while writing are not raised; they are reported to the class logger
# when one is configured.
def reset!
  self.run_at = Time.now.utc
  self.attempts = 0
  self.error = nil
  self.is_locked = false

  field = "#{record_class_name}_#{record_id}"
  field += "_dup" if duplicate
  self.class.connection.hset(self.class.datastore_name, field, json_formatted)
rescue => e
  logger = self.class.logger
  logger.warn(e) if logger
end
|
#set_error!(error, retry_interval = nil) ⇒ Object
175
176
177
178
179
180
181
182
183
184
185
186
187
|
# File 'lib/sunspot/index_queue/entry/redis_impl.rb', line 175
# Records a processing failure on this entry and persists it.
#
# * attempts is incremented;
# * with a +retry_interval+, run_at moves to now + retry_interval * attempts;
# * error becomes "<class>: <message>\n<backtrace>", the backtrace part being
#   cut to 4000 characters. An exception that was never raised has no
#   backtrace (nil); that case no longer raises NoMethodError;
# * the lock is released. .next_batch! skips locked entries, so without this
#   the retry scheduled through run_at could never take place;
# * the entry is written to the field it lives in ("_dup"-suffixed for
#   duplicates, as in .delete_entries).
#
# A failure while writing is not raised; both the original error and the write
# error are reported to the class logger when one is configured.
#
# @param error [Exception] the failure to record
# @param retry_interval [Numeric, nil] base delay in seconds before a retry
def set_error!(error, retry_interval = nil)
  self.attempts += 1
  self.run_at = Time.now.utc + (retry_interval * attempts) if retry_interval

  trace = Array(error.backtrace).join("\n")[0, 4000]
  self.error = "#{error.class.name}: #{error.message}\n#{trace}"
  self.is_locked = false

  begin
    field = "#{record_class_name}_#{record_id}"
    field += "_dup" if duplicate
    self.class.connection.hset(self.class.datastore_name, field, json_formatted)
  rescue => e
    logger = self.class.logger
    if logger
      logger.warn(error)
      logger.warn(e)
    end
  end
end
|