Class: Cloudtasker::Backend::GoogleCloudTaskV2
- Inherits:
-
Object
- Object
- Cloudtasker::Backend::GoogleCloudTaskV2
- Defined in:
- lib/cloudtasker/backend/google_cloud_task_v2.rb
Overview
Manage tasks pushed to GCP Cloud Task
Instance Attribute Summary collapse
-
#gcp_task ⇒ Object
Returns the value of attribute gcp_task.
Class Method Summary collapse
-
.client ⇒ Google::Cloud::Tasks::V2::CloudTasks::Client
Return the Google Cloud Task client.
-
.config ⇒ Cloudtasker::Config
Return the cloudtasker configuration.
-
.create(payload) ⇒ Cloudtasker::Backend::GoogleCloudTaskV2?
Create a new task.
-
.delete(id) ⇒ Object
Delete a task by id.
-
.find(id) ⇒ Cloudtasker::Backend::GoogleCloudTaskV2?
Find a task by id.
-
.format_protobuf_duration(duration) ⇒ Google::Protobuf::Timestamp?
Return a protobuf duration.
-
.format_protobuf_time(schedule_time) ⇒ Google::Protobuf::Timestamp?
Return a protobuf timestamp specifying how to wait before running a task.
-
.format_task_payload(payload) ⇒ Hash
Format the job payload sent to Cloud Tasks.
-
.queue_path(queue_name) ⇒ String
Return the fully qualified path for the Cloud Task queue.
-
.setup_queue(name: nil, concurrency: nil, retries: nil) ⇒ Google::Cloud::Tasks::V2::Queue
Create the queue configured in Cloudtasker if it does not already exist.
-
.with_gapi_retries ⇒ Object
Helper method encapsulating the retry strategy for Google API calls.
Instance Method Summary collapse
-
#initialize(gcp_task) ⇒ GoogleCloudTaskV2
constructor
Build a new instance of the class.
-
#relative_queue ⇒ String
Return the relative queue (queue name minus prefix) the task is in.
-
#to_h ⇒ Hash
Return a hash description of the task.
Constructor Details
#initialize(gcp_task) ⇒ GoogleCloudTaskV2
Build a new instance of the class.
194 195 196 |
# File 'lib/cloudtasker/backend/google_cloud_task_v2.rb', line 194 def initialize(gcp_task) @gcp_task = gcp_task end |
Instance Attribute Details
#gcp_task ⇒ Object
Returns the value of attribute gcp_task.
12 13 14 |
# File 'lib/cloudtasker/backend/google_cloud_task_v2.rb', line 12 def gcp_task @gcp_task end |
Class Method Details
.client ⇒ Google::Cloud::Tasks::V2::CloudTasks::Client
Return the Google Cloud Task client.
51 52 53 |
# File 'lib/cloudtasker/backend/google_cloud_task_v2.rb', line 51 def self.client @client ||= ::Google::Cloud::Tasks.cloud_tasks end |
.config ⇒ Cloudtasker::Config
Return the cloudtasker configuration. See Cloudtasker#configure.
60 61 62 |
# File 'lib/cloudtasker/backend/google_cloud_task_v2.rb', line 60 def self.config Cloudtasker.config end |
.create(payload) ⇒ Cloudtasker::Backend::GoogleCloudTaskV2?
Create a new task.
156 157 158 159 160 161 162 163 164 165 166 |
# File 'lib/cloudtasker/backend/google_cloud_task_v2.rb', line 156 def self.create(payload) payload = format_task_payload(payload) # Infer full queue name relative_queue = payload.delete(:queue) full_queue = queue_path(relative_queue) # Create task resp = with_gapi_retries { client.create_task(parent: full_queue, task: payload) } resp ? new(resp) : nil end |
.delete(id) ⇒ Object
Delete a task by id.
173 174 175 176 177 178 |
# File 'lib/cloudtasker/backend/google_cloud_task_v2.rb', line 173 def self.delete(id) with_gapi_retries { client.delete_task(name: id) } rescue Google::Cloud::NotFoundError, Google::Cloud::PermissionDeniedError # The ID does not exist nil end |
.find(id) ⇒ Cloudtasker::Backend::GoogleCloudTaskV2?
Find a task by id.
141 142 143 144 145 146 147 |
# File 'lib/cloudtasker/backend/google_cloud_task_v2.rb', line 141 def self.find(id) resp = with_gapi_retries { client.get_task(name: id) } resp ? new(resp) : nil rescue Google::Cloud::NotFoundError # The ID does not exist nil end |
.format_protobuf_duration(duration) ⇒ Google::Protobuf::Timestamp?
Return a protobuf duration.
101 102 103 104 105 106 |
# File 'lib/cloudtasker/backend/google_cloud_task_v2.rb', line 101 def self.format_protobuf_duration(duration) return nil unless duration # Generate protobuf timestamp Google::Protobuf::Duration.new.tap { |e| e.seconds = duration.to_i } end |
.format_protobuf_time(schedule_time) ⇒ Google::Protobuf::Timestamp?
Return a protobuf timestamp specifying how to wait before running a task.
87 88 89 90 91 92 |
# File 'lib/cloudtasker/backend/google_cloud_task_v2.rb', line 87 def self.format_protobuf_time(schedule_time) return nil unless schedule_time # Generate protobuf timestamp Google::Protobuf::Timestamp.new.tap { |e| e.seconds = schedule_time.to_i } end |
.format_task_payload(payload) ⇒ Hash
Format the job payload sent to Cloud Tasks.
115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 |
# File 'lib/cloudtasker/backend/google_cloud_task_v2.rb', line 115 def self.format_task_payload(payload) payload = JSON.parse(payload.to_json, symbolize_names: true) # deep dup # Format schedule time to Google::Protobuf::Timestamp payload[:schedule_time] = format_protobuf_time(payload[:schedule_time]) # Format dispatch_deadline to Google::Protobuf::Duration payload[:dispatch_deadline] = format_protobuf_duration(payload[:dispatch_deadline]) # Encode job content to support UTF-8. # Google Cloud Task expect content to be ASCII-8BIT compatible (binary) payload[:http_request][:headers] ||= {} payload[:http_request][:headers][Cloudtasker::Config::CONTENT_TYPE_HEADER] = 'text/json' payload[:http_request][:headers][Cloudtasker::Config::ENCODING_HEADER] = 'Base64' payload[:http_request][:body] = Base64.encode64(payload[:http_request][:body]) payload.compact end |
.queue_path(queue_name) ⇒ String
Return the fully qualified path for the Cloud Task queue.
71 72 73 74 75 76 77 |
# File 'lib/cloudtasker/backend/google_cloud_task_v2.rb', line 71 def self.queue_path(queue_name) client.queue_path( project: config.gcp_project_id, location: config.gcp_location_id, queue: [config.gcp_queue_prefix, queue_name].map(&:presence).compact.join('-') ) end |
.setup_queue(name: nil, concurrency: nil, retries: nil) ⇒ Google::Cloud::Tasks::V2::Queue
Create the queue configured in Cloudtasker if it does not already exist.
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
# File 'lib/cloudtasker/backend/google_cloud_task_v2.rb', line 23 def self.setup_queue(name: nil, concurrency: nil, retries: nil) # Build full queue path queue_name = name || Cloudtasker::Config::DEFAULT_JOB_QUEUE full_queue_name = queue_path(queue_name) # Try to get existing queue client.get_queue(name: full_queue_name) rescue Google::Cloud::NotFoundError # Extract options queue_concurrency = (concurrency || Cloudtasker::Config::DEFAULT_QUEUE_CONCURRENCY).to_i queue_retries = (retries || Cloudtasker::Config::DEFAULT_QUEUE_RETRIES).to_i # Create queue on 'not found' error client.create_queue( parent: client.location_path(project: config.gcp_project_id, location: config.gcp_location_id), queue: { name: full_queue_name, retry_config: { max_attempts: queue_retries }, rate_limits: { max_concurrent_dispatches: queue_concurrency } } ) end |
.with_gapi_retries ⇒ Object
Helper method encapsulating the retry strategy for Google API calls
183 184 185 186 187 |
# File 'lib/cloudtasker/backend/google_cloud_task_v2.rb', line 183 def self.with_gapi_retries Retriable.retriable(on: [Google::Cloud::UnavailableError], tries: 3) do yield end end |
Instance Method Details
#relative_queue ⇒ String
Return the relative queue (queue name minus prefix) the task is in.
203 204 205 206 207 208 209 210 |
# File 'lib/cloudtasker/backend/google_cloud_task_v2.rb', line 203 def relative_queue gcp_task .name .match(%r{/queues/([^/]+)}) &.captures &.first &.sub(/^#{self.class.config.gcp_queue_prefix}-/, '') end |
#to_h ⇒ Hash
Return a hash description of the task.
217 218 219 220 221 222 223 224 225 226 |
# File 'lib/cloudtasker/backend/google_cloud_task_v2.rb', line 217 def to_h { id: gcp_task.name, http_request: gcp_task.to_h[:http_request], schedule_time: gcp_task.to_h.dig(:schedule_time, :seconds).to_i, dispatch_deadline: gcp_task.to_h.dig(:dispatch_deadline, :seconds).to_i, retries: gcp_task.to_h[:response_count], queue: relative_queue } end |