Class: Cloudtasker::WorkerHandler
- Inherits:
- Object
- Defined in:
- lib/cloudtasker/worker_handler.rb
Overview
Build, serialize and schedule tasks on the processing backend.
Constant Summary
- JWT_ALG = 'HS256'
  Algorithm used to sign the verification token.
- REDIS_PAYLOAD_NAMESPACE = 'payload'
  Sub-namespace to use for Redis keys when storing payloads in Redis.
Instance Attribute Summary
- #worker ⇒ Object (readonly)
  Returns the value of attribute worker.
Class Method Summary
- .execute_from_payload!(input_payload) ⇒ Any
  Execute a task worker from a task payload.
- .extract_payload(input_payload) ⇒ Hash
  Return the argument payload key (if present) along with the actual worker payload.
- .key(val) ⇒ String
  Return a namespaced key.
- .log_execution_error(worker, error) ⇒ Object
  Log error on execution failure.
- .redis ⇒ Cloudtasker::RedisClient
  Return the cloudtasker redis client.
- .with_worker_handling(input_payload) {|Hash| ... } ⇒ Any
  Local middleware used to retrieve the job arg payload from cache if an arg payload reference is present.
Instance Method Summary
- #initialize(worker) ⇒ WorkerHandler (constructor)
  Prepare a new cloud task.
- #schedule(time_at: nil) ⇒ Cloudtasker::CloudTask
  Schedule the task on GCP Cloud Tasks.
- #store_payload_in_redis? ⇒ Boolean
  Return true if the worker args must be stored in Redis.
- #task_payload ⇒ Hash
  Return the full task configuration sent to Cloud Tasks.
- #worker_args_payload ⇒ Hash
  Return the payload to use for job arguments.
- #worker_payload ⇒ Hash
  Return the task payload that Cloud Tasks will eventually send to the job processor.
Constructor Details
#initialize(worker) ⇒ WorkerHandler
Prepare a new cloud task.
    # File 'lib/cloudtasker/worker_handler.rb', line 141
    def initialize(worker)
      @worker = worker
    end
Instance Attribute Details
#worker ⇒ Object (readonly)
Returns the value of attribute worker.
    # File 'lib/cloudtasker/worker_handler.rb', line 8
    def worker
      @worker
    end
Class Method Details
.execute_from_payload!(input_payload) ⇒ Any
Execute a task worker from a task payload.
    # File 'lib/cloudtasker/worker_handler.rb', line 71
    def self.execute_from_payload!(input_payload)
      with_worker_handling(input_payload, &:execute)
    end
.extract_payload(input_payload) ⇒ Hash
Return the argument payload key (if present) along with the actual worker payload.
If the payload was stored in Redis then retrieve it.
    # File 'lib/cloudtasker/worker_handler.rb', line 120
    def self.extract_payload(input_payload)
      # Get references
      payload = JSON.parse(input_payload.to_json, symbolize_names: true)
      args_payload_id = payload.delete(:job_args_payload_id)
      args_payload_key = args_payload_id ? key([REDIS_PAYLOAD_NAMESPACE, args_payload_id].join('/')) : nil

      # Retrieve the actual worker args payload
      args_payload = args_payload_key ? redis.fetch(args_payload_key) : payload[:job_args]

      # Return the payload
      { args_payload_key: args_payload_key, payload: payload.merge(job_args: args_payload) }
    end
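The lookup above can be sketched against an in-memory store. This is a minimal illustration rather than the library's code: `FakeRedis`, `payload_key` and `extract` are hypothetical names introduced here, and the key prefix is written out by hand instead of being derived from the class name.

```ruby
require 'json'

# Hypothetical in-memory stand-in for the Redis client (not Cloudtasker::RedisClient).
class FakeRedis
  def initialize
    @store = {}
  end

  # Store the content as JSON under the given key
  def write(key, content)
    @store[key] = content.to_json
  end

  # Return the parsed content, or nil if the key is absent
  def fetch(key)
    raw = @store[key]
    raw && JSON.parse(raw, symbolize_names: true)
  end
end

# Assumed key layout, mirroring key([REDIS_PAYLOAD_NAMESPACE, id].join('/'))
def payload_key(id)
  ['cloudtasker/worker_handler', 'payload', id].join('/')
end

# Simplified version of the extraction logic shown above
def extract(input_payload, redis)
  payload = JSON.parse(input_payload.to_json, symbolize_names: true)
  args_payload_id = payload.delete(:job_args_payload_id)
  args_payload_key = args_payload_id ? payload_key(args_payload_id) : nil
  args_payload = args_payload_key ? redis.fetch(args_payload_key) : payload[:job_args]

  { args_payload_key: args_payload_key, payload: payload.merge(job_args: args_payload) }
end

redis = FakeRedis.new
redis.write(payload_key('abc123'), [1, 'two'])

# Arguments carried inline in the task body
inline = extract({ 'worker' => 'MyWorker', 'job_args' => [1, 'two'] }, redis)

# Arguments referenced by id and resolved from the store
stored = extract({ 'worker' => 'MyWorker', 'job_args_payload_id' => 'abc123' }, redis)
```

In both cases the returned `:payload` carries `job_args`; only the referenced variant also reports a non-nil `:args_payload_key`, which the caller can later use for cleanup.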
.key(val) ⇒ String
Return a namespaced key.
    # File 'lib/cloudtasker/worker_handler.rb', line 24
    def self.key(val)
      return nil if val.nil?

      [to_s.underscore, val.to_s].join('/')
    end
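To make the key layout concrete, here is a small standalone sketch. The `underscore` helper is a simplified, hypothetical substitute for ActiveSupport's `String#underscore` (which the method above relies on), and `namespaced_key` is an illustrative name, not part of the library.

```ruby
# Simplified, hypothetical substitute for ActiveSupport's String#underscore
def underscore(name)
  name.gsub('::', '/').gsub(/([a-z\d])([A-Z])/, '\1_\2').downcase
end

# Prefix a value with the underscored class name, as .key does
def namespaced_key(class_name, val)
  return nil if val.nil?

  [underscore(class_name), val.to_s].join('/')
end

# Key under which the args of job 'job-123' would be stored
payload_key = namespaced_key('Cloudtasker::WorkerHandler', ['payload', 'job-123'].join('/'))
# payload_key == 'cloudtasker/worker_handler/payload/job-123'
```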
.log_execution_error(worker, error) ⇒ Object
Log error on execution failure.
    # File 'lib/cloudtasker/worker_handler.rb', line 50
    def self.log_execution_error(worker, error)
      # ActiveJob has its own error logging. No need to double log the error.
      # Note: we use string matching instead of class matching as
      # ActiveJob::QueueAdapters::CloudtaskerAdapter::JobWrapper might not be loaded
      return if worker.class.to_s =~ /^ActiveJob::/

      # Choose logger to use based on context
      # Worker will be nil on InvalidWorkerError - in that case we use generic logging
      logger = worker&.logger || Cloudtasker.logger

      # Log error
      logger.error(error)
    end
.redis ⇒ Cloudtasker::RedisClient
Return the cloudtasker redis client.
    # File 'lib/cloudtasker/worker_handler.rb', line 35
    def self.redis
      @redis ||= begin
        require 'cloudtasker/redis_client'
        RedisClient.new
      end
    end
.with_worker_handling(input_payload) {|Hash| ... } ⇒ Any
Local middleware used to retrieve the job arg payload from cache if an arg payload reference is present.
    # File 'lib/cloudtasker/worker_handler.rb', line 85
    def self.with_worker_handling(input_payload)
      # Extract payload information
      extracted_payload = extract_payload(input_payload)
      payload = extracted_payload[:payload]
      args_payload_key = extracted_payload[:args_payload_key]

      # Build worker
      worker = Cloudtasker::Worker.from_hash(payload) || raise(InvalidWorkerError)

      # Yield worker
      resp = yield(worker)

      # Delete stored args payload if job has completed
      redis.del(args_payload_key) if args_payload_key && !worker.job_reenqueued

      resp
    rescue DeadWorkerError => e
      # Delete stored args payload if job is dead
      redis.del(args_payload_key) if args_payload_key
      log_execution_error(worker, e)
      Cloudtasker.config.on_dead.call(e, worker)
      raise(e)
    rescue StandardError => e
      log_execution_error(worker, e)
      Cloudtasker.config.on_error.call(e, worker)
      raise(e)
    end
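The cleanup rules in this method (delete on completion unless re-enqueued, delete on a dead job, keep on any other error) can be sketched in isolation. Everything below is a simplified stand-in under stated assumptions: `DeadJobError`, `FakeWorker` and `with_handling` are hypothetical names, a plain Hash plays the role of Redis, and logging and the `on_dead`/`on_error` hooks are left out.

```ruby
# Hypothetical error class standing in for DeadWorkerError
class DeadJobError < StandardError; end

# Hypothetical worker exposing only the flag this sketch needs
FakeWorker = Struct.new(:job_reenqueued)

# Simplified control flow; `store` is a plain Hash standing in for Redis
def with_handling(store, args_key, worker)
  resp = yield(worker)

  # Completed and not re-enqueued: the stored args are no longer needed
  store.delete(args_key) if args_key && !worker.job_reenqueued
  resp
rescue DeadJobError
  # Dead job: remove the stored args, then propagate the error
  store.delete(args_key) if args_key
  raise
rescue StandardError
  # Any other failure leaves the stored args untouched
  raise
end

store = { 'k1' => [1], 'k2' => [2], 'k3' => [3], 'k4' => [4] }

with_handling(store, 'k1', FakeWorker.new(false)) { :done }
with_handling(store, 'k2', FakeWorker.new(true)) { :reenqueued }

begin
  with_handling(store, 'k3', FakeWorker.new(false)) { raise DeadJobError }
rescue DeadJobError
  nil
end

begin
  with_handling(store, 'k4', FakeWorker.new(false)) { raise ArgumentError }
rescue ArgumentError
  nil
end

remaining = store.keys
# remaining == ['k2', 'k4']
```

Because `DeadJobError` is itself a `StandardError`, its `rescue` clause has to come first, which matches the ordering in the method above.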
Instance Method Details
#schedule(time_at: nil) ⇒ Cloudtasker::CloudTask
Schedule the task on GCP Cloud Tasks.
    # File 'lib/cloudtasker/worker_handler.rb', line 236
    def schedule(time_at: nil)
      # Generate task payload
      task = task_payload.merge(schedule_time: time_at).compact

      # Create and return remote task
      CloudTask.create(task)
    end
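The `merge(...).compact` step is what keeps `schedule_time` out of the task when no time is given. A minimal sketch with a made-up payload (the hash contents below are illustrative values, not the real task configuration):

```ruby
# Illustrative payload; the real one is built by #task_payload
task_payload = { queue: 'default', dispatch_deadline: 600 }

# No time given: the nil entry is dropped by compact
immediate = task_payload.merge(schedule_time: nil).compact

# Time given: the entry is kept
run_at = Time.at(0)
delayed = task_payload.merge(schedule_time: run_at).compact
```

`Hash#compact` removes only `nil` values, so other entries in the payload are unaffected.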
#store_payload_in_redis? ⇒ Boolean
Return true if the worker args must be stored in Redis.
    # File 'lib/cloudtasker/worker_handler.rb', line 171
    def store_payload_in_redis?
      Cloudtasker.config.redis_payload_storage_threshold &&
        worker.job_args.to_json.bytesize > (Cloudtasker.config.redis_payload_storage_threshold * 1024)
    end
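The multiplication by 1024 suggests the threshold is expressed in kilobytes and compared against the JSON-serialized size of the arguments. A standalone sketch of that check, where `store_in_redis?` is a hypothetical helper taking the threshold as a parameter instead of reading it from the configuration:

```ruby
require 'json'

# Hypothetical helper: threshold_kb plays the role of
# Cloudtasker.config.redis_payload_storage_threshold
def store_in_redis?(job_args, threshold_kb)
  # No threshold configured: never offload to Redis
  return false unless threshold_kb

  job_args.to_json.bytesize > threshold_kb * 1024
end

small_args = ['x' * 10]    # a few bytes once serialized
large_args = ['x' * 2048]  # just over 2 KB once serialized

store_in_redis?(small_args, 1)   # below the 1 KB threshold
store_in_redis?(large_args, 1)   # above the 1 KB threshold
store_in_redis?(large_args, nil) # threshold disabled
```

One difference from the original: with no threshold set, the method above returns `nil` (falsy) while this sketch returns `false`.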
#task_payload ⇒ Hash
Return the full task configuration sent to Cloud Tasks.
    # File 'lib/cloudtasker/worker_handler.rb', line 150
    def task_payload
      {
        http_request: {
          http_method: 'POST',
          url: Cloudtasker.config.processor_url,
          headers: {
            Cloudtasker::Config::CONTENT_TYPE_HEADER => 'application/json',
            Cloudtasker::Config::AUTHORIZATION_HEADER => "Bearer #{Authenticator.verification_token}"
          },
          body: worker_payload.to_json
        },
        dispatch_deadline: worker.dispatch_deadline.to_i,
        queue: worker.job_queue
      }
    end
#worker_args_payload ⇒ Hash
Return the payload to use for job arguments. This payload is merged inside the #worker_payload.
If the argument payload must be stored in Redis then returns: `{ job_args_payload_id: <job_id> }`
If the argument payload must be natively handled by the backend then returns: `{ job_args: [...] }`
    # File 'lib/cloudtasker/worker_handler.rb', line 189
    def worker_args_payload
      @worker_args_payload ||= begin
        if store_payload_in_redis?
          # Store payload in Redis
          self.class.redis.write(
            self.class.key([REDIS_PAYLOAD_NAMESPACE, worker.job_id].join('/')),
            worker.job_args
          )

          # Return reference to args payload
          { job_args_payload_id: worker.job_id }
        else
          # Return regular job args payload
          { job_args: worker.job_args }
        end
      end
    end
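The two return shapes can be seen side by side in the following sketch. It is an approximation under stated assumptions: `args_payload` is a hypothetical function, a plain Hash stands in for Redis, the threshold is passed explicitly, and the storage key is shortened to `payload/<job_id>` without the class-name prefix.

```ruby
require 'json'

# Hypothetical function mirroring the branch above; `store` stands in for Redis
def args_payload(job_id, job_args, threshold_kb, store)
  if threshold_kb && job_args.to_json.bytesize > threshold_kb * 1024
    # Offload the arguments and return a reference to them
    store["payload/#{job_id}"] = job_args
    { job_args_payload_id: job_id }
  else
    # Carry the arguments inline
    { job_args: job_args }
  end
end

store = {}
small = args_payload('job-1', [1, 2], 1, store)
large = args_payload('job-2', ['x' * 2048], 1, store)

# small == { job_args: [1, 2] }
# large == { job_args_payload_id: 'job-2' }
```

Only the oversized job leaves an entry in the store; the task body for that job then carries just the reference.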
#worker_payload ⇒ Hash
Return the task payload that Cloud Tasks will eventually send to the job processor.
The payload includes the worker name and the arguments to pass to the worker.
The worker arguments should use primitive types as much as possible, since all arguments are serialized to JSON.
    # File 'lib/cloudtasker/worker_handler.rb', line 219
    def worker_payload
      @worker_payload ||= {
        worker: worker.job_class_name,
        job_queue: worker.job_queue,
        job_id: worker.job_id,
        job_meta: worker.job_meta.to_h
      }.merge(worker_args_payload)
    end
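The advice about primitive types follows from the JSON round trip the arguments go through. A short sketch using only Ruby's standard `json` library (no ActiveSupport loaded, which would change how some objects serialize) shows what non-primitive values turn into:

```ruby
require 'json'

# Arguments as a caller might pass them: a symbol, a symbol-keyed hash, a Time
job_args = [:high_priority, { retries: 3 }, Time.at(0).utc]

# What the worker would see after serialization and parsing
round_tripped = JSON.parse(job_args.to_json)

round_tripped[0] # the symbol comes back as the string 'high_priority'
round_tripped[1] # hash keys come back as strings: { 'retries' => 3 }
round_tripped[2] # the Time comes back as a String, not a Time
```

A worker that expects a symbol or a `Time` would therefore need to convert the value itself, which is why strings, numbers, booleans, arrays and string-keyed hashes are the safer choice.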