Class: Taskflow::Flow
- Inherits:
-
Object
- Object
- Taskflow::Flow
- Includes:
- Mongoid::Document
- Defined in:
- lib/taskflow/flow.rb
Class Method Summary collapse
-
.can_launch?(klass, opts = {}) ⇒ Boolean
opts support :params,.
- .launch(klass, opts = {}) ⇒ Object
Instance Method Summary collapse
Class Method Details
.can_launch?(klass, opts = {}) ⇒ Boolean
opts support :params,
26 27 28 29 |
# File 'lib/taskflow/flow.rb', line 26 def can_launch?(klass,opts={}) opts = HashWithIndifferentAccess.new opts !Taskflow::Flow.ne(state: 'stopped').where(klass: klass,input: opts[:params]).exists? end |
.launch(klass, opts = {}) ⇒ Object
31 32 33 34 35 36 37 38 39 40 41 42 |
# File 'lib/taskflow/flow.rb', line 31 def launch(klass,opts={}) opts = HashWithIndifferentAccess.new opts flow_klass = Kernel.const_get klass name = flow_klass.const_get 'NAME' opts[:launched_by] ||= 'task-flow-engine' flow = flow_klass.create name: name,input: opts[:params],launched_by: opts[:launched_by] if opts[:next_workflow_config] flow.update next_config: opts[:next_workflow_config] end flow.create_logger name: name,description: opts[:workflow_description] flow.schedule end |
Instance Method Details
#resume ⇒ Object
54 55 56 57 58 |
# File 'lib/taskflow/flow.rb', line 54 def resume self.tasks.where(state: 'paused',result: 'error').each do |task| task.resume end end |
#running_steps ⇒ Object
45 46 47 |
# File 'lib/taskflow/flow.rb', line 45 def running_steps self.tasks.in(state: ['running','paused']) end |
#schedule ⇒ Object
60 61 62 63 64 65 66 67 68 69 70 71 72 |
# File 'lib/taskflow/flow.rb', line 60 def schedule return if self.halt_by || self.state == 'stopped' self.update_attributes! state: 'running',started_at: Time.now if self.state == 'pending' task_list = [] self.reload.tasks.where(state: 'pending').each do |task| # 上游全部完成 if task.upstream.empty? || task.upstream.all?{|t| %w(skipped stopped).include? t.state } task_list << task.id.to_s end end task_list.each{|tid| Taskflow::Worker.perform_async self.id.to_s,tid } self end |
#stop!(user_id = nil) ⇒ Object
49 50 51 52 |
# File 'lib/taskflow/flow.rb', line 49 def stop!(user_id=nil) percent = self.tasks.map(&:progress).sum / self.tasks.size self.update_attributes! progress: percent,halt_by: user_id,ended_at: Time.now, state: 'stopped',result: 'warning' end |