Class: Burstflow::Manager
- Inherits:
-
Object
- Object
- Burstflow::Manager
- Defined in:
- lib/burstflow/manager.rb
Instance Attribute Summary collapse
-
#workflow ⇒ Object
Returns the value of attribute workflow.
Instance Method Summary collapse
-
#enqueue_job!(job) ⇒ Object
Mark job enqueued and enqueue it.
-
#fail_job!(job, exception) ⇒ Object
Mark job failed and take further actions.
-
#finish_job!(job) ⇒ Object
Mark job finished and take further actions.
-
#initialize(workflow) ⇒ Manager
constructor
A new instance of Manager.
-
#job_performed!(job, result) ⇒ Object
Mark job finished or suspended, depending on result or output.
-
#resume_job!(job, data) ⇒ Object
Enqueue job for resuming.
- #resume_workflow!(job_id, data) ⇒ Object
-
#start_workflow! ⇒ Object
workflow management.
-
#suspend_job!(job) ⇒ Object
Mark job suspended and forget it until resume.
Constructor Details
#initialize(workflow) ⇒ Manager
Returns a new instance of Manager.
7 8 9 |
# File 'lib/burstflow/manager.rb', line 7

# Creates a manager bound to one workflow.
#
# @param workflow [Object] the workflow this manager operates on; stored
#   unchanged in @workflow
def initialize(workflow)
  @workflow = workflow
end
Instance Attribute Details
#workflow ⇒ Object
Returns the value of attribute workflow.
5 6 7 |
# File 'lib/burstflow/manager.rb', line 5

# Reader for the workflow handed to the constructor.
#
# @return [Object] the current value of @workflow
def workflow
  @workflow
end
Instance Method Details
#enqueue_job!(job) ⇒ Object
Mark job enqueued and enqueue it
33 34 35 36 37 38 39 40 |
# File 'lib/burstflow/manager.rb', line 33

# Marks the job as enqueued and schedules a worker for it.
#
# Everything runs inside the job's :enqueue callbacks. The worker is
# scheduled from the block passed to `job.save!`.
# NOTE(review): presumably `save!` yields once the job is persisted - confirm.
#
# @param job [Object] the job to enqueue; must respond to `run_callbacks`,
#   `enqueue!`, `save!` and `id`
def enqueue_job!(job)
  job.run_callbacks(:enqueue) do
    job.enqueue!
    job.save! { Burstflow::Worker.perform_later(workflow.id, job.id) }
  end
end
#fail_job!(job, exception) ⇒ Object
Mark job failed and take further actions
68 69 70 71 72 73 74 75 76 77 78 |
# File 'lib/burstflow/manager.rb', line 68

# Marks the job as failed, records the error on the workflow and
# re-evaluates the workflow state.
#
# Everything runs inside the job's :failure callbacks. The workflow is
# saved before the job, and the state analysis runs from the block passed
# to `job.save!`.
#
# @param job [Object] the job that failed
# @param exception [Object] the failure, forwarded to `job.fail!`
def fail_job!(job, exception)
  job.run_callbacks(:failure) do
    job.fail!(exception)

    workflow.add_error(job)
    workflow.save!

    job.save! { analyze_workflow_state(job) }
  end
end
#finish_job!(job) ⇒ Object
Mark job finished and take further actions
60 61 62 63 64 65 |
# File 'lib/burstflow/manager.rb', line 60

# Marks the job as finished and re-evaluates the workflow state.
#
# The state analysis runs from the block passed to `job.save!`.
#
# @param job [Object] the job that completed; must respond to `finish!`
#   and `save!`
def finish_job!(job)
  job.finish!
  job.save! { analyze_workflow_state(job) }
end
#job_performed!(job, result) ⇒ Object
Mark job finished or suspended, depending on result or output
81 82 83 84 85 86 87 88 89 |
# File 'lib/burstflow/manager.rb', line 81

# Routes a performed job to suspension or completion.
#
# The job is suspended when either the perform result or the job's output
# equals Burstflow::Job::SUSPEND; otherwise it is finished.
#
# @param job [Object] the job that was just performed; must respond to `output`
# @param result [Object] the value returned by performing the job
# @raise [Burstflow::Workflow::InternalError] wraps any StandardError raised
#   while suspending or finishing, carrying the workflow and the original
#   error's message
def job_performed!(job, result)
  if result == Burstflow::Job::SUSPEND || job.output == Burstflow::Job::SUSPEND
    suspend_job!(job)
  else
    finish_job!(job)
  end
rescue StandardError => e
  # The extracted snippet read `e.)`, which does not parse; the error's
  # message is what gets forwarded to InternalError.
  raise Burstflow::Workflow::InternalError.new(workflow, e.message)
end
#resume_job!(job, data) ⇒ Object
Enqueue job for resuming
43 44 45 46 47 |
# File 'lib/burstflow/manager.rb', line 43

# Schedules a worker to resume the job with the supplied data.
#
# The worker is scheduled from the block passed to `job.save!`.
#
# @param job [Object] the job to resume; must respond to `save!` and `id`
# @param data [Object] payload forwarded to the worker as its third argument
def resume_job!(job, data)
  job.save! { Burstflow::Worker.perform_later(workflow.id, job.id, data) }
end
#resume_workflow!(job_id, data) ⇒ Object
23 24 25 26 27 28 29 30 |
# File 'lib/burstflow/manager.rb', line 23

# Marks the workflow as resumed and resumes one of its jobs, all inside
# `workflow.with_lock`.
#
# @param job_id [Object] identifier passed to `workflow.job` to look up the job
# @param data [Object] payload handed on to #resume_job!
def resume_workflow!(job_id, data)
  workflow.with_lock do
    workflow.resumed!
    resume_job!(workflow.job(job_id), data)
  end
end
#start_workflow! ⇒ Object
workflow management
13 14 15 16 17 18 19 20 21 |
# File 'lib/burstflow/manager.rb', line 13

# Starts the workflow: flags it as running and enqueues each of its initial
# jobs, all inside `workflow.with_lock`.
def start_workflow!
  workflow.with_lock do
    # NOTE(review): `runnig!` is spelled exactly as in the source; confirm it
    # matches the method name defined on the workflow before renaming.
    workflow.runnig!

    workflow.initial_jobs.each { |job| enqueue_job!(job) }
  end
end
#suspend_job!(job) ⇒ Object
Mark job suspended and forget it until resume
50 51 52 53 54 55 56 57 |
# File 'lib/burstflow/manager.rb', line 50

# Marks the job as suspended and re-evaluates the workflow state.
#
# Everything runs inside the job's :suspend callbacks. The state analysis
# runs from the block passed to `job.save!`.
#
# @param job [Object] the job to suspend; must respond to `run_callbacks`,
#   `suspend!` and `save!`
def suspend_job!(job)
  job.run_callbacks(:suspend) do
    job.suspend!
    job.save! { analyze_workflow_state(job) }
  end
end