Class: Airflow::Persistence::WorkflowRunRepository
Instance Attribute Summary
#store
Instance Method Summary
collapse
#all, #clear, #find, #initialize
Instance Method Details
#create!(run) ⇒ Object
45
46
47
48
49
|
# File 'lib/async_flow/persistence.rb', line 45
def create!(run)
run.tasks.each { |task| task.id = next_id }
run.id = next_id
@store[run.id] = run
end
|
#find_by_workflow_id(id) ⇒ Object
63
64
65
|
# File 'lib/async_flow/persistence.rb', line 63
def find_by_workflow_id(id)
@store.values.find { |run| run.workflow_id == id } || raise(EntityNotFound)
end
|
#find_run_by_task_id(task_id) ⇒ Object
55
56
57
|
# File 'lib/async_flow/persistence.rb', line 55
def find_run_by_task_id(task_id)
@store.values.find { |run| run.tasks.find { |task| task.id == task_id } }
end
|
#find_task(task_id) ⇒ Object
59
60
61
|
# File 'lib/async_flow/persistence.rb', line 59
def find_task(task_id)
@store.values.flat_map(&:tasks).find { |record| record.id == task_id } || raise(EntityNotFound)
end
|
#next_task(is_workflow_task:) ⇒ Object
67
68
69
70
71
72
73
|
# File 'lib/async_flow/persistence.rb', line 67
def next_task(is_workflow_task:)
task = @store.values.flat_map(&:tasks).find do |record|
record.is_workflow_task == is_workflow_task && record.status == "scheduled"
end
task.status = "TODO" if task
task
end
|
#save!(run) ⇒ Object
51
52
53
|
# File 'lib/async_flow/persistence.rb', line 51
def save!(run)
run.tasks.each { |task| task.id = next_id if task.id.nil? }
end
|