Class: Cloudtasker::Backend::MemoryTask

Inherits:
Object
  • Object
show all
Defined in:
lib/cloudtasker/backend/memory_task.rb

Overview

Manage local tasks pushed to memory. Used for testing.

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(id:, http_request:, schedule_time: nil, queue: nil, job_retries: 0, **_xargs) ⇒ MemoryTask

Build a new instance of the class.

Parameters:

  • id (String)

    The ID of the task.

  • http_request (Hash)

    The HTTP request content.

  • schedule_time (Integer) (defaults to: nil)

    When to run the task (Unix timestamp)



120
121
122
123
124
125
126
# File 'lib/cloudtasker/backend/memory_task.rb', line 120

def initialize(id:, http_request:, schedule_time: nil, queue: nil, job_retries: 0, **_xargs)
  @id = id
  @http_request = http_request
  @schedule_time = Time.at(schedule_time || 0)
  @queue = queue
  @job_retries = job_retries || 0
end

Instance Attribute Details

#http_requestObject (readonly)

Returns the value of attribute http_request.



9
10
11
# File 'lib/cloudtasker/backend/memory_task.rb', line 9

def http_request
  @http_request
end

#idObject (readonly)

Returns the value of attribute id.



9
10
11
# File 'lib/cloudtasker/backend/memory_task.rb', line 9

def id
  @id
end

#job_retriesObject

Returns the value of attribute job_retries.



8
9
10
# File 'lib/cloudtasker/backend/memory_task.rb', line 8

def job_retries
  @job_retries
end

#queueObject (readonly)

Returns the value of attribute queue.



9
10
11
# File 'lib/cloudtasker/backend/memory_task.rb', line 9

def queue
  @queue
end

#schedule_timeObject (readonly)

Returns the value of attribute schedule_time.



9
10
11
# File 'lib/cloudtasker/backend/memory_task.rb', line 9

def schedule_time
  @schedule_time
end

Class Method Details

.all(worker_class_name = nil) ⇒ Array<Cloudtasker::Backend::MemoryTask>

Return all enqueued tasks. A worker class name can be specified to filter the returned results.

Parameters:

  • worker_class_name (String) (defaults to: nil)

    Filter tasks on worker class name.

Returns:



49
50
51
52
53
54
55
56
57
# File 'lib/cloudtasker/backend/memory_task.rb', line 49

def self.all(worker_class_name = nil)
  # Always return a copy of the queue so that tasks can safely be removed
  # (via #execute) without tampering with the enumerator
  if worker_class_name
    queue.select { |e| e.worker_class_name == worker_class_name }
  else
    queue.dup
  end
end

.clear(worker_class_name = nil) ⇒ Array<Cloudtasker::Backend::MemoryTask>

Clear the queue.

Parameters:

  • worker_class_name (String) (defaults to: nil)

    Filter jobs on worker class name.

Returns:



105
106
107
108
109
110
111
# File 'lib/cloudtasker/backend/memory_task.rb', line 105

def self.clear(worker_class_name = nil)
  if worker_class_name
    queue.reject! { |e| e.worker_class_name == worker_class_name }
  else
    queue.clear
  end
end

.create(payload) ⇒ Object

Push a job to the queue.

Parameters:

  • payload (Hash)

    The Cloud Task payload.



64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/cloudtasker/backend/memory_task.rb', line 64

def self.create(payload)
  id = payload[:id] || SecureRandom.uuid
  payload = payload.merge(schedule_time: payload[:schedule_time].to_i)

  # Save task
  task = new(**payload.merge(id: id))
  queue << task

  # Execute task immediately if in testing and inline mode enabled
  task.execute if inline_mode?

  task
end

.delete(id) ⇒ Object

Delete a task by id.

Parameters:

  • id (String)

    The task id.



94
95
96
# File 'lib/cloudtasker/backend/memory_task.rb', line 94

def self.delete(id)
  queue.reject! { |e| e.id == id }
end

.drain(worker_class_name = nil) ⇒ Array<any>

Run all Tasks in the queue. Optionally filter which tasks to run based on the worker class name.

Parameters:

  • worker_class_name (String) (defaults to: nil)

    Run tasks for a specific worker class name.

Returns:

  • (Array<any>)

    The return values of the workers perform method.



37
38
39
# File 'lib/cloudtasker/backend/memory_task.rb', line 37

def self.drain(worker_class_name = nil)
  all(worker_class_name).map(&:execute)
end

.find(id) ⇒ Cloudtasker::Backend::MemoryTask?

Get a task by id.

Parameters:

  • id (String)

    The id of the task.

Returns:



85
86
87
# File 'lib/cloudtasker/backend/memory_task.rb', line 85

def self.find(id)
  queue.find { |e| e.id == id }
end

.inline_mode?Boolean

Return true if we are in test inline execution mode.

Returns:

  • (Boolean)

    True if inline mode enabled.



16
17
18
# File 'lib/cloudtasker/backend/memory_task.rb', line 16

def self.inline_mode?
  defined?(Cloudtasker::Testing) && Cloudtasker::Testing.inline?
end

.queueArray<Hash>

Return the task queue. A worker class name

Returns:

  • (Array<Hash>)

    <description>



25
26
27
# File 'lib/cloudtasker/backend/memory_task.rb', line 25

def self.queue
  @queue ||= []
end

Instance Method Details

#==(other) ⇒ Boolean

Equality operator.

Parameters:

  • other (Any)

    The object to compare.

Returns:

  • (Boolean)

    True if the object is equal.



188
189
190
# File 'lib/cloudtasker/backend/memory_task.rb', line 188

def ==(other)
  other.is_a?(self.class) && other.id == id
end

#executeAny

Execute the task.

Returns:

  • (Any)

    The return value of the worker perform method.



165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
# File 'lib/cloudtasker/backend/memory_task.rb', line 165

def execute
  # Execute worker
  worker_payload = payload.merge(job_retries: job_retries, task_id: id)
  resp = WorkerHandler.with_worker_handling(worker_payload, &:execute)

  # Delete task
  self.class.delete(id)
  resp
rescue DeadWorkerError => e
  self.class.delete(id)
  raise(e) if self.class.inline_mode?
rescue StandardError => e
  self.job_retries += 1
  raise(e) if self.class.inline_mode?
end

#payloadHash

Return task payload.

Returns:

  • (Hash)

    The task payload.



133
134
135
# File 'lib/cloudtasker/backend/memory_task.rb', line 133

def payload
  @payload ||= JSON.parse(http_request.dig(:body), symbolize_names: true)
end

#to_hHash

Return a hash description of the task.

Returns:

  • (Hash)

    A hash description of the task.



151
152
153
154
155
156
157
158
# File 'lib/cloudtasker/backend/memory_task.rb', line 151

def to_h
  {
    id: id,
    http_request: http_request,
    schedule_time: schedule_time.to_i,
    queue: queue
  }
end

#worker_class_nameString

Return the worker class from the task payload.

Returns:

  • (String)

    The task worker class name.



142
143
144
# File 'lib/cloudtasker/backend/memory_task.rb', line 142

def worker_class_name
  payload[:worker]
end