Class: Taskinator::Task
- Inherits:
-
Object
- Object
- Taskinator::Task
- Includes:
- Comparable, Instrumentation, Persistence, Workflow
- Defined in:
- lib/taskinator/task.rb
Direct Known Subclasses
Defined Under Namespace
Classes: Job, Step, SubProcess
Instance Attribute Summary collapse
-
#created_at ⇒ Object
readonly
Returns the value of attribute created_at.
-
#next ⇒ Object
the next task in the sequence.
-
#options ⇒ Object
readonly
Returns the value of attribute options.
-
#process ⇒ Object
readonly
Returns the value of attribute process.
-
#queue ⇒ Object
readonly
Returns the value of attribute queue.
-
#updated_at ⇒ Object
readonly
Returns the value of attribute updated_at.
-
#uuid ⇒ Object
readonly
Returns the value of attribute uuid.
Class Method Summary collapse
- .define_job_task(process, job, args, options = {}) ⇒ Object
- .define_step_task(process, method, args, options = {}) ⇒ Object
- .define_sub_process_task(process, sub_process, options = {}) ⇒ Object
Instance Method Summary collapse
- #<=>(other) ⇒ Object
- #accept(visitor) ⇒ Object
- #cancel! ⇒ Object
- #cancelled? ⇒ Boolean
- #complete! ⇒ Object
-
#enqueue ⇒ Object
————————————————– subclasses must implement the following methods ————————————————–.
- #enqueue! ⇒ Object
- #fail!(error) ⇒ Object
- #incr_count? ⇒ Boolean
-
#initialize(process, options = {}) ⇒ Task
constructor
A new instance of Task.
-
#paused? ⇒ Boolean
helper method.
- #start ⇒ Object
- #start! ⇒ Object
- #to_s ⇒ Object
Methods included from Instrumentation
#cancelled_payload, #completed_payload, #enqueued_payload, #failed_payload, #instrument, #paused_payload, #processing_payload, #resumed_payload
Methods included from Persistence
add_process_to_list, deserialize, included, processes_list_key, serialize
Methods included from Workflow
#current_state, #current_state=, #transition
Constructor Details
#initialize(process, options = {}) ⇒ Task
Returns a new instance of Task.
33 34 35 36 37 38 39 40 41 42 43 |
# File 'lib/taskinator/task.rb', line 33 def initialize(process, ={}) raise ArgumentError, 'process' if process.nil? || !process.is_a?(Process) @uuid = "#{process.uuid}:task:#{Taskinator.generate_uuid}" @process = process @options = @queue = .delete(:queue) @created_at = Time.now.utc @updated_at = created_at @current_state = :initial end |
Instance Attribute Details
#created_at ⇒ Object (readonly)
Returns the value of attribute created_at.
27 28 29 |
# File 'lib/taskinator/task.rb', line 27 def created_at @created_at end |
#next ⇒ Object
the next task in the sequence
31 32 33 |
# File 'lib/taskinator/task.rb', line 31 def next @next end |
#options ⇒ Object (readonly)
Returns the value of attribute options.
25 26 27 |
# File 'lib/taskinator/task.rb', line 25 def @options end |
#process ⇒ Object (readonly)
Returns the value of attribute process.
23 24 25 |
# File 'lib/taskinator/task.rb', line 23 def process @process end |
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
26 27 28 |
# File 'lib/taskinator/task.rb', line 26 def queue @queue end |
#updated_at ⇒ Object (readonly)
Returns the value of attribute updated_at.
28 29 30 |
# File 'lib/taskinator/task.rb', line 28 def updated_at @updated_at end |
#uuid ⇒ Object (readonly)
Returns the value of attribute uuid.
24 25 26 |
# File 'lib/taskinator/task.rb', line 24 def uuid @uuid end |
Class Method Details
.define_job_task(process, job, args, options = {}) ⇒ Object
14 15 16 |
# File 'lib/taskinator/task.rb', line 14 def define_job_task(process, job, args, ={}) Job.new(process, job, args, ) end |
.define_step_task(process, method, args, options = {}) ⇒ Object
10 11 12 |
# File 'lib/taskinator/task.rb', line 10 def define_step_task(process, method, args, ={}) Step.new(process, method, args, ) end |
.define_sub_process_task(process, sub_process, options = {}) ⇒ Object
18 19 20 |
# File 'lib/taskinator/task.rb', line 18 def define_sub_process_task(process, sub_process, ={}) SubProcess.new(process, sub_process, ) end |
Instance Method Details
#<=>(other) ⇒ Object
55 56 57 |
# File 'lib/taskinator/task.rb', line 55 def <=>(other) uuid <=> other.uuid end |
#accept(visitor) ⇒ Object
45 46 47 48 49 50 51 52 53 |
# File 'lib/taskinator/task.rb', line 45 def accept(visitor) visitor.visit_attribute(:uuid) visitor.visit_process_reference(:process) visitor.visit_task_reference(:next) visitor.visit_args(:options) visitor.visit_attribute(:queue) visitor.visit_attribute_time(:created_at) visitor.visit_attribute_time(:updated_at) end |
#cancel! ⇒ Object
105 106 107 108 109 110 111 112 |
# File 'lib/taskinator/task.rb', line 105 def cancel! transition(:cancelled) do self.incr_cancelled if incr_count? instrument('taskinator.task.cancelled', cancelled_payload) do cancel if respond_to?(:cancel) end end end |
#cancelled? ⇒ Boolean
114 115 116 |
# File 'lib/taskinator/task.rb', line 114 def cancelled? super || process.cancelled? end |
#complete! ⇒ Object
94 95 96 97 98 99 100 101 102 103 |
# File 'lib/taskinator/task.rb', line 94 def complete! transition(:completed) do self.incr_completed if incr_count? instrument('taskinator.task.completed', completed_payload) do complete if respond_to?(:complete) # notify the process that this task has completed process.task_completed(self) end end end |
#enqueue ⇒ Object
subclasses must implement the following methods
137 138 139 |
# File 'lib/taskinator/task.rb', line 137 def enqueue raise NotImplementedError end |
#enqueue! ⇒ Object
63 64 65 66 67 68 69 70 71 |
# File 'lib/taskinator/task.rb', line 63 def enqueue! return if paused? || cancelled? transition(:enqueued) do instrument('taskinator.task.enqueued', enqueued_payload) do enqueue end end end |
#fail!(error) ⇒ Object
118 119 120 121 122 123 124 125 126 127 |
# File 'lib/taskinator/task.rb', line 118 def fail!(error) transition(:failed) do self.incr_failed if incr_count? instrument('taskinator.task.failed', failed_payload(error)) do fail(error) if respond_to?(:fail) # notify the process that this task has failed process.task_failed(self, error) end end end |
#incr_count? ⇒ Boolean
129 130 131 |
# File 'lib/taskinator/task.rb', line 129 def incr_count? true end |
#paused? ⇒ Boolean
helper method
90 91 92 |
# File 'lib/taskinator/task.rb', line 90 def paused? super || process.paused? end |
#start ⇒ Object
141 142 143 |
# File 'lib/taskinator/task.rb', line 141 def start raise NotImplementedError end |
#start! ⇒ Object
73 74 75 76 77 78 79 80 81 82 |
# File 'lib/taskinator/task.rb', line 73 def start! return if paused? || cancelled? self.incr_processing if incr_count? transition(:processing) do instrument('taskinator.task.processing', processing_payload) do start end end end |
#to_s ⇒ Object
59 60 61 |
# File 'lib/taskinator/task.rb', line 59 def to_s "#<#{self.class.name}:#{uuid}>" end |