Class: Taskinator::Task
- Inherits:
-
Object
- Object
- Taskinator::Task
- Includes:
- Comparable, Instrumentation, Persistence, Workflow
- Defined in:
- lib/taskinator/task.rb
Direct Known Subclasses
Defined Under Namespace
Classes: Job, Step, SubProcess
Instance Attribute Summary collapse
-
#created_at ⇒ Object
readonly
Returns the value of attribute created_at.
-
#definition ⇒ Object
readonly
Returns the value of attribute definition.
-
#next ⇒ Object
The next task in the sequence.
-
#options ⇒ Object
readonly
Returns the value of attribute options.
-
#process ⇒ Object
readonly
Returns the value of attribute process.
-
#queue ⇒ Object
readonly
Returns the value of attribute queue.
-
#updated_at ⇒ Object
readonly
Returns the value of attribute updated_at.
-
#uuid ⇒ Object
readonly
Returns the value of attribute uuid.
Class Method Summary collapse
- .define_job_task(process, job, args, options = {}) ⇒ Object
- .define_step_task(process, method, args, options = {}) ⇒ Object
- .define_sub_process_task(process, sub_process, options = {}) ⇒ Object
Instance Method Summary collapse
- #<=>(other) ⇒ Object
- #accept(visitor) ⇒ Object
- #cancel! ⇒ Object
- #cancelled? ⇒ Boolean
- #complete! ⇒ Object
-
#enqueue ⇒ Object
:nocov:.
- #enqueue! ⇒ Object
- #fail!(error) ⇒ Object
- #incr_count? ⇒ Boolean
-
#initialize(process, options = {}) ⇒ Task
constructor
A new instance of Task.
-
#paused? ⇒ Boolean
helper method.
- #start ⇒ Object
- #start! ⇒ Object
- #to_s ⇒ Object
Methods included from Instrumentation
#cancelled_payload, #completed_payload, #enqueued_payload, #failed_payload, #instrument, #paused_payload, #processing_payload, #resumed_payload
Methods included from Persistence
add_process_to_list, deserialize, included, processes_list_key, serialize
Methods included from Workflow
#current_state, #current_state=, #transition
Constructor Details
#initialize(process, options = {}) ⇒ Task
Returns a new instance of Task.
34 35 36 37 38 39 40 41 42 43 44 45 |
# File 'lib/taskinator/task.rb', line 34 def initialize(process, options={}) raise ArgumentError, 'process' if process.nil? || !process.is_a?(Process) @uuid = "#{process.uuid}:task:#{Taskinator.generate_uuid}" @process = process @definition = process.definition @options = options @queue = options.delete(:queue) @created_at = Time.now.utc @updated_at = created_at @current_state = :initial end |
Instance Attribute Details
#created_at ⇒ Object (readonly)
Returns the value of attribute created_at.
28 29 30 |
# File 'lib/taskinator/task.rb', line 28 def created_at @created_at end |
#definition ⇒ Object (readonly)
Returns the value of attribute definition.
24 25 26 |
# File 'lib/taskinator/task.rb', line 24 def definition @definition end |
#next ⇒ Object
The next task in the sequence.
32 33 34 |
# File 'lib/taskinator/task.rb', line 32 def next @next end |
#options ⇒ Object (readonly)
Returns the value of attribute options.
26 27 28 |
# File 'lib/taskinator/task.rb', line 26 def options @options end |
#process ⇒ Object (readonly)
Returns the value of attribute process.
23 24 25 |
# File 'lib/taskinator/task.rb', line 23 def process @process end |
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
27 28 29 |
# File 'lib/taskinator/task.rb', line 27 def queue @queue end |
#updated_at ⇒ Object (readonly)
Returns the value of attribute updated_at.
29 30 31 |
# File 'lib/taskinator/task.rb', line 29 def updated_at @updated_at end |
#uuid ⇒ Object (readonly)
Returns the value of attribute uuid.
25 26 27 |
# File 'lib/taskinator/task.rb', line 25 def uuid @uuid end |
Class Method Details
.define_job_task(process, job, args, options = {}) ⇒ Object
14 15 16 |
# File 'lib/taskinator/task.rb', line 14 def define_job_task(process, job, args, options={}) Job.new(process, job, args, options) end |
.define_step_task(process, method, args, options = {}) ⇒ Object
10 11 12 |
# File 'lib/taskinator/task.rb', line 10 def define_step_task(process, method, args, options={}) Step.new(process, method, args, options) end |
.define_sub_process_task(process, sub_process, options = {}) ⇒ Object
18 19 20 |
# File 'lib/taskinator/task.rb', line 18 def define_sub_process_task(process, sub_process, options={}) SubProcess.new(process, sub_process, options) end |
Instance Method Details
#<=>(other) ⇒ Object
58 59 60 |
# File 'lib/taskinator/task.rb', line 58 def <=>(other) uuid <=> other.uuid end |
#accept(visitor) ⇒ Object
47 48 49 50 51 52 53 54 55 56 |
# File 'lib/taskinator/task.rb', line 47 def accept(visitor) visitor.visit_attribute(:uuid) visitor.visit_process_reference(:process) visitor.visit_type(:definition) visitor.visit_task_reference(:next) visitor.visit_args(:options) visitor.visit_attribute(:queue) visitor.visit_attribute_time(:created_at) visitor.visit_attribute_time(:updated_at) end |
#cancel! ⇒ Object
108 109 110 111 112 113 114 115 |
# File 'lib/taskinator/task.rb', line 108 def cancel! transition(:cancelled) do self.incr_cancelled if incr_count? instrument('taskinator.task.cancelled', cancelled_payload) do cancel if respond_to?(:cancel) end end end |
#cancelled? ⇒ Boolean
117 118 119 |
# File 'lib/taskinator/task.rb', line 117 def cancelled? super || process.cancelled? end |
#complete! ⇒ Object
97 98 99 100 101 102 103 104 105 106 |
# File 'lib/taskinator/task.rb', line 97 def complete! transition(:completed) do self.incr_completed if incr_count? instrument('taskinator.task.completed', completed_payload) do complete if respond_to?(:complete) # notify the process that this task has completed process.task_completed(self) end end end |
#enqueue ⇒ Object
:nocov:
141 142 143 |
# File 'lib/taskinator/task.rb', line 141 def enqueue raise NotImplementedError end |
#enqueue! ⇒ Object
66 67 68 69 70 71 72 73 74 |
# File 'lib/taskinator/task.rb', line 66 def enqueue! return if paused? || cancelled? transition(:enqueued) do instrument('taskinator.task.enqueued', enqueued_payload) do enqueue end end end |
#fail!(error) ⇒ Object
121 122 123 124 125 126 127 128 129 130 |
# File 'lib/taskinator/task.rb', line 121 def fail!(error) transition(:failed) do self.incr_failed if incr_count? instrument('taskinator.task.failed', failed_payload(error)) do fail(error) if respond_to?(:fail) # notify the process that this task has failed process.task_failed(self, error) end end end |
#incr_count? ⇒ Boolean
132 133 134 |
# File 'lib/taskinator/task.rb', line 132 def incr_count? true end |
#paused? ⇒ Boolean
helper method
93 94 95 |
# File 'lib/taskinator/task.rb', line 93 def paused? super || process.paused? end |
#start ⇒ Object
145 146 147 |
# File 'lib/taskinator/task.rb', line 145 def start raise NotImplementedError end |
#start! ⇒ Object
76 77 78 79 80 81 82 83 84 85 |
# File 'lib/taskinator/task.rb', line 76 def start! return if paused? || cancelled? self.incr_processing if incr_count? transition(:processing) do instrument('taskinator.task.processing', processing_payload) do start end end end |
#to_s ⇒ Object
62 63 64 |
# File 'lib/taskinator/task.rb', line 62 def to_s "#<#{self.class.name}:#{uuid}>" end |