Class: Taskflow::Flow

Inherits:
Object
  • Object
show all
Includes:
Mongoid::Document
Defined in:
lib/taskflow/flow.rb

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.can_launch?(klass, opts = {}) ⇒ Boolean

Supported opts keys: :params.

Returns:

  • (Boolean)


26
27
28
29
# File 'lib/taskflow/flow.rb', line 26

# Reports whether a flow may be launched for the given class and params.
#
# The answer is true only when no Taskflow::Flow document whose state is
# not 'stopped' matches both `klass` and `input == opts[:params]`.
#
# @param klass [Object] value compared against the `klass` field
# @param opts [Hash] only :params is read (string or symbol key)
# @return [Boolean]
def can_launch?(klass, opts = {})
    options = HashWithIndifferentAccess.new(opts)
    not_stopped = Taskflow::Flow.ne(state: 'stopped')
    same_request = not_stopped.where(klass: klass, input: options[:params])
    !same_request.exists?
end

.launch(klass, opts = {}) ⇒ Object



31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/taskflow/flow.rb', line 31

# Creates a flow of the named class, attaches its logger and schedules it.
#
# The flow class is resolved with Kernel.const_get and its NAME constant is
# used as the flow name. Option keys read: :params (stored as input),
# :launched_by (defaults to 'task-flow-engine'), :next_workflow_config
# (stored as next_config when truthy) and :workflow_description (passed to
# create_logger).
#
# @param klass [String, Symbol] constant name of the flow class
# @param opts [Hash] launch options, string or symbol keys
# @return [Object] whatever `flow.schedule` returns
def launch(klass, opts = {})
    options = HashWithIndifferentAccess.new(opts)
    workflow_class = Kernel.const_get(klass)
    workflow_name = workflow_class.const_get('NAME')
    options[:launched_by] ||= 'task-flow-engine'

    flow = workflow_class.create(
        name: workflow_name,
        input: options[:params],
        launched_by: options[:launched_by]
    )
    flow.update(next_config: options[:next_workflow_config]) if options[:next_workflow_config]
    flow.create_logger(name: workflow_name, description: options[:workflow_description])
    flow.schedule
end

Instance Method Details

#resume ⇒ Object



54
55
56
57
58
# File 'lib/taskflow/flow.rb', line 54

# Calls `resume` on every task of this flow whose state is 'paused' and
# whose result is 'error'.
#
# @return [Object] the value returned by `each` on the matched tasks
def resume
    errored_paused = tasks.where(state: 'paused', result: 'error')
    errored_paused.each(&:resume)
end

#running_steps ⇒ Object



45
46
47
# File 'lib/taskflow/flow.rb', line 45

# Selects the tasks of this flow whose state is 'running' or 'paused'.
#
# @return [Object] the result of `tasks.in` for those states
def running_steps
    active_states = %w[running paused]
    tasks.in(state: active_states)
end

#schedule ⇒ Object



60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/taskflow/flow.rb', line 60

# Enqueues a worker job for every pending task that is ready to run.
#
# Does nothing (returns nil) when the flow has a halt_by value or is
# 'stopped'. A 'pending' flow is first moved to 'running' with started_at
# set to the current time. The ids of all ready tasks are collected before
# any job is enqueued.
#
# @return [self, nil] the flow itself, or nil when it was halted or stopped
def schedule
    return if self.halt_by || self.state == 'stopped'

    if self.state == 'pending'
        self.update_attributes!(state: 'running', started_at: Time.now)
    end

    done_states = %w[skipped stopped]
    ready_ids = []
    self.reload.tasks.where(state: 'pending').each do |candidate|
        # A task is ready when it has no upstream tasks, or when every
        # upstream task is in the 'skipped' or 'stopped' state.
        next unless candidate.upstream.empty? || candidate.upstream.all? { |dep| done_states.include?(dep.state) }
        ready_ids << candidate.id.to_s
    end

    ready_ids.each do |task_id|
        Taskflow::Worker.perform_async(self.id.to_s, task_id)
    end
    self
end

#stop!(user_id = nil) ⇒ Object



49
50
51
52
# File 'lib/taskflow/flow.rb', line 49

# Stops the flow: stores the average progress of its tasks, records who
# halted it and when, and sets state 'stopped' with result 'warning'.
#
# A flow without tasks gets progress 0 instead of dividing by zero.
#
# @param user_id [Object, nil] value stored in halt_by
# @return [Object] whatever update_attributes! returns
def stop!(user_id=nil)
    # Load the tasks once so the sum and the count come from the same set.
    all_tasks = self.tasks.to_a
    percent = all_tasks.empty? ? 0 : all_tasks.map(&:progress).sum / all_tasks.size
    self.update_attributes! progress: percent, halt_by: user_id, ended_at: Time.now, state: 'stopped', result: 'warning'
end