Class: Cloudtasker::Backend::RedisTask
- Inherits:
-
Object
- Object
- Cloudtasker::Backend::RedisTask
- Defined in:
- lib/cloudtasker/backend/redis_task.rb
Overview
Manage local tasks pushed to Redis
Constant Summary collapse
- RETRY_INTERVAL =
seconds
20
Instance Attribute Summary collapse
-
#dispatch_deadline ⇒ Object
readonly
Returns the value of attribute dispatch_deadline.
-
#http_request ⇒ Object
readonly
Returns the value of attribute http_request.
-
#id ⇒ Object
readonly
Returns the value of attribute id.
-
#queue ⇒ Object
readonly
Returns the value of attribute queue.
-
#retries ⇒ Object
readonly
Returns the value of attribute retries.
-
#schedule_time ⇒ Object
readonly
Returns the value of attribute schedule_time.
Class Method Summary collapse
-
.all ⇒ Array<Cloudtasker::Backend::RedisTask>
Return all tasks stored in Redis.
-
.create(payload) ⇒ Object
Push a job to the queue.
-
.delete(id) ⇒ Object
Delete a task by id.
-
.find(id) ⇒ Cloudtasker::Backend::RedisTask?
Get a task by id.
-
.key(val = nil) ⇒ String
Return a namespaced key.
-
.pop(queue = nil) ⇒ Cloudtasker::Backend::RedisTask
Retrieve and remove a task from the queue.
-
.ready_to_process(queue = nil) ⇒ Array<Cloudtasker::Backend::RedisTask>
Reeturn all tasks ready to process.
-
.redis ⇒ Cloudtasker::RedisClient
Return the cloudtasker redis client.
Instance Method Summary collapse
-
#==(other) ⇒ Boolean
Equality operator.
-
#deliver ⇒ Object
Deliver the task to the processing endpoint.
-
#destroy ⇒ Object
Remove the task from the queue.
-
#gid ⇒ <Type>
Return the namespaced task id.
-
#initialize(id:, http_request:, schedule_time: nil, retries: 0, queue: nil, dispatch_deadline: nil) ⇒ RedisTask
constructor
Build a new instance of the class.
-
#redis ⇒ Class
Return the redis client.
-
#retry_later(interval, opts = {}) ⇒ Object
Retry the task later.
-
#to_h ⇒ Hash
Return a hash description of the task.
Constructor Details
#initialize(id:, http_request:, schedule_time: nil, retries: 0, queue: nil, dispatch_deadline: nil) ⇒ RedisTask
Build a new instance of the class.
128 129 130 131 132 133 134 135 |
# File 'lib/cloudtasker/backend/redis_task.rb', line 128 def initialize(id:, http_request:, schedule_time: nil, retries: 0, queue: nil, dispatch_deadline: nil) @id = id @http_request = http_request @schedule_time = Time.at(schedule_time || 0) @retries = retries || 0 @queue = queue || Config::DEFAULT_JOB_QUEUE @dispatch_deadline = dispatch_deadline || Config::DEFAULT_DISPATCH_DEADLINE end |
Instance Attribute Details
#dispatch_deadline ⇒ Object (readonly)
Returns the value of attribute dispatch_deadline.
10 11 12 |
# File 'lib/cloudtasker/backend/redis_task.rb', line 10 def dispatch_deadline @dispatch_deadline end |
#http_request ⇒ Object (readonly)
Returns the value of attribute http_request.
10 11 12 |
# File 'lib/cloudtasker/backend/redis_task.rb', line 10 def http_request @http_request end |
#id ⇒ Object (readonly)
Returns the value of attribute id.
10 11 12 |
# File 'lib/cloudtasker/backend/redis_task.rb', line 10 def id @id end |
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
10 11 12 |
# File 'lib/cloudtasker/backend/redis_task.rb', line 10 def queue @queue end |
#retries ⇒ Object (readonly)
Returns the value of attribute retries.
10 11 12 |
# File 'lib/cloudtasker/backend/redis_task.rb', line 10 def retries @retries end |
#schedule_time ⇒ Object (readonly)
Returns the value of attribute schedule_time.
10 11 12 |
# File 'lib/cloudtasker/backend/redis_task.rb', line 10 def schedule_time @schedule_time end |
Class Method Details
.all ⇒ Array<Cloudtasker::Backend::RedisTask>
Return all tasks stored in Redis.
39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
# File 'lib/cloudtasker/backend/redis_task.rb', line 39 def self.all if redis.exists?(key) # Use Schedule Set if available redis.smembers(key).map { |id| find(id) }.compact else # Fallback to redis key matching and migrate tasks # to use Task Set instead. redis.search(key('*')).map do |gid| task_id = gid.sub(key(''), '') redis.sadd(key, [task_id]) find(task_id) end end end |
.create(payload) ⇒ Object
Push a job to the queue.
85 86 87 88 89 90 91 92 93 |
# File 'lib/cloudtasker/backend/redis_task.rb', line 85 def self.create(payload) id = SecureRandom.uuid payload = payload.merge(schedule_time: payload[:schedule_time].to_i) # Save job redis.write(key(id), payload) redis.sadd(key, [id]) new(**payload.merge(id: id)) end |
.delete(id) ⇒ Object
Delete a task by id.
114 115 116 117 |
# File 'lib/cloudtasker/backend/redis_task.rb', line 114 def self.delete(id) redis.srem(key, [id]) redis.del(key(id)) end |
.find(id) ⇒ Cloudtasker::Backend::RedisTask?
Get a task by id.
102 103 104 105 106 107 |
# File 'lib/cloudtasker/backend/redis_task.rb', line 102 def self.find(id) gid = key(id) return nil unless (payload = redis.fetch(gid)) new(**payload.merge(id: id)) end |
.key(val = nil) ⇒ String
Return a namespaced key.
30 31 32 |
# File 'lib/cloudtasker/backend/redis_task.rb', line 30 def self.key(val = nil) [to_s.underscore, val].compact.map(&:to_s).join('/') end |
.pop(queue = nil) ⇒ Cloudtasker::Backend::RedisTask
Retrieve and remove a task from the queue.
74 75 76 77 78 |
# File 'lib/cloudtasker/backend/redis_task.rb', line 74 def self.pop(queue = nil) redis.with_lock('cloudtasker/server') do ready_to_process(queue).first&.tap(&:destroy) end end |
.ready_to_process(queue = nil) ⇒ Array<Cloudtasker::Backend::RedisTask>
Reeturn all tasks ready to process.
61 62 63 64 65 |
# File 'lib/cloudtasker/backend/redis_task.rb', line 61 def self.ready_to_process(queue = nil) list = all.select { |e| e.schedule_time <= Time.now } list = list.select { |e| e.queue == queue } if queue list end |
.redis ⇒ Cloudtasker::RedisClient
Return the cloudtasker redis client
19 20 21 |
# File 'lib/cloudtasker/backend/redis_task.rb', line 19 def self.redis @redis ||= RedisClient.new end |
Instance Method Details
#==(other) ⇒ Boolean
Equality operator.
234 235 236 |
# File 'lib/cloudtasker/backend/redis_task.rb', line 234 def ==(other) other.is_a?(self.class) && other.id == id end |
#deliver ⇒ Object
Deliver the task to the processing endpoint.
202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 |
# File 'lib/cloudtasker/backend/redis_task.rb', line 202 def deliver Cloudtasker.logger.info(('Processing task...')) # Send request resp = http_client.request(request_content) # Delete task if successful if resp.code.to_s =~ /20\d/ destroy Cloudtasker.logger.info(('Task handled successfully')) else retry_later(RETRY_INTERVAL) Cloudtasker.logger.info(("Task failure - Retry in #{RETRY_INTERVAL} seconds...")) end resp rescue Net::ReadTimeout retry_later(RETRY_INTERVAL) Cloudtasker.logger.info( ( "Task deadline exceeded (#{dispatch_deadline}s) - Retry in #{RETRY_INTERVAL} seconds..." ) ) end |
#destroy ⇒ Object
Remove the task from the queue.
195 196 197 |
# File 'lib/cloudtasker/backend/redis_task.rb', line 195 def destroy self.class.delete(id) end |
#gid ⇒ <Type>
Return the namespaced task id
167 168 169 |
# File 'lib/cloudtasker/backend/redis_task.rb', line 167 def gid self.class.key(id) end |
#redis ⇒ Class
Return the redis client.
142 143 144 |
# File 'lib/cloudtasker/backend/redis_task.rb', line 142 def redis self.class.redis end |
#retry_later(interval, opts = {}) ⇒ Object
Retry the task later.
178 179 180 181 182 183 184 185 186 187 188 189 190 |
# File 'lib/cloudtasker/backend/redis_task.rb', line 178 def retry_later(interval, opts = {}) is_error = opts.to_h.fetch(:is_error, true) redis.write( gid, retries: is_error ? retries + 1 : retries, http_request: http_request, schedule_time: (Time.now + interval).to_i, queue: queue, dispatch_deadline: dispatch_deadline ) redis.sadd(self.class.key, [id]) end |
#to_h ⇒ Hash
Return a hash description of the task.
151 152 153 154 155 156 157 158 159 160 |
# File 'lib/cloudtasker/backend/redis_task.rb', line 151 def to_h { id: id, http_request: http_request, schedule_time: schedule_time.to_i, retries: retries, queue: queue, dispatch_deadline: dispatch_deadline } end |