Class: Taskinator::Task
Defined Under Namespace
Classes: Job, Step, SubProcess
Instance Attribute Summary collapse
Class Method Summary
collapse
Instance Method Summary
collapse
#cancelled_payload, #completed_payload, #enqueued_payload, #failed_payload, #instrument, #paused_payload, #processing_payload, #resumed_payload
add_process_to_list, deserialize, included, list_key, serialize
Methods included from Workflow
#current_state, #current_state=, #transition
Constructor Details
#initialize(process, options = {}) ⇒ Task
Returns a new instance of Task.
37
38
39
40
41
42
43
44
45
46
|
# File 'lib/taskinator/task.rb', line 37
def initialize(process, options={})
raise ArgumentError, 'process' if process.nil? || !process.is_a?(Process)
@uuid = SecureRandom.uuid
@process = process
@options = options
@queue = options.delete(:queue)
@created_at = Time.now.utc
@updated_at = created_at
end
|
Instance Attribute Details
#created_at ⇒ Object
Returns the value of attribute created_at.
31
32
33
|
# File 'lib/taskinator/task.rb', line 31
def created_at
@created_at
end
|
#next ⇒ Object
the next task in the sequence
35
36
37
|
# File 'lib/taskinator/task.rb', line 35
def next
@next
end
|
#options ⇒ Object
Returns the value of attribute options.
29
30
31
|
# File 'lib/taskinator/task.rb', line 29
def options
@options
end
|
#process ⇒ Object
Returns the value of attribute process.
27
28
29
|
# File 'lib/taskinator/task.rb', line 27
def process
@process
end
|
#queue ⇒ Object
Returns the value of attribute queue.
30
31
32
|
# File 'lib/taskinator/task.rb', line 30
def queue
@queue
end
|
#updated_at ⇒ Object
Returns the value of attribute updated_at.
32
33
34
|
# File 'lib/taskinator/task.rb', line 32
def updated_at
@updated_at
end
|
#uuid ⇒ Object
Returns the value of attribute uuid.
28
29
30
|
# File 'lib/taskinator/task.rb', line 28
def uuid
@uuid
end
|
Class Method Details
.base_key ⇒ Object
22
23
24
|
# File 'lib/taskinator/task.rb', line 22
def base_key
'task'
end
|
.define_job_task(process, job, args, options = {}) ⇒ Object
14
15
16
|
# File 'lib/taskinator/task.rb', line 14
def define_job_task(process, job, args, options={})
Job.new(process, job, args, options)
end
|
.define_step_task(process, method, args, options = {}) ⇒ Object
10
11
12
|
# File 'lib/taskinator/task.rb', line 10
def define_step_task(process, method, args, options={})
Step.new(process, method, args, options)
end
|
.define_sub_process_task(process, sub_process, options = {}) ⇒ Object
18
19
20
|
# File 'lib/taskinator/task.rb', line 18
def define_sub_process_task(process, sub_process, options={})
SubProcess.new(process, sub_process, options)
end
|
Instance Method Details
#<=>(other) ⇒ Object
58
59
60
|
# File 'lib/taskinator/task.rb', line 58
def <=>(other)
uuid <=> other.uuid
end
|
#accept(visitor) ⇒ Object
48
49
50
51
52
53
54
55
56
|
# File 'lib/taskinator/task.rb', line 48
def accept(visitor)
visitor.visit_attribute(:uuid)
visitor.visit_process_reference(:process)
visitor.visit_task_reference(:next)
visitor.visit_args(:options)
visitor.visit_attribute(:queue)
visitor.visit_attribute_time(:created_at)
visitor.visit_attribute_time(:updated_at)
end
|
#cancel! ⇒ Object
108
109
110
111
112
113
114
115
|
# File 'lib/taskinator/task.rb', line 108
def cancel!
transition(:cancelled) do
self.incr_cancelled if incr_count?
instrument('taskinator.task.cancelled', cancelled_payload) do
cancel if respond_to?(:cancel)
end
end
end
|
#cancelled? ⇒ Boolean
117
118
119
|
# File 'lib/taskinator/task.rb', line 117
def cancelled?
super || process.cancelled?
end
|
#complete! ⇒ Object
97
98
99
100
101
102
103
104
105
106
|
# File 'lib/taskinator/task.rb', line 97
def complete!
transition(:completed) do
self.incr_completed if incr_count?
instrument('taskinator.task.completed', completed_payload) do
complete if respond_to?(:complete)
process.task_completed(self)
end
end
end
|
#enqueue ⇒ Object
subclasses must implement the following methods
140
141
142
|
# File 'lib/taskinator/task.rb', line 140
def enqueue
raise NotImplementedError
end
|
#enqueue! ⇒ Object
66
67
68
69
70
71
72
73
74
|
# File 'lib/taskinator/task.rb', line 66
def enqueue!
return if paused? || cancelled?
transition(:enqueued) do
instrument('taskinator.task.enqueued', enqueued_payload) do
enqueue
end
end
end
|
#fail!(error) ⇒ Object
121
122
123
124
125
126
127
128
129
130
|
# File 'lib/taskinator/task.rb', line 121
def fail!(error)
transition(:failed) do
self.incr_failed if incr_count?
instrument('taskinator.task.failed', failed_payload(error)) do
fail(error) if respond_to?(:fail)
process.task_failed(self, error)
end
end
end
|
#incr_count? ⇒ Boolean
132
133
134
|
# File 'lib/taskinator/task.rb', line 132
def incr_count?
true
end
|
#paused? ⇒ Boolean
93
94
95
|
# File 'lib/taskinator/task.rb', line 93
def paused?
super || process.paused?
end
|
#start ⇒ Object
144
145
146
|
# File 'lib/taskinator/task.rb', line 144
def start
raise NotImplementedError
end
|
#start! ⇒ Object
76
77
78
79
80
81
82
83
84
85
|
# File 'lib/taskinator/task.rb', line 76
def start!
return if paused? || cancelled?
self.incr_processing if incr_count?
transition(:processing) do
instrument('taskinator.task.processing', processing_payload) do
start
end
end
end
|
#to_s ⇒ Object
62
63
64
|
# File 'lib/taskinator/task.rb', line 62
def to_s
"#<#{self.class.name}:#{uuid}>"
end
|