Class: Taskinator::Process
Defined Under Namespace
Classes: Concurrent, Sequential
Instance Attribute Summary collapse
Class Method Summary
collapse
Instance Method Summary
collapse
#cancelled_payload, #completed_payload, #enqueued_payload, #failed_payload, #instrument, #paused_payload, #processing_payload, #resumed_payload
add_process_to_list, deserialize, included, processes_list_key, serialize
Methods included from Workflow
#current_state, #current_state=, #transition
Constructor Details
#initialize(definition, options = {}) ⇒ Process
Returns a new instance of Process.
33
34
35
36
37
38
39
40
41
42
43
44
45
|
# File 'lib/taskinator/process.rb', line 33
# Builds a process for the given definition.
#
# NOTE: +options+ is modified in place. The +:uuid+, +:scope+ and +:queue+
# entries are removed from the hash that was passed in, and that same hash
# (without those keys) is kept as the process options.
#
# @param definition [Object] must be a kind of Definition
# @param options [Hash] may carry +:uuid+, +:scope+ and +:queue+
# @raise [ArgumentError] when +definition+ is nil or is not a kind of Definition
def initialize(definition, options={})
  raise ArgumentError, 'definition' if definition.nil?

  unless definition.kind_of?(Definition)
    # Arbitrary objects (e.g. a String) have no #name; fall back to #inspect so
    # the caller gets the intended ArgumentError rather than a NoMethodError.
    label = definition.respond_to?(:name) ? definition.name : definition.inspect
    raise ArgumentError, "#{label} does not extend the #{Definition.name} module"
  end

  # A caller-supplied uuid wins; otherwise a fresh one is generated.
  @uuid = options.delete(:uuid) || Taskinator.generate_uuid
  @definition = definition
  @options = options
  @scope = options.delete(:scope)
  @queue = options.delete(:queue)
  @created_at = Time.now.utc
  @updated_at = created_at
  @current_state = :initial
end
|
Instance Attribute Details
#created_at ⇒ Object
Returns the value of attribute created_at.
27
28
29
|
# File 'lib/taskinator/process.rb', line 27
# Creation timestamp; the constructor sets it to the current UTC time.
#
# @return [Time, nil] the stored value of @created_at
def created_at
  @created_at
end
|
#definition ⇒ Object
Returns the value of attribute definition.
23
24
25
|
# File 'lib/taskinator/process.rb', line 23
# The definition handed to the constructor, which checks it is a kind of
# Definition before storing it.
#
# @return [Object] the stored value of @definition
def definition
  @definition
end
|
#options ⇒ Object
Returns the value of attribute options.
24
25
26
|
# File 'lib/taskinator/process.rb', line 24
# The options hash given to the constructor. The constructor removes the
# +:uuid+, +:scope+ and +:queue+ keys from it.
#
# @return [Hash, nil] the stored value of @options
def options
  @options
end
|
#parent ⇒ Object
In the case of sub-process tasks, the containing task.
31
32
33
|
# File 'lib/taskinator/process.rb', line 31
# For sub-process tasks, the task that contains this process.
#
# @return [Object, nil] the stored value of @parent
def parent
  @parent
end
|
#queue ⇒ Object
Returns the value of attribute queue.
26
27
28
|
# File 'lib/taskinator/process.rb', line 26
# The +:queue+ entry taken out of the constructor options.
#
# @return [Object, nil] the stored value of @queue
def queue
  @queue
end
|
#scope ⇒ Object
Returns the value of attribute scope.
25
26
27
|
# File 'lib/taskinator/process.rb', line 25
# The +:scope+ entry taken out of the constructor options.
#
# @return [Object, nil] the stored value of @scope
def scope
  @scope
end
|
#updated_at ⇒ Object
Returns the value of attribute updated_at.
28
29
30
|
# File 'lib/taskinator/process.rb', line 28
# Last-update timestamp; the constructor initialises it to the creation time.
#
# @return [Time, nil] the stored value of @updated_at
def updated_at
  @updated_at
end
|
#uuid ⇒ Object
Returns the value of attribute uuid.
22
23
24
|
# File 'lib/taskinator/process.rb', line 22
# Identifier of this process: either the +:uuid+ option supplied to the
# constructor or a value from Taskinator.generate_uuid.
#
# @return [Object, nil] the stored value of @uuid
def uuid
  @uuid
end
|
Class Method Details
.define_concurrent_process_for(definition, complete_on = CompleteOn::Default, options = {}) ⇒ Object
17
18
19
|
# File 'lib/taskinator/process.rb', line 17
# Factory for a concurrent process.
#
# @param definition [Object] forwarded to Process::Concurrent.new
# @param complete_on [Object] completion strategy; CompleteOn::Default when omitted
# @param options [Hash] forwarded to Process::Concurrent.new
# @return [Process::Concurrent] the newly built process
def define_concurrent_process_for(definition, complete_on=CompleteOn::Default, options={})
  Process::Concurrent.new(definition, complete_on, options)
end
|
.define_sequential_process_for(definition, options = {}) ⇒ Object
13
14
15
|
# File 'lib/taskinator/process.rb', line 13
# Factory for a sequential process.
#
# @param definition [Object] forwarded to Process::Sequential.new
# @param options [Hash] forwarded to Process::Sequential.new
# @return [Process::Sequential] the newly built process
def define_sequential_process_for(definition, options={})
  Process::Sequential.new(definition, options)
end
|
Instance Method Details
#<=>(other) ⇒ Object
74
75
76
|
# File 'lib/taskinator/process.rb', line 74
# Orders processes by their uuid.
#
# @param other [Object] the object to compare against
# @return [Integer, nil] the result of comparing the two uuids, or nil when
#   +other+ does not expose a public +uuid+ and so cannot be compared
def <=>(other)
  # Follow the usual <=> contract: incomparable operands yield nil instead
  # of blowing up with NoMethodError.
  return nil unless other.respond_to?(:uuid)

  uuid <=> other.uuid
end
|
#accept(visitor) ⇒ Object
62
63
64
65
66
67
68
69
70
71
72
|
# File 'lib/taskinator/process.rb', line 62
# Presents this process to a visitor by calling its hooks in a fixed order.
# Every hook receives an attribute name, except +visit_tasks+, which
# receives the tasks collection itself.
#
# @param visitor [Object] must implement the visit_* hooks used below
# @return [Object] whatever the final hook call returns
def accept(visitor)
  # identity and relationships
  visitor.visit_attribute(:uuid)
  visitor.visit_task_reference(:parent)
  visitor.visit_type(:definition)
  visitor.visit_tasks(tasks)

  # configuration
  visitor.visit_args(:options)
  visitor.visit_attribute(:scope)
  visitor.visit_attribute(:queue)

  # timestamps
  visitor.visit_attribute_time(:created_at)
  visitor.visit_attribute_time(:updated_at)
end
|
#cancel! ⇒ Object
145
146
147
148
149
150
151
|
# File 'lib/taskinator/process.rb', line 145
# Moves the process to the +:cancelled+ state.
#
# The work runs inside +transition+ and is wrapped by +instrument+ under the
# 'taskinator.process.cancelled' event. The optional +cancel+ hook is called
# only when this object responds to it.
def cancel!
  transition(:cancelled) do
    payload = cancelled_payload
    instrument('taskinator.process.cancelled', payload) do
      cancel if respond_to?(:cancel)
    end
  end
end
|
#complete! ⇒ Object
122
123
124
125
126
127
128
129
130
131
132
133
134
135
|
# File 'lib/taskinator/process.rb', line 122
# Moves the process to the +:completed+ state.
#
# The work runs inside +transition+ and is wrapped by +instrument+ under the
# 'taskinator.process.completed' event. The optional +complete+ hook is
# called when this object responds to it. Afterwards a process without a
# parent calls +cleanup+, while a process with a parent calls
# +parent.complete!+ instead.
def complete!
  transition(:completed) do
    payload = completed_payload
    instrument('taskinator.process.completed', payload) do
      complete if respond_to?(:complete)

      if parent.nil?
        cleanup
      else
        parent.complete!
      end
    end
  end
end
|
#enqueue ⇒ Object
174
175
176
|
# File 'lib/taskinator/process.rb', line 174
# Placeholder for the enqueue behaviour; this implementation only raises.
# Presumably overridden by the concrete process classes - confirm there.
#
# @raise [NotImplementedError] always
def enqueue
  raise NotImplementedError
end
|
#enqueue! ⇒ Object
82
83
84
85
86
87
88
89
90
|
# File 'lib/taskinator/process.rb', line 82
# Moves the process to the +:enqueued+ state and calls +enqueue+.
#
# Does nothing (returns nil) when the process is paused or cancelled.
# Otherwise the work runs inside +transition+ and is wrapped by +instrument+
# under the 'taskinator.process.enqueued' event.
def enqueue!
  return if paused? || cancelled?

  transition(:enqueued) do
    payload = enqueued_payload
    instrument('taskinator.process.enqueued', payload) { enqueue }
  end
end
|
#fail!(error) ⇒ Object
153
154
155
156
157
158
159
160
161
162
|
# File 'lib/taskinator/process.rb', line 153
# Moves the process to the +:failed+ state.
#
# The work runs inside +transition+ and is wrapped by +instrument+ under the
# 'taskinator.process.failed' event. The optional +fail+ hook is called when
# this object responds to it, and the failure is then passed on to the
# parent (if any) through +parent.fail!+.
#
# @param error [Object] the failure, handed to the payload, the hook and the parent
def fail!(error)
  transition(:failed) do
    payload = failed_payload(error)
    instrument('taskinator.process.failed', payload) do
      fail(error) if respond_to?(:fail)
      parent.fail!(error) unless parent.nil?
    end
  end
end
|
#no_tasks_defined? ⇒ Boolean
58
59
60
|
# File 'lib/taskinator/process.rb', line 58
# Whether the tasks collection is empty.
#
# @return [Boolean] the result of +tasks.empty?+
def no_tasks_defined?
  tasks.empty?
end
|
#pause! ⇒ Object
102
103
104
105
106
107
108
109
110
|
# File 'lib/taskinator/process.rb', line 102
# Moves the process to the +:paused+ state.
#
# Only acts when the process is enqueued or processing; otherwise returns
# nil without doing anything. The work runs inside +transition+ and is
# wrapped by +instrument+ under the 'taskinator.process.paused' event. The
# optional +pause+ hook is called when this object responds to it.
def pause!
  return unless enqueued? || processing?

  transition(:paused) do
    payload = paused_payload
    instrument('taskinator.process.paused', payload) do
      pause if respond_to?(:pause)
    end
  end
end
|
#resume! ⇒ Object
112
113
114
115
116
117
118
119
120
|
# File 'lib/taskinator/process.rb', line 112
# Resumes a paused process by moving it back to the +:processing+ state.
#
# Returns nil without doing anything unless the process is paused. The work
# runs inside +transition+ and is wrapped by +instrument+ under the
# 'taskinator.process.resumed' event. The optional +resume+ hook is called
# when this object responds to it.
def resume!
  return unless paused?

  transition(:processing) do
    payload = resumed_payload
    instrument('taskinator.process.resumed', payload) do
      resume if respond_to?(:resume)
    end
  end
end
|
#start ⇒ Object
178
179
180
|
# File 'lib/taskinator/process.rb', line 178
# Placeholder for the start behaviour; this implementation only raises.
# Presumably overridden by the concrete process classes - confirm there.
#
# @raise [NotImplementedError] always
def start
  raise NotImplementedError
end
|
#start! ⇒ Object
92
93
94
95
96
97
98
99
100
|
# File 'lib/taskinator/process.rb', line 92
# Moves the process to the +:processing+ state and calls +start+.
#
# Does nothing (returns nil) when the process is paused or cancelled.
# Otherwise the work runs inside +transition+ and is wrapped by +instrument+
# under the 'taskinator.process.processing' event.
def start!
  return if paused? || cancelled?

  transition(:processing) do
    payload = processing_payload
    instrument('taskinator.process.processing', payload) { start }
  end
end
|
#task_completed(task) ⇒ Object
182
183
184
|
# File 'lib/taskinator/process.rb', line 182
# Placeholder for reacting to a completed task; this implementation only
# raises. Presumably overridden by the concrete process classes - confirm there.
#
# @param task [Object] not used by this implementation
# @raise [NotImplementedError] always
def task_completed(task)
  raise NotImplementedError
end
|
#task_failed(task, error) ⇒ Object
164
165
166
167
|
# File 'lib/taskinator/process.rb', line 164
# Fails the whole process with the given error by delegating to +fail!+.
#
# @param task [Object] not used by this implementation
# @param error [Object] passed straight to +fail!+
# @return [Object] whatever +fail!+ returns
def task_failed(task, error)
  fail!(error)
end
|
#tasks ⇒ Object
54
55
56
|
# File 'lib/taskinator/process.rb', line 54
# The tasks collection of this process, created on first access and reused
# on later calls.
#
# @return [Tasks] the memoized collection
def tasks
  return @tasks if @tasks

  @tasks = Tasks.new
end
|
#tasks_completed? ⇒ Boolean
TODO: add retry method - to pick up from a failed task
e.g. like retrying a failed job in Resque Web
140
141
142
143
|
# File 'lib/taskinator/process.rb', line 140
# Whether every task reports itself as completed.
#
# @return [Boolean] the result of +all?+ over the tasks; for a standard
#   Enumerable an empty collection also yields true
def tasks_completed?
  tasks.all? { |task| task.completed? }
end
|
#to_s ⇒ Object
78
79
80
|
# File 'lib/taskinator/process.rb', line 78
# Short textual form made of the class name and the uuid.
#
# @return [String] e.g. "#<Taskinator::Process::Sequential:uuid>"
def to_s
  class_name = self.class.name
  "#<#{class_name}:#{uuid}>"
end
|