Class: Catena::Scheduler
- Inherits:
-
Object
show all
- Includes:
- Lang, Sidekiq::Worker
- Defined in:
- lib/catena/scheduler.rb
Constant Summary
collapse
- MAX_STEPS =
10
Instance Method Summary
collapse
-
#perform(efx, stack) ⇒ Object
-
#step(efx, stack, steps) ⇒ Object
-
#step_and_then(efx, stack, steps) ⇒ Object
-
#step_binding(efx, stack, steps) ⇒ Object
-
#step_failure(efx, stack, steps) ⇒ Object
-
#step_on_error(efx, stack, steps) ⇒ Object
-
#step_pmap(efx, stack, steps) ⇒ Object
-
#step_succeed(efx, stack, steps) ⇒ Object
Methods included from Lang
#bind, callback_to_func_name, #failure, func_name_to_callback, included, #succeed
Instance Method Details
11
12
13
|
# File 'lib/catena/scheduler.rb', line 11
# Job entry point (the class includes Sidekiq::Worker): begins evaluating
# +efx+ against +stack+ with the step counter at zero.
#
# @param efx [Object] effect node, handed unchanged to #step
# @param stack [Object] evaluation stack, handed unchanged to #step
# @return [Object] whatever #step returns
def perform(efx, stack)
  initial_steps = 0
  step(efx, stack, initial_steps)
end
|
#step(efx, stack, steps) ⇒ Object
15
16
17
18
19
20
21
22
23
24
25
26
27
|
# File 'lib/catena/scheduler.rb', line 15
# Evaluates one effect node, or defers it once the step budget is spent.
#
# While +steps+ has not exceeded MAX_STEPS, the node is dispatched to the
# method named "step_<type>", where <type> is read from efx["type"].
# Otherwise the node is handed to +enqueue+ and evaluation stops here.
#
# @param efx [Hash] effect node; its "type" key selects the handler
# @param stack [Object] evaluation stack, passed through to the handler or +enqueue+
# @param steps [Integer] number of steps taken so far
# @return [Object] the handler's result, or +steps+ when the node was enqueued
def step(efx, stack, steps)
  unless steps > MAX_STEPS
    logger.debug "EFX: #{efx.inspect}"
    # NOTE(review): the handler name is built from data in efx; confirm that
    # "type" can only hold trusted values.
    handler = "step_#{efx["type"]}"
    return send(handler, efx, stack, steps)
  end

  logger.warn "Exceeded MAX STEPS. Stowing efx #{efx}"
  enqueue(efx, stack)
  steps
end
|
#step_and_then(efx, stack, steps) ⇒ Object
83
84
85
86
87
88
89
|
# File 'lib/catena/scheduler.rb', line 83
# Handles an "and_then" node: pushes the node itself onto the stack and then
# evaluates the effect stored under its "side_effect" key.
#
# @param efx [Hash] and_then node; must carry a non-nil "side_effect"
# @param stack [Array] evaluation stack (presumably an Array, in which case
#   +push+ modifies it in place and returns the same object)
# @param steps [Integer] number of steps taken so far
# @return [Object] result of #step on the inner effect
# @raise [RuntimeError] when efx["side_effect"] is nil (raised after the push)
def step_and_then(efx, stack, steps)
  pushed = stack.push(efx)
  logger.info "Processing and_then. Stack.len = #{pushed.length}"

  inner = efx["side_effect"]
  raise "step_and_then new_efx is nil" if inner.nil?

  step(inner, pushed, steps + 1)
end
|
#step_binding(efx, stack, steps) ⇒ Object
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
|
# File 'lib/catena/scheduler.rb', line 65
# Handles a "binding" node: looks up the callback named by
# efx["callback_name"] and calls it with efx["callback_args"] followed by one
# extra trailing argument obtained from evaluator(stack).
#
# @param efx [Hash] binding node with "callback_name" and "callback_args"
# @param stack [Object] evaluation stack, handed to +evaluator+
# @param steps [Integer] number of steps taken so far
# @return [Integer] steps + 1; the callback's own return value is discarded
def step_binding(efx, stack, steps)
  name = efx["callback_name"]
  logger.info "Processing binding of #{name}"

  callback = find_callback(name)
  trailing = [evaluator(stack)]
  # `+` builds a new array, so efx["callback_args"] itself is left untouched.
  args = efx["callback_args"] + trailing
  logger.debug " Calling '#{name}' with args: #{args.inspect}"

  callback.call(*args)
  steps + 1
end
|
#step_failure(efx, stack, steps) ⇒ Object
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
|
# File 'lib/catena/scheduler.rb', line 47
# Handles a "failure" node: flushes "and_then" entries from the stack, then
# hands the error to the callback of the node popped from what remains.
#
# If nothing is left after the flush, evaluation ends and +steps+ is returned.
# Otherwise the popped node's "binding_callback" is chained with efx["error"]
# and the resulting effect is evaluated as the next step.
#
# @param efx [Hash] failure node; the error value is read from "error"
# @param stack [Object] evaluation stack, handed to +flush+ (exact flush
#   semantics are defined elsewhere; presumably it discards and_then nodes)
# @param steps [Integer] number of steps taken so far
# @return [Object] +steps+ when no handler remains, else the result of #step
# @raise [RuntimeError] when +chain+ yields nil
def step_failure(efx, stack, steps)
  logger.info "Processing failure: #{efx}. Stack.len = #{stack.length}"

  remaining = flush("and_then", stack)
  return steps if remaining.empty?

  handler_node = remaining.pop
  callback_node = handler_node["binding_callback"]
  logger.debug " Popped #{callback_node}. Stack.len = #{remaining.length}"

  next_efx = chain(callback_node, efx["error"])
  raise "step_failure new_efx is nil. failure value: #{efx["error"]}" if next_efx.nil?

  step(next_efx, remaining, steps + 1)
end
|
#step_on_error(efx, stack, steps) ⇒ Object
91
92
93
94
|
# File 'lib/catena/scheduler.rb', line 91
# Placeholder handler for "on_error" nodes: only logs a warning that the
# handler is not implemented and leaves the step count unchanged.
#
# @param efx [Object] unused
# @param stack [Object] unused
# @param steps [Integer] number of steps taken so far
# @return [Integer] +steps+, unchanged
def step_on_error(efx, stack, steps)
  logger.warn "Processing on_error...not implemented"
  steps
end
|
#step_pmap(efx, stack, steps) ⇒ Object
96
97
98
99
|
# File 'lib/catena/scheduler.rb', line 96
# Placeholder handler for "pmap" nodes: only logs a warning that parallel map
# is not implemented and leaves the step count unchanged.
#
# @param efx [Object] unused
# @param stack [Object] unused
# @param steps [Integer] number of steps taken so far
# @return [Integer] +steps+, unchanged
def step_pmap(efx, stack, steps)
  logger.warn "Processing parallel map...not implemented"
  steps
end
|
#step_succeed(efx, stack, steps) ⇒ Object
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
|
# File 'lib/catena/scheduler.rb', line 29
# Handles a "succeed" node: flushes "on_error" entries from the stack, then
# hands the success value to the callback of the node popped from what remains.
#
# If nothing is left after the flush, evaluation ends and +steps+ is returned.
# Otherwise the popped node's "binding_callback" is chained with efx["value"]
# and the resulting effect is evaluated as the next step.
#
# @param efx [Hash] succeed node; the success value is read from "value"
# @param stack [Object] evaluation stack, handed to +flush+ (exact flush
#   semantics are defined elsewhere; presumably it discards on_error nodes)
# @param steps [Integer] number of steps taken so far
# @return [Object] +steps+ when no continuation remains, else the result of #step
# @raise [RuntimeError] when +chain+ yields nil
def step_succeed(efx, stack, steps)
  logger.debug "Processing succeed: #{efx}. Stack.len = #{stack.length}"

  new_stack = flush("on_error", stack)
  logger.debug " Flushed on_error. Stack.len = #{new_stack.length}"
  return steps if new_stack.empty?

  and_then_node = new_stack.pop
  # The assignment and the debug log are separate statements; fused onto a
  # single line they do not parse.
  callback_node = and_then_node["binding_callback"]
  logger.debug " Popped #{callback_node}. Stack.len = #{new_stack.length}"

  new_efx = chain(callback_node, efx["value"])
  raise "step_succeed new_efx is nil. succeed value: #{efx["value"]}" if new_efx.nil?

  step(new_efx, new_stack, steps + 1)
end
|