Class: Cloudtasker::Backend::GoogleCloudTaskV1

Inherits:
Object
  • Object
show all
Defined in:
lib/cloudtasker/backend/google_cloud_task_v1.rb

Overview

Manage tasks pushed to GCP Cloud Task

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(gcp_task) ⇒ GoogleCloudTaskV1

Build a new instance of the class.

Parameters:

  • resp (Google::Cloud::Tasks::V2beta3::Task)

    The GCP Cloud Task response



191
192
193
# File 'lib/cloudtasker/backend/google_cloud_task_v1.rb', line 191

def initialize(gcp_task)
  @gcp_task = gcp_task
end

Instance Attribute Details

#gcp_taskObject

Returns the value of attribute gcp_task.



10
11
12
# File 'lib/cloudtasker/backend/google_cloud_task_v1.rb', line 10

def gcp_task
  @gcp_task
end

Class Method Details

.clientGoogle::Cloud::Tasks

Return the Google Cloud Task client.

Returns:

  • (Google::Cloud::Tasks)

    The Google Cloud Task client.



49
50
51
# File 'lib/cloudtasker/backend/google_cloud_task_v1.rb', line 49

def self.client
  @client ||= ::Google::Cloud::Tasks.new(version: :v2beta3)
end

.configCloudtasker::Config

Return the cloudtasker configuration. See Cloudtasker#configure.

Returns:



58
59
60
# File 'lib/cloudtasker/backend/google_cloud_task_v1.rb', line 58

def self.config
  Cloudtasker.config
end

.create(payload) ⇒ Cloudtasker::Backend::GoogleCloudTaskV1?

Create a new task.

Parameters:

  • payload (Hash)

    The task payload.

Returns:



154
155
156
157
158
159
160
161
162
163
# File 'lib/cloudtasker/backend/google_cloud_task_v1.rb', line 154

def self.create(payload)
  payload = format_task_payload(payload)

  # Extract relative queue name
  relative_queue = payload.delete(:queue)

  # Create task
  resp = with_gax_retries { client.create_task(queue_path(relative_queue), payload) }
  resp ? new(resp) : nil
end

.delete(id) ⇒ Object

Delete a task by id.

Parameters:

  • id (String)

    The id of the task.



170
171
172
173
174
175
# File 'lib/cloudtasker/backend/google_cloud_task_v1.rb', line 170

def self.delete(id)
  with_gax_retries { client.delete_task(id) }
rescue Google::Gax::RetryError, Google::Gax::NotFoundError, GRPC::NotFound, Google::Gax::PermissionDeniedError
  # The ID does not exist
  nil
end

.find(id) ⇒ Cloudtasker::Backend::GoogleCloudTaskV1?

Find a task by id.

Parameters:

  • id (String)

    The task id.

Returns:



139
140
141
142
143
144
145
# File 'lib/cloudtasker/backend/google_cloud_task_v1.rb', line 139

def self.find(id)
  resp = with_gax_retries { client.get_task(id) }
  resp ? new(resp) : nil
rescue Google::Gax::RetryError, Google::Gax::NotFoundError, GRPC::NotFound
  # The ID does not exist
  nil
end

.format_protobuf_duration(duration) ⇒ Google::Protobuf::Timestamp?

Return a protobuf duration.

Parameters:

  • duration (Integer, nil)

    A duration in seconds.

Returns:

  • (Google::Protobuf::Timestamp, nil)

    The protobuff timestamp



99
100
101
102
103
104
# File 'lib/cloudtasker/backend/google_cloud_task_v1.rb', line 99

def self.format_protobuf_duration(duration)
  return nil unless duration

  # Generate protobuf timestamp
  Google::Protobuf::Duration.new.tap { |e| e.seconds = duration.to_i }
end

.format_protobuf_time(schedule_time) ⇒ Google::Protobuf::Timestamp?

Return a protobuf timestamp specifying how to wait before running a task.

Parameters:

  • schedule_time (Integer, nil)

    A unix timestamp.

Returns:

  • (Google::Protobuf::Timestamp, nil)

    The protobuff timestamp



85
86
87
88
89
90
# File 'lib/cloudtasker/backend/google_cloud_task_v1.rb', line 85

def self.format_protobuf_time(schedule_time)
  return nil unless schedule_time

  # Generate protobuf timestamp
  Google::Protobuf::Timestamp.new.tap { |e| e.seconds = schedule_time.to_i }
end

.format_task_payload(payload) ⇒ Hash

Format the job payload sent to Cloud Tasks.

Parameters:

  • hash (Hash)

    The worker payload.

Returns:

  • (Hash)

    The Cloud Task payloadd.



113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# File 'lib/cloudtasker/backend/google_cloud_task_v1.rb', line 113

def self.format_task_payload(payload)
  payload = JSON.parse(payload.to_json, symbolize_names: true) # deep dup

  # Format schedule time to Google::Protobuf::Timestamp
  payload[:schedule_time] = format_protobuf_time(payload[:schedule_time])

  # Format dispatch_deadline to Google::Protobuf::Duration
  payload[:dispatch_deadline] = format_protobuf_duration(payload[:dispatch_deadline])

  # Encode job content to support UTF-8. Google Cloud Task
  # expect content to be ASCII-8BIT compatible (binary)
  payload[:http_request][:headers] ||= {}
  payload[:http_request][:headers][Cloudtasker::Config::CONTENT_TYPE_HEADER] = 'text/json'
  payload[:http_request][:headers][Cloudtasker::Config::ENCODING_HEADER] = 'Base64'
  payload[:http_request][:body] = Base64.encode64(payload[:http_request][:body])

  payload.compact
end

.queue_path(queue_name) ⇒ String

Return the fully qualified path for the Cloud Task queue.

Parameters:

  • queue_name (String)

    The relative name of the queue.

Returns:

  • (String)

    The queue path.



69
70
71
72
73
74
75
# File 'lib/cloudtasker/backend/google_cloud_task_v1.rb', line 69

def self.queue_path(queue_name)
  client.queue_path(
    config.gcp_project_id,
    config.gcp_location_id,
    [config.gcp_queue_prefix, queue_name].map(&:presence).compact.join('-')
  )
end

.setup_queue(name: nil, concurrency: nil, retries: nil) ⇒ Google::Cloud::Tasks::V2beta3::Queue

Create the queue configured in Cloudtasker if it does not already exist.

Parameters:

  • :name (String)

    The queue name

  • :concurrency (Integer)

    The queue concurrency

  • :retries (Integer)

    The number of retries for the queue

Returns:

  • (Google::Cloud::Tasks::V2beta3::Queue)

    The queue



21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/cloudtasker/backend/google_cloud_task_v1.rb', line 21

def self.setup_queue(name: nil, concurrency: nil, retries: nil)
  # Build full queue path
  queue_name = name || Cloudtasker::Config::DEFAULT_JOB_QUEUE
  full_queue_name = queue_path(queue_name)

  # Try to get existing queue
  client.get_queue(full_queue_name)
rescue Google::Gax::RetryError
  # Extract options
  queue_concurrency = (concurrency || Cloudtasker::Config::DEFAULT_QUEUE_CONCURRENCY).to_i
  queue_retries = (retries || Cloudtasker::Config::DEFAULT_QUEUE_RETRIES).to_i

  # Create queue on 'not found' error
  client.create_queue(
    client.location_path(config.gcp_project_id, config.gcp_location_id),
    {
      name: full_queue_name,
      retry_config: { max_attempts: queue_retries },
      rate_limits: { max_concurrent_dispatches: queue_concurrency }
    }
  )
end

.with_gax_retriesObject

Helper method encapsulating the retry strategy for GAX calls



180
181
182
183
184
# File 'lib/cloudtasker/backend/google_cloud_task_v1.rb', line 180

def self.with_gax_retries
  Retriable.retriable(on: [Google::Gax::UnavailableError], tries: 3) do
    yield
  end
end

Instance Method Details

#relative_queueString

Return the relative queue (queue name minus prefix) the task is in.

Returns:

  • (String)

    The relative queue name



200
201
202
203
204
205
206
207
# File 'lib/cloudtasker/backend/google_cloud_task_v1.rb', line 200

def relative_queue
  gcp_task
    .name
    .match(%r{/queues/([^/]+)})
    &.captures
    &.first
    &.sub(/^#{self.class.config.gcp_queue_prefix}-/, '')
end

#to_hHash

Return a hash description of the task.

Returns:

  • (Hash)

    A hash description of the task.



214
215
216
217
218
219
220
221
222
223
# File 'lib/cloudtasker/backend/google_cloud_task_v1.rb', line 214

def to_h
  {
    id: gcp_task.name,
    http_request: gcp_task.to_h[:http_request],
    schedule_time: gcp_task.to_h.dig(:schedule_time, :seconds).to_i,
    dispatch_deadline: gcp_task.to_h.dig(:dispatch_deadline, :seconds).to_i,
    retries: gcp_task.to_h[:response_count],
    queue: relative_queue
  }
end