Class: Gush::Workflow
- Inherits:
-
Object
- Object
- Gush::Workflow
- Defined in:
- lib/gush/workflow.rb
Instance Attribute Summary collapse
-
#arguments ⇒ Object
Returns the value of attribute arguments.
-
#dependencies ⇒ Object
Returns the value of attribute dependencies.
-
#globals ⇒ Object
Returns the value of attribute globals.
- #id ⇒ Object
-
#jobs ⇒ Object
Returns the value of attribute jobs.
-
#kwargs ⇒ Object
Returns the value of attribute kwargs.
-
#persisted ⇒ Object
Returns the value of attribute persisted.
-
#stopped ⇒ Object
Returns the value of attribute stopped.
Class Method Summary collapse
- .create(*args, **kwargs) ⇒ Object
- .descendants ⇒ Object
- .find(id) ⇒ Object
- .page(start = 0, stop = 99, order: :asc) ⇒ Object
Instance Method Summary collapse
- #configure(*args, **kwargs) ⇒ Object
- #continue ⇒ Object
- #expire!(ttl = nil) ⇒ Object
- #failed? ⇒ Boolean
- #find_job(name) ⇒ Object
- #finished? ⇒ Boolean
- #finished_at ⇒ Object
- #initial_jobs ⇒ Object
-
#initialize(*args, globals: nil, internal_state: {}, **kwargs) ⇒ Workflow
constructor
A new instance of Workflow.
- #mark_as_persisted ⇒ Object
- #mark_as_started ⇒ Object
- #mark_as_stopped ⇒ Object
- #persist! ⇒ Object
- #reload ⇒ Object
- #resolve_dependencies ⇒ Object
- #run(klass, opts = {}) ⇒ Object
- #running? ⇒ Boolean
- #save ⇒ Object
- #start! ⇒ Object
- #started? ⇒ Boolean
- #started_at ⇒ Object
- #status ⇒ Object
- #stopped? ⇒ Boolean
- #to_hash ⇒ Object
- #to_json(options = {}) ⇒ Object
Constructor Details
#initialize(*args, globals: nil, internal_state: {}, **kwargs) ⇒ Workflow
Returns a new instance of Workflow.
8 9 10 11 12 13 14 15 16 17 18 19 20 |
# File 'lib/gush/workflow.rb', line 8 def initialize(*args, globals: nil, internal_state: {}, **kwargs) @arguments = args @kwargs = kwargs @globals = globals || {} @id = internal_state[:id] || id @jobs = internal_state[:jobs] || [] @dependencies = internal_state[:dependencies] || [] @persisted = internal_state[:persisted] || false @stopped = internal_state[:stopped] || false setup unless internal_state[:skip_setup] end |
Instance Attribute Details
#arguments ⇒ Object
Returns the value of attribute arguments.
5 6 7 |
# File 'lib/gush/workflow.rb', line 5 def arguments @arguments end |
#dependencies ⇒ Object
Returns the value of attribute dependencies.
5 6 7 |
# File 'lib/gush/workflow.rb', line 5 def dependencies @dependencies end |
#globals ⇒ Object
Returns the value of attribute globals.
5 6 7 |
# File 'lib/gush/workflow.rb', line 5 def globals @globals end |
#id ⇒ Object
208 209 210 |
# File 'lib/gush/workflow.rb', line 208 def id @id ||= client.next_free_workflow_id end |
#jobs ⇒ Object
Returns the value of attribute jobs.
5 6 7 |
# File 'lib/gush/workflow.rb', line 5 def jobs @jobs end |
#kwargs ⇒ Object
Returns the value of attribute kwargs.
5 6 7 |
# File 'lib/gush/workflow.rb', line 5 def kwargs @kwargs end |
#persisted ⇒ Object
Returns the value of attribute persisted.
5 6 7 |
# File 'lib/gush/workflow.rb', line 5 def persisted @persisted end |
#stopped ⇒ Object
Returns the value of attribute stopped.
5 6 7 |
# File 'lib/gush/workflow.rb', line 5 def stopped @stopped end |
Class Method Details
.create(*args, **kwargs) ⇒ Object
30 31 32 33 34 |
# File 'lib/gush/workflow.rb', line 30 def self.create(*args, **kwargs) flow = new(*args, **kwargs) flow.save flow end |
.descendants ⇒ Object
204 205 206 |
# File 'lib/gush/workflow.rb', line 204 def self.descendants ObjectSpace.each_object(Class).select { |klass| klass < self } end |
.find(id) ⇒ Object
22 23 24 |
# File 'lib/gush/workflow.rb', line 22 def self.find(id) Gush::Client.new.find_workflow(id) end |
.page(start = 0, stop = 99, order: :asc) ⇒ Object
26 27 28 |
# File 'lib/gush/workflow.rb', line 26 def self.page(start=0, stop=99, order: :asc) Gush::Client.new.workflows(start, stop, order: order) end |
Instance Method Details
#configure(*args, **kwargs) ⇒ Object
49 50 |
# File 'lib/gush/workflow.rb', line 49 def configure(*args, **kwargs) end |
#continue ⇒ Object
36 37 38 39 40 41 42 43 |
# File 'lib/gush/workflow.rb', line 36 def continue client = Gush::Client.new failed_jobs = jobs.select(&:failed?) failed_jobs.each do |job| client.enqueue_job(id, job) end end |
#expire!(ttl = nil) ⇒ Object
64 65 66 |
# File 'lib/gush/workflow.rb', line 64 def expire!(ttl=nil) client.expire_workflow(self, ttl) end |
#failed? ⇒ Boolean
110 111 112 |
# File 'lib/gush/workflow.rb', line 110 def failed? jobs.any?(&:failed?) end |
#find_job(name) ⇒ Object
86 87 88 89 90 91 92 93 94 95 96 |
# File 'lib/gush/workflow.rb', line 86 def find_job(name) match_data = /(?<klass>\w*[^-])-(?<identifier>.*)/.match(name.to_s) if match_data.nil? job = jobs.find { |node| node.klass.to_s == name.to_s } else job = jobs.find { |node| node.name.to_s == name.to_s } end job end |
#finished? ⇒ Boolean
98 99 100 |
# File 'lib/gush/workflow.rb', line 98 def finished? jobs.all?(&:finished?) end |
#finished_at ⇒ Object
176 177 178 |
# File 'lib/gush/workflow.rb', line 176 def finished_at last_job ? last_job.finished_at : nil end |
#initial_jobs ⇒ Object
153 154 155 |
# File 'lib/gush/workflow.rb', line 153 def initial_jobs jobs.select(&:has_no_dependencies?) end |
#mark_as_persisted ⇒ Object
68 69 70 |
# File 'lib/gush/workflow.rb', line 68 def mark_as_persisted @persisted = true end |
#mark_as_started ⇒ Object
72 73 74 |
# File 'lib/gush/workflow.rb', line 72 def mark_as_started @stopped = false end |
#mark_as_stopped ⇒ Object
52 53 54 |
# File 'lib/gush/workflow.rb', line 52 def mark_as_stopped @stopped = true end |
#persist! ⇒ Object
60 61 62 |
# File 'lib/gush/workflow.rb', line 60 def persist! client.persist_workflow(self) end |
#reload ⇒ Object
144 145 146 147 148 149 150 151 |
# File 'lib/gush/workflow.rb', line 144 def reload flow = self.class.find(id) self.jobs = flow.jobs self.stopped = flow.stopped self end |
#resolve_dependencies ⇒ Object
76 77 78 79 80 81 82 83 84 |
# File 'lib/gush/workflow.rb', line 76 def resolve_dependencies @dependencies.each do |dependency| from = find_job(dependency[:from]) to = find_job(dependency[:to]) to.incoming << dependency[:from] from.outgoing << dependency[:to] end end |
#run(klass, opts = {}) ⇒ Object
118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 |
# File 'lib/gush/workflow.rb', line 118 def run(klass, opts = {}) node = klass.new({ workflow_id: id, id: client.next_free_job_id(id, klass.to_s), params: (@globals || {}).merge(opts.fetch(:params, {})), queue: opts[:queue], wait: opts[:wait] }) jobs << node deps_after = [*opts[:after]] deps_after.each do |dep| @dependencies << {from: dep.to_s, to: node.name.to_s } end deps_before = [*opts[:before]] deps_before.each do |dep| @dependencies << {from: node.name.to_s, to: dep.to_s } end node.name end |
#running? ⇒ Boolean
106 107 108 |
# File 'lib/gush/workflow.rb', line 106 def running? started? && !finished? end |
#save ⇒ Object
45 46 47 |
# File 'lib/gush/workflow.rb', line 45 def save persist! end |
#start! ⇒ Object
56 57 58 |
# File 'lib/gush/workflow.rb', line 56 def start! client.start_workflow(self) end |
#started? ⇒ Boolean
102 103 104 |
# File 'lib/gush/workflow.rb', line 102 def started? !!started_at end |
#started_at ⇒ Object
172 173 174 |
# File 'lib/gush/workflow.rb', line 172 def started_at first_job ? first_job.started_at : nil end |
#status ⇒ Object
157 158 159 160 161 162 163 164 165 166 167 168 169 170 |
# File 'lib/gush/workflow.rb', line 157 def status case when failed? :failed when running? :running when finished? :finished when stopped? :stopped else :pending end end |
#stopped? ⇒ Boolean
114 115 116 |
# File 'lib/gush/workflow.rb', line 114 def stopped? stopped end |
#to_hash ⇒ Object
180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 |
# File 'lib/gush/workflow.rb', line 180 def to_hash name = self.class.to_s { name: name, id: id, arguments: @arguments, kwargs: @kwargs, globals: @globals, dependencies: @dependencies, total: jobs.count, finished: jobs.count(&:finished?), klass: name, job_klasses: jobs.map(&:class).map(&:to_s).uniq, status: status, stopped: stopped, started_at: started_at, finished_at: finished_at } end |
#to_json(options = {}) ⇒ Object
200 201 202 |
# File 'lib/gush/workflow.rb', line 200 def to_json( = {}) Gush::JSON.encode(to_hash) end |