Class: Airflow::Worker
- Inherits:
-
Object
- Object
- Airflow::Worker
- Includes:
- Utils::Logger
- Defined in:
- lib/async_flow/worker.rb
Instance Method Summary collapse
-
#initialize(workflows: Airflow::Workflow.definitions, tasks: Airflow::Task.definitions) ⇒ Worker
constructor
TODO: Add path to load workflows and task definitions.
- #start(wait_interrupt: true) ⇒ Object
- #stop ⇒ Object
Methods included from Utils::Logger
Constructor Details
#initialize(workflows: Airflow::Workflow.definitions, tasks: Airflow::Task.definitions) ⇒ Worker
TODO: Add path to load workflows and task definitions. TODO: Assert definitions to be present
16 17 18 19 20 21 22 23 24 25 26 27 |
# File 'lib/async_flow/worker.rb', line 16 def initialize(workflows: Airflow::Workflow.definitions, tasks: Airflow::Task.definitions) @workflows = workflows @tasks = tasks @reactor = Reactor.instance @sig_handler = SigHandler.new(self) @mutex = Mutex.new @state = State.bg_state @workflow_runner = Runner::WorkflowRunner.new(workflows) @task_runner = Runner::TaskRunner.new(tasks) Process.setproctitle("ruby-flow-worker") end |
Instance Method Details
#start(wait_interrupt: true) ⇒ Object
29 30 31 32 33 34 35 36 37 38 |
# File 'lib/async_flow/worker.rb', line 29 def start(wait_interrupt: true) running! reactor.start reactor.schedule do |parent| parent.schedule { |subtask| workflow_runner.run(subtask) } parent.schedule { |subtask| task_runner.run(subtask) } end sig_handler.call { logger.info "Flow Worker started. Use Ctrl-C to stop" } if wait_interrupt end |
#stop ⇒ Object
40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/async_flow/worker.rb', line 40 def stop logger.info "stopping worker" mutex.synchronize { state.stop! } task_runner.stop workflow_runner.stop # reactor.stop # raise(Error.new("Stopping")) # TODO # launcher.stop # task_execution&.drain # workflow_execution&.drain logger.info "worker stopped" end |