Class: Airflow::Workflow::Base

Inherits:
Object
  • Object
show all
Defined in:
lib/async_flow/workflow.rb

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(task) ⇒ Base

Returns a new instance of Base.



28
29
30
# File 'lib/async_flow/workflow.rb', line 28

def initialize(task)
  @task = task
end

Class Method Details

._typeObject



15
16
17
# File 'lib/async_flow/workflow.rb', line 15

def self._type
  @_type || name
end

.inherited(subclass) ⇒ Object



19
20
21
22
# File 'lib/async_flow/workflow.rb', line 19

def self.inherited(subclass)
  Airflow::Workflow.definitions << subclass
  super
end

.workflow_name(name) ⇒ Object



11
12
13
# File 'lib/async_flow/workflow.rb', line 11

def self.workflow_name(name)
  @_type = name
end

Instance Method Details

#executeObject

Raises:

  • (NotImplementedError)


24
25
26
# File 'lib/async_flow/workflow.rb', line 24

def execute
  raise NotImplementedError, "abstract method"
end

#execute_task(task_name) ⇒ Object



32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/async_flow/workflow.rb', line 32

def execute_task(task_name)
  Future.new do
    run = Persistence.workflow_runs.find_run_by_task_id(@task.id)
    task = run.start_task(task_name: task_name, is_workflow_task: false)
    Persistence.workflow_runs.save!(run)
    until task.completed?
      sleep(1) # TODO: Notify instead of polling from DB
      task = Persistence.workflow_runs.find_task(task.id)
    end
    task.result
  end
end

#workflowObject



45
46
47
# File 'lib/async_flow/workflow.rb', line 45

def workflow
  self
end