Class: Cloudtasker::WorkerHandler

Inherits:
Object
  • Object
show all
Defined in:
lib/cloudtasker/worker_handler.rb

Overview

Build, serialize and schedule tasks on the processing backend.

Constant Summary collapse

JWT_ALG =

Alrogith used to sign the verification token

'HS256'
REDIS_PAYLOAD_NAMESPACE =

Sub-namespace to use for redis keys when storing payloads in Redis

'payload'

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(worker) ⇒ WorkerHandler

Prepare a new cloud task.

Parameters:



141
142
143
# File 'lib/cloudtasker/worker_handler.rb', line 141

def initialize(worker)
  @worker = worker
end

Instance Attribute Details

#workerObject (readonly)

Returns the value of attribute worker.



8
9
10
# File 'lib/cloudtasker/worker_handler.rb', line 8

def worker
  @worker
end

Class Method Details

.execute_from_payload!(input_payload) ⇒ Any

Execute a task worker from a task payload

Parameters:

  • input_payload (Hash)

    The Cloud Task payload.

Returns:

  • (Any)

    The return value of the worker perform method.



71
72
73
# File 'lib/cloudtasker/worker_handler.rb', line 71

def self.execute_from_payload!(input_payload)
  with_worker_handling(input_payload, &:execute)
end

.extract_payload(input_payload) ⇒ Hash

Return the argument payload key (if present) along with the actual worker payload.

If the payload was stored in Redis then retrieve it.

Returns:

  • (Hash)

    Hash



120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# File 'lib/cloudtasker/worker_handler.rb', line 120

def self.extract_payload(input_payload)
  # Get references
  payload = JSON.parse(input_payload.to_json, symbolize_names: true)
  args_payload_id = payload.delete(:job_args_payload_id)
  args_payload_key = args_payload_id ? key([REDIS_PAYLOAD_NAMESPACE, args_payload_id].join('/')) : nil

  # Retrieve the actual worker args payload
  args_payload = args_payload_key ? redis.fetch(args_payload_key) : payload[:job_args]

  # Return the payload
  {
    args_payload_key: args_payload_key,
    payload: payload.merge(job_args: args_payload)
  }
end

.key(val) ⇒ String

Return a namespaced key

Parameters:

  • val (String, Symbol)

    The key to namespace

Returns:

  • (String)

    The namespaced key.



24
25
26
27
28
# File 'lib/cloudtasker/worker_handler.rb', line 24

def self.key(val)
  return nil if val.nil?

  [to_s.underscore, val.to_s].join('/')
end

.log_execution_error(worker, error) ⇒ Object

Log error on execution failure.

Parameters:



50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/cloudtasker/worker_handler.rb', line 50

def self.log_execution_error(worker, error)
  # ActiveJob has its own error logging. No need to double log the error.
  # Note: we use string matching instead of class matching as
  # ActiveJob::QueueAdapters::CloudtaskerAdapter::JobWrapper might not be loaded
  return if worker.class.to_s =~ /^ActiveJob::/

  # Choose logger to use based on context
  # Worker will be nil on InvalidWorkerError - in that case we use generic logging
  logger = worker&.logger || Cloudtasker.logger

  # Log error
  logger.error(error)
end

.redisCloudtasker::RedisClient

Return the cloudtasker redis client

Returns:



35
36
37
38
39
40
# File 'lib/cloudtasker/worker_handler.rb', line 35

def self.redis
  @redis ||= begin
    require 'cloudtasker/redis_client'
    RedisClient.new
  end
end

.with_worker_handling(input_payload) {|Hash| ... } ⇒ Any

Local middleware used to retrieve the job arg payload from cache if a arg payload reference is present.

Parameters:

  • payload (Hash)

    The full job payload

Yields:

  • (Hash)

    The actual payload to use to process the job.

Returns:

  • (Any)

    The block result



85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/cloudtasker/worker_handler.rb', line 85

def self.with_worker_handling(input_payload)
  # Extract payload information
  extracted_payload = extract_payload(input_payload)
  payload = extracted_payload[:payload]
  args_payload_key = extracted_payload[:args_payload_key]

  # Build worker
  worker = Cloudtasker::Worker.from_hash(payload) || raise(InvalidWorkerError)

  # Yied worker
  resp = yield(worker)

  # Delete stored args payload if job has completed
  redis.del(args_payload_key) if args_payload_key && !worker.job_reenqueued

  resp
rescue DeadWorkerError => e
  # Delete stored args payload if job is dead
  redis.del(args_payload_key) if args_payload_key
  log_execution_error(worker, e)
  Cloudtasker.config.on_dead.call(e, worker)
  raise(e)
rescue StandardError => e
  log_execution_error(worker, e)
  Cloudtasker.config.on_error.call(e, worker)
  raise(e)
end

Instance Method Details

#schedule(time_at: nil) ⇒ Cloudtasker::CloudTask

Schedule the task on GCP Cloud Task.

Parameters:

  • time_at (Integer, nil) (defaults to: nil)

    A unix timestamp specifying when to run the job. Leave to ‘nil` to run now.

Returns:



236
237
238
239
240
241
242
# File 'lib/cloudtasker/worker_handler.rb', line 236

def schedule(time_at: nil)
  # Generate task payload
  task = task_payload.merge(schedule_time: time_at).compact

  # Create and return remote task
  CloudTask.create(task)
end

#store_payload_in_redis?Boolean

Return true if the worker args must be stored in Redis.

Returns:

  • (Boolean)

    True if the payload must be stored in redis.



171
172
173
174
# File 'lib/cloudtasker/worker_handler.rb', line 171

def store_payload_in_redis?
  Cloudtasker.config.redis_payload_storage_threshold &&
    worker.job_args.to_json.bytesize > (Cloudtasker.config.redis_payload_storage_threshold * 1024)
end

#task_payloadHash

Return the full task configuration sent to Cloud Task

Returns:

  • (Hash)

    The task body



150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
# File 'lib/cloudtasker/worker_handler.rb', line 150

def task_payload
  {
    http_request: {
      http_method: 'POST',
      url: Cloudtasker.config.processor_url,
      headers: {
        Cloudtasker::Config::CONTENT_TYPE_HEADER => 'application/json',
        Cloudtasker::Config::AUTHORIZATION_HEADER => "Bearer #{Authenticator.verification_token}"
      },
      body: worker_payload.to_json
    },
    dispatch_deadline: worker.dispatch_deadline.to_i,
    queue: worker.job_queue
  }
end

#worker_args_payloadHash

Return the payload to use for job arguments. This payload is merged inside the #worker_payload.

If the argument payload must be stored in Redis then returns: ‘{ job_args_payload_id: <worker_id> }`

If the argument payload must be natively handled by the backend then returns: ‘{ job_args: […] }`

Returns:

  • (Hash)

    The worker args payload.



189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
# File 'lib/cloudtasker/worker_handler.rb', line 189

def worker_args_payload
  @worker_args_payload ||= begin
    if store_payload_in_redis?
      # Store payload in Redis
      self.class.redis.write(
        self.class.key([REDIS_PAYLOAD_NAMESPACE, worker.job_id].join('/')),
        worker.job_args
      )

      # Return reference to args payload
      { job_args_payload_id: worker.job_id }
    else
      # Return regular job args payload
      { job_args: worker.job_args }
    end
  end
end

#worker_payloadHash

Return the task payload that Google Task will eventually send to the job processor.

The payload includes the worker name and the arguments to pass to the worker.

The worker arguments should use primitive types as much as possible as all arguments will be serialized to JSON.

Returns:

  • (Hash)

    The job payload



219
220
221
222
223
224
225
226
# File 'lib/cloudtasker/worker_handler.rb', line 219

def worker_payload
  @worker_payload ||= {
    worker: worker.job_class_name,
    job_queue: worker.job_queue,
    job_id: worker.job_id,
    job_meta: worker.job_meta.to_h
  }.merge(worker_args_payload)
end