Class: Cloudtasker::Backend::GoogleCloudTaskV2

Inherits:
Object
  • Object
show all
Defined in:
lib/cloudtasker/backend/google_cloud_task_v2.rb

Overview

Manage tasks pushed to GCP Cloud Task

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(gcp_task) ⇒ GoogleCloudTaskV2

Build a new instance of the class.

Parameters:

  • gcp_task (Google::Cloud::Tasks::V2::Task)

    The GCP Cloud Task response



194
195
196
# File 'lib/cloudtasker/backend/google_cloud_task_v2.rb', line 194

# Build a new instance wrapping a GCP Cloud Task response.
#
# @param [Google::Cloud::Tasks::V2::Task] gcp_task The GCP Cloud Task response.
def initialize(gcp_task)
  @gcp_task = gcp_task
end

Instance Attribute Details

#gcp_task ⇒ Object

Returns the value of attribute gcp_task.



12
13
14
# File 'lib/cloudtasker/backend/google_cloud_task_v2.rb', line 12

# Return the task object this instance was built with.
#
# @return [Object] The value of the @gcp_task instance variable.
def gcp_task
  @gcp_task
end

Class Method Details

.client ⇒ Google::Cloud::Tasks::V2::CloudTasks::Client

Return the Google Cloud Task client.

Returns:

  • (Google::Cloud::Tasks::V2::CloudTasks::Client)

    The Google Cloud Task client.



51
52
53
# File 'lib/cloudtasker/backend/google_cloud_task_v2.rb', line 51

# Return the Google Cloud Task client.
#
# The client is built on the first call and memoized in a class-level
# instance variable, so later calls reuse the same object.
#
# @return [Google::Cloud::Tasks::V2::CloudTasks::Client] The Google Cloud Task client.
def self.client
  @client ||= ::Google::Cloud::Tasks.cloud_tasks
end

.config ⇒ Cloudtasker::Config

Return the cloudtasker configuration. See Cloudtasker#configure.

Returns:

  • (Cloudtasker::Config)

    The cloudtasker configuration.



60
61
62
# File 'lib/cloudtasker/backend/google_cloud_task_v2.rb', line 60

# Return the cloudtasker configuration. See Cloudtasker#configure.
#
# @return [Cloudtasker::Config] The value of Cloudtasker.config.
def self.config
  Cloudtasker.config
end

.create(payload) ⇒ Cloudtasker::Backend::GoogleCloudTaskV2?

Create a new task.

Parameters:

  • payload (Hash)

    The task payload.

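Returns:

  • (Cloudtasker::Backend::GoogleCloudTaskV2, nil)

    The created task, or nil if the API call returned no response.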



156
157
158
159
160
161
162
163
164
165
166
# File 'lib/cloudtasker/backend/google_cloud_task_v2.rb', line 156

# Create a new task.
#
# @param [Hash] payload The task payload.
#
# @return [Cloudtasker::Backend::GoogleCloudTaskV2, nil] The created task, or nil
#   when the API call returned a falsy response.
def self.create(payload)
  task = format_task_payload(payload)

  # The :queue entry holds a relative queue name: it is removed from the
  # task and expanded into the full parent queue path.
  parent_queue = queue_path(task.delete(:queue))

  created = with_gapi_retries { client.create_task(parent: parent_queue, task: task) }
  return unless created

  new(created)
end

.delete(id) ⇒ Object

Delete a task by id.

Parameters:

  • id (String)

    The id of the task.



173
174
175
176
177
178
# File 'lib/cloudtasker/backend/google_cloud_task_v2.rb', line 173

# Delete a task by id.
#
# The API call goes through with_gapi_retries. NotFoundError and
# PermissionDeniedError are both rescued and reported as nil instead of
# being propagated.
#
# @param [String] id The id of the task.
#
# @return [Object, nil] The result of the delete call, or nil when one of
#   the rescued errors was raised.
def self.delete(id)
  with_gapi_retries { client.delete_task(name: id) }
rescue Google::Cloud::NotFoundError, Google::Cloud::PermissionDeniedError
  # Both errors are treated as "the ID does not exist"
  nil
end

.find(id) ⇒ Cloudtasker::Backend::GoogleCloudTaskV2?

Find a task by id.

Parameters:

  • id (String)

    The task id.

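Returns:

  • (Cloudtasker::Backend::GoogleCloudTaskV2, nil)

    The retrieved task, or nil if the id does not exist.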



141
142
143
144
145
146
147
# File 'lib/cloudtasker/backend/google_cloud_task_v2.rb', line 141

# Find a task by id.
#
# @param [String] id The task id.
#
# @return [Cloudtasker::Backend::GoogleCloudTaskV2, nil] The retrieved task, or nil
#   when the API returned nothing or raised NotFoundError.
def self.find(id)
  fetched = with_gapi_retries { client.get_task(name: id) }
  return unless fetched

  new(fetched)
rescue Google::Cloud::NotFoundError
  # Unknown id: report it as "no task" rather than raising
  nil
end

.format_protobuf_duration(duration) ⇒ Google::Protobuf::Duration?

Return a protobuf duration.

Parameters:

  • duration (Integer, nil)

    A duration in seconds.

Returns:

  • (Google::Protobuf::Duration, nil)

    The protobuf duration



101
102
103
104
105
106
# File 'lib/cloudtasker/backend/google_cloud_task_v2.rb', line 101

# Return a protobuf duration.
#
# @param [Integer, nil] duration A duration in seconds.
#
# @return [Google::Protobuf::Duration, nil] The protobuf duration, or nil
#   when no duration was given.
def self.format_protobuf_duration(duration)
  return unless duration

  # Only whole seconds are carried over; the value is coerced with to_i.
  pb_duration = Google::Protobuf::Duration.new
  pb_duration.seconds = duration.to_i
  pb_duration
end

.format_protobuf_time(schedule_time) ⇒ Google::Protobuf::Timestamp?

Return a protobuf timestamp specifying when a task should run.

Parameters:

  • schedule_time (Integer, nil)

    A unix timestamp.

Returns:

  • (Google::Protobuf::Timestamp, nil)

    The protobuf timestamp



87
88
89
90
91
92
# File 'lib/cloudtasker/backend/google_cloud_task_v2.rb', line 87

# Return a protobuf timestamp for the time at which a task should run.
#
# @param [Integer, nil] schedule_time A unix timestamp.
#
# @return [Google::Protobuf::Timestamp, nil] The protobuf timestamp, or nil
#   when no schedule time was given.
def self.format_protobuf_time(schedule_time)
  return unless schedule_time

  # Only whole seconds are carried over; the value is coerced with to_i.
  pb_timestamp = Google::Protobuf::Timestamp.new
  pb_timestamp.seconds = schedule_time.to_i
  pb_timestamp
end

.format_task_payload(payload) ⇒ Hash

Format the job payload sent to Cloud Tasks.

Parameters:

  • payload (Hash)

    The worker payload.

Returns:

  • (Hash)

    The Cloud Task payload.



115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/cloudtasker/backend/google_cloud_task_v2.rb', line 115

# Format the job payload sent to Cloud Tasks.
#
# @param [Hash] payload The worker payload.
#
# @return [Hash] The Cloud Task payload.
def self.format_task_payload(payload)
  # A JSON round-trip yields a deep copy with symbolized keys, so the
  # caller's hash is left untouched.
  task = JSON.parse(payload.to_json, symbolize_names: true)

  # Convert the scheduling fields to their protobuf counterparts
  # (Google::Protobuf::Timestamp and Google::Protobuf::Duration).
  task[:schedule_time] = format_protobuf_time(task[:schedule_time])
  task[:dispatch_deadline] = format_protobuf_duration(task[:dispatch_deadline])

  # Encode job content to support UTF-8.
  # Google Cloud Task expects content to be ASCII-8BIT compatible (binary).
  request = task[:http_request]
  headers = (request[:headers] ||= {})
  headers[Cloudtasker::Config::CONTENT_TYPE_HEADER] = 'text/json'
  headers[Cloudtasker::Config::ENCODING_HEADER] = 'Base64'
  request[:body] = Base64.encode64(request[:body])

  # Drop top-level entries that are nil (e.g. unset scheduling fields).
  task.compact
end

.queue_path(queue_name) ⇒ String

Return the fully qualified path for the Cloud Task queue.

Parameters:

  • queue_name (String)

    The relative name of the queue.

Returns:

  • (String)

    The queue path.



71
72
73
74
75
76
77
# File 'lib/cloudtasker/backend/google_cloud_task_v2.rb', line 71

# Return the fully qualified path for the Cloud Task queue.
#
# The queue segment is the configured queue prefix and the relative name
# joined with '-'. Parts for which `presence` returns nil are dropped first,
# so a missing prefix does not produce a leading dash.
#
# @param [String] queue_name The relative name of the queue.
#
# @return [String] The queue path.
def self.queue_path(queue_name)
  client.queue_path(
    project: config.gcp_project_id,
    location: config.gcp_location_id,
    queue: [config.gcp_queue_prefix, queue_name].map(&:presence).compact.join('-')
  )
end

.setup_queue(name: nil, concurrency: nil, retries: nil) ⇒ Google::Cloud::Tasks::V2::Queue

Create the queue configured in Cloudtasker if it does not already exist.

Parameters:

  • :name (String)

    The queue name

  • :concurrency (Integer)

    The queue concurrency

  • :retries (Integer)

    The number of retries for the queue

Returns:

  • (Google::Cloud::Tasks::V2::Queue)

    The queue



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/cloudtasker/backend/google_cloud_task_v2.rb', line 23

# Create the queue configured in Cloudtasker if it does not already exist.
#
# @param [String] name The queue name. Falls back to the default job queue.
# @param [Integer] concurrency The queue concurrency. Falls back to the default concurrency.
# @param [Integer] retries The number of retries for the queue. Falls back to the default retries.
#
# @return [Google::Cloud::Tasks::V2::Queue] The existing or newly created queue.
def self.setup_queue(name: nil, concurrency: nil, retries: nil)
  queue_id = queue_path(name || Cloudtasker::Config::DEFAULT_JOB_QUEUE)

  # Happy path: the queue already exists and is returned as-is
  client.get_queue(name: queue_id)
rescue Google::Cloud::NotFoundError
  # The lookup failed with 'not found': create the queue instead
  max_dispatches = (concurrency || Cloudtasker::Config::DEFAULT_QUEUE_CONCURRENCY).to_i
  max_attempts = (retries || Cloudtasker::Config::DEFAULT_QUEUE_RETRIES).to_i

  location = client.location_path(project: config.gcp_project_id, location: config.gcp_location_id)
  queue_spec = {
    name: queue_id,
    retry_config: { max_attempts: max_attempts },
    rate_limits: { max_concurrent_dispatches: max_dispatches }
  }

  client.create_queue(parent: location, queue: queue_spec)
end

.with_gapi_retries ⇒ Object

Helper method encapsulating the retry strategy for Google API calls



183
184
185
186
187
# File 'lib/cloudtasker/backend/google_cloud_task_v2.rb', line 183

# Helper method encapsulating the retry strategy for Google API calls.
#
# The given block is run through Retriable, configured to retry only on
# Google::Cloud::UnavailableError with `tries: 3`.
#
# @yield The Google API call to execute.
#
# @return [Object] Whatever Retriable.retriable returns for the block.
def self.with_gapi_retries
  Retriable.retriable(on: [Google::Cloud::UnavailableError], tries: 3) do
    # No arguments are forwarded to the caller's block
    yield
  end
end

Instance Method Details

#relative_queue ⇒ String

Return the relative queue (queue name minus prefix) the task is in.

Returns:

  • (String)

    The relative queue name



203
204
205
206
207
208
209
210
# File 'lib/cloudtasker/backend/google_cloud_task_v2.rb', line 203

# Return the relative queue (queue name minus prefix) the task is in.
#
# The queue id is read from the "/queues/<id>" segment of the task name,
# then the configured "<prefix>-" is removed from its start.
#
# @return [String, nil] The relative queue name, or nil when the task name
#   has no "/queues/<id>" segment.
def relative_queue
  queue_id = gcp_task.name[%r{/queues/([^/]+)}, 1]
  return unless queue_id

  # No prefix configured: there is no "<prefix>-" to remove, so the queue
  # id is already the relative name.
  prefix = self.class.config.gcp_queue_prefix.to_s
  return queue_id if prefix.strip.empty?

  # The prefix is matched literally (escaped) and only at the very start
  # of the string, so regexp metacharacters in the configured value cannot
  # strip unrelated text.
  queue_id.sub(/\A#{Regexp.escape(prefix)}-/, '')
end

#to_h ⇒ Hash

Return a hash description of the task.

Returns:

  • (Hash)

    A hash description of the task.



217
218
219
220
221
222
223
224
225
226
# File 'lib/cloudtasker/backend/google_cloud_task_v2.rb', line 217

# Return a hash description of the task.
#
# @return [Hash] A hash description of the task with the keys :id,
#   :http_request, :schedule_time, :dispatch_deadline, :retries and :queue.
def to_h
  # Convert the task to a hash once and reuse it for every field instead
  # of rebuilding it for each lookup.
  task_hash = gcp_task.to_h

  {
    id: gcp_task.name,
    http_request: task_hash[:http_request],
    # Missing time fields dig to nil, which to_i turns into 0
    schedule_time: task_hash.dig(:schedule_time, :seconds).to_i,
    dispatch_deadline: task_hash.dig(:dispatch_deadline, :seconds).to_i,
    retries: task_hash[:response_count],
    queue: relative_queue
  }
end