Class: Airflow::Worker

Inherits:
Object
  • Object
show all
Includes:
Utils::Logger
Defined in:
lib/async_flow/worker.rb

Instance Method Summary collapse

Methods included from Utils::Logger

#logger

Constructor Details

#initialize(workflows: Airflow::Workflow.definitions, tasks: Airflow::Task.definitions) ⇒ Worker

TODO: Add a path option for loading workflow and task definitions. TODO: Assert that the definitions are present.



16
17
18
19
20
21
22
23
24
25
26
27
# File 'lib/async_flow/worker.rb', line 16

# Builds a worker for the given workflow and task definitions.
#
# @param workflows definitions passed to Runner::WorkflowRunner
#   (defaults to Airflow::Workflow.definitions)
# @param tasks definitions passed to Runner::TaskRunner
#   (defaults to Airflow::Task.definitions)
#
# TODO: Add path to load workflows and task definitions.
# TODO: Assert definitions to be present.
def initialize(workflows: Airflow::Workflow.definitions, tasks: Airflow::Task.definitions)
  # Definitions this worker was configured with.
  @workflows = workflows
  @tasks = tasks

  # Runtime collaborators. The signal handler is handed this worker
  # instance itself.
  @reactor = Reactor.instance
  @sig_handler = SigHandler.new(self)

  # Lifecycle state plus the mutex used when mutating it.
  @mutex = Mutex.new
  @state = State.bg_state

  # One runner per kind of definition.
  @workflow_runner = Runner::WorkflowRunner.new(@workflows)
  @task_runner = Runner::TaskRunner.new(@tasks)

  # Rename the OS process so the worker is recognisable in process listings.
  Process.setproctitle("ruby-flow-worker")
end

Instance Method Details

#start(wait_interrupt: true) ⇒ Object



29
30
31
32
33
34
35
36
37
38
# File 'lib/async_flow/worker.rb', line 29

# Starts the worker: flags it as running, starts the reactor and schedules
# the workflow runner and the task runner as two sub-tasks of one parent task.
#
# @param wait_interrupt [Boolean] when truthy, control is handed to the
#   signal handler after scheduling (presumably blocking until a signal
#   arrives; confirm in SigHandler)
# @return [Object, nil] whatever the signal handler's #call returns, or nil
#   when wait_interrupt is falsy
def start(wait_interrupt: true)
  running!
  reactor.start

  # Each runner receives its own sub-task scheduled under the shared parent.
  reactor.schedule do |root|
    root.schedule { |child| workflow_runner.run(child) }
    root.schedule { |child| task_runner.run(child) }
  end

  return unless wait_interrupt

  sig_handler.call do
    logger.info("Flow Worker started. Use Ctrl-C to stop")
  end
end

#stop ⇒ Object



40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/async_flow/worker.rb', line 40

# Stops the worker: moves the state to stopped while holding the mutex, then
# stops the task runner followed by the workflow runner, logging before and
# after.
#
# TODO: the remaining shutdown steps are still disabled here: reactor.stop,
# raise(Error.new("Stopping")), launcher.stop, task_execution&.drain and
# workflow_execution&.drain.
#
# @return [Object] whatever the final logger.info call returns
def stop
  logger.info("stopping worker")

  mutex.synchronize do
    state.stop!
  end

  task_runner.stop
  workflow_runner.stop

  logger.info("worker stopped")
end