Class: Airflow::Persistence::WorkflowRunRepository

Inherits:
InMemoryRepository
Defined in:
lib/async_flow/persistence.rb

Instance Attribute Summary

Attributes inherited from InMemoryRepository

#store

Instance Method Summary

Methods inherited from InMemoryRepository

#all, #clear, #find, #initialize

Constructor Details

This class inherits a constructor from Airflow::Persistence::InMemoryRepository

Instance Method Details

#create!(run) ⇒ Object



45
46
47
48
49
# File 'lib/async_flow/persistence.rb', line 45

# Assigns fresh ids to a run and all of its tasks, then puts the run in the store.
#
# Each task gets a new id from +next_id+ (any id it already carried is
# overwritten) before the run itself is given one, so task ids are drawn
# ahead of the run id.
#
# @param run [Object] object responding to +tasks+, +id+ and +id=+
# @return [Object] the run that was just stored
def create!(run)
  run.tasks.each do |task|
    task.id = next_id
  end
  run.id = next_id

  # The run is keyed by the id it was just given.
  @store[run.id] = run
end

#find_by_workflow_id(id) ⇒ Object



63
64
65
# File 'lib/async_flow/persistence.rb', line 63

# Looks up the first stored run whose +workflow_id+ equals the given id.
#
# @param id [Object] value compared against each run's +workflow_id+ with +==+
# @return [Object] the first matching run
# @raise [EntityNotFound] when no stored run matches
def find_by_workflow_id(id)
  match = @store.values.find { |run| run.workflow_id == id }
  raise EntityNotFound unless match

  match
end

#find_run_by_task_id(task_id) ⇒ Object



55
56
57
# File 'lib/async_flow/persistence.rb', line 55

# Finds the first stored run that has a task with the given id.
#
# Unlike the lookups that raise on a miss, this one returns +nil+ when no
# run contains such a task.
#
# @param task_id [Object] value compared against each task's +id+ with +==+
# @return [Object, nil] the owning run, or +nil+ when none matches
def find_run_by_task_id(task_id)
  @store.values.find do |run|
    run.tasks.any? { |task| task.id == task_id }
  end
end

#find_task(task_id) ⇒ Object



59
60
61
# File 'lib/async_flow/persistence.rb', line 59

# Looks up a task by id across the tasks of every stored run.
#
# @param task_id [Object] value compared against each task's +id+ with +==+
# @return [Object] the first task whose id matches
# @raise [EntityNotFound] when no task in any stored run matches
def find_task(task_id)
  all_tasks = @store.values.flat_map(&:tasks)
  found = all_tasks.find { |record| record.id == task_id }
  return found if found

  raise EntityNotFound
end

#next_task(is_workflow_task:) ⇒ Object



67
68
69
70
71
72
73
# File 'lib/async_flow/persistence.rb', line 67

# Picks the first task with status "scheduled" whose +is_workflow_task+ flag
# equals the requested one, and marks it by overwriting its status.
#
# Tasks are scanned run by run, in the order the store yields its values.
#
# NOTE(review): the new status is the literal string "TODO" — confirm this is
# the intended status value rather than a leftover placeholder.
#
# @param is_workflow_task [Object] value compared against each task's
#   +is_workflow_task+ with +==+
# @return [Object, nil] the task that was picked (status already updated),
#   or +nil+ when no scheduled task matches
def next_task(is_workflow_task:)
  candidates = @store.values.flat_map(&:tasks)
  picked = candidates.find do |record|
    record.is_workflow_task == is_workflow_task && record.status == "scheduled"
  end
  return nil unless picked

  picked.status = "TODO"
  picked
end

#save!(run) ⇒ Object



51
52
53
# File 'lib/async_flow/persistence.rb', line 51

# Gives an id to every task of the run that does not have one yet.
#
# Tasks whose +id+ is already non-nil are left untouched. Nothing is written
# to the store here; only the run's tasks are modified.
#
# @param run [Object] object responding to +tasks+
# @return [Object] whatever +run.tasks.each+ returns (the task collection
#   itself when +tasks+ is an Array)
def save!(run)
  run.tasks.each do |task|
    next unless task.id.nil?

    task.id = next_id
  end
end