Class: Taskflow::Worker

Inherits:
Object
  • Object
show all
Includes:
Sidekiq::Worker
Defined in:
lib/taskflow/worker.rb

Instance Method Summary

Instance Method Details

#perform(task_flow_id, job_id, opts = {}) ⇒ Object



5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/taskflow/worker.rb', line 5

# Runs one task of a flow, records the outcome on the task/flow records and
# then asks the flow to schedule again.
#
# The state checks and `task.go` run inside `catch :control`. The value that
# reaches the catch (a thrown signal, or whatever `task.go` returns) selects
# how the records are updated:
#   :flow_halt                         - stamp `ended_at` on the flow if unset
#   :suspend                           - task becomes paused / 'suspend'
#   :skip                              - task becomes skipped
#   :already_running, :already_stopped - return immediately, nothing updated
#   anything else                      - task becomes stopped / 'success'
# NOTE(review): the signals are presumably thrown by check_flow_state /
# check_task_state or by the task itself - confirm against those helpers.
#
# Any StandardError raised along the way is stored on the task (class,
# message, backtrace) and the task is left paused with result 'error'.
#
# @param task_flow_id id handed to Taskflow::Flow.find
# @param job_id id handed to Taskflow::Task.find
# @param opts [Hash] accepted but not read by this method
# @return [Object, nil] nil for the two "already" signals, otherwise the
#   value of `flow.schedule`
def perform(task_flow_id, job_id, opts = {})
    flow = Taskflow::Flow.find(task_flow_id)
    task = Taskflow::Task.find(job_id)
    begin
        signal = catch(:control) do
            check_flow_state(flow)
            check_task_state(task)
            # Mark the task as running and clear ended_at/output/error/result.
            task.update_attributes!(
                state: 'running',
                started_at: Time.now,
                ended_at: nil,
                progress: 0.5,
                output: {},
                error: nil,
                result: nil
            )
            task.go(logger)
        end

        # These two signals leave both records untouched and skip scheduling.
        return if [:already_running, :already_stopped].include?(signal)

        case signal
        when :flow_halt
            unless flow.ended_at
                flow.update_attributes!(ended_at: Time.now)
            end
        when :suspend
            task.update_attributes!(result: 'suspend', state: 'paused')
        when :skip
            task.update_attributes!(state: 'skipped', data: {})
        else
            task.update_attributes!(
                data: {},
                ended_at: Time.now,
                progress: 1,
                state: 'stopped',
                result: 'success'
            )
        end
    rescue => failure
        details = {
            class: failure.class.to_s,
            message: failure.to_s,
            backtrace: failure.backtrace
        }
        task.error = details
        task.state = 'paused'
        task.result = 'error'
        task.ended_at = Time.now
        task.save
    end
    refreshed_flow = flow.reload
    update_flow(refreshed_flow)
    flow.schedule
end