Class: Airflow::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/async_flow/client.rb

Defined Under Namespace

Classes: ResultWrapper

Instance Method Summary collapse

Instance Method Details

#result(workflow_id) ⇒ Object



42
43
44
45
46
47
48
49
# File 'lib/async_flow/client.rb', line 42

def result(workflow_id)
  run = Persistence.workflow_runs.find_by_workflow_id(workflow_id)
  until run.completed?
    sleep(1) # TODO: Notify instead of polling from DB
    run = Persistence.workflow_runs.find_by_workflow_id(workflow_id)
  end
  run.result
end

#start_workflow(workflow_name_or_class, workflow_id:) ⇒ Object



29
30
31
32
33
34
35
# File 'lib/async_flow/client.rb', line 29

def start_workflow(workflow_name_or_class, workflow_id:)
  workflow_name = Workflow.definitions.get(workflow_name_or_class)._type
  cmd = Commands::StartWorkflow.new(workflow_name: workflow_name, workflow_id: workflow_id, queue: "default")
  Airflow.mediator.accept(cmd).then do
    ResultWrapper.new(self, workflow_id)
  end
end

#workflow_info(workflow_id) ⇒ Object



37
38
39
40
# File 'lib/async_flow/client.rb', line 37

def workflow_info(workflow_id)
  run = Persistence.workflow_runs.find_by_workflow_id(workflow_id)
  Workflow::Info.from(run)
end