Class: Catena::Scheduler

Inherits:
Object
  • Object
show all
Includes:
Lang, Sidekiq::Worker
Defined in:
lib/catena/scheduler.rb

Constant Summary collapse

# Step budget checked by #step: once the running step count is greater than
# this value, #step logs a warning, hands the efx and stack to +enqueue+, and
# returns instead of dispatching further.
MAX_STEPS =
10

Instance Method Summary collapse

Methods included from Lang

#bind, callback_to_func_name, #failure, func_name_to_callback, included, #succeed

Instance Method Details

#perform(efx, stack) ⇒ Object



11
12
13
# File 'lib/catena/scheduler.rb', line 11

# Job entry point (the class includes Sidekiq::Worker): begins evaluating
# +efx+ against +stack+ with the step counter at zero.
#
# @param efx the effect node to evaluate first
# @param stack the stack passed along to #step
# @return whatever #step returns
def perform(efx, stack)
  initial_steps = 0
  step(efx, stack, initial_steps)
end

#step(efx, stack, steps) ⇒ Object



15
16
17
18
19
20
21
22
23
24
25
26
27
# File 'lib/catena/scheduler.rb', line 15

# Evaluates one effect node by dispatching to the matching +step_<type>+
# method, or defers the work once the step budget is spent.
#
# @param efx [Hash] effect node; its "type" entry selects the handler
# @param stack the stack passed through to the handler
# @param steps [Integer] number of steps taken so far
# @return the handler's return value, or +steps+ when the efx was handed to
#   +enqueue+ because +steps+ is greater than MAX_STEPS
# @raise [NoMethodError] when +efx+ is nil, has no "type", or names a type
#   that has no +step_<type>+ handler
def step(efx, stack, steps)
  if steps > MAX_STEPS
    logger.warn "Exceeded MAX STEPS. Stowing efx #{efx}"
    enqueue(efx, stack)
    return steps
  end

  # TODO if has trailing "|" in chain, it'll subsume the evaluator call, and return nil
  # TODO also need error message when argument length mismatches
  logger.debug "EFX: #{efx.inspect}"

  type = efx.nil? ? nil : efx["type"]
  handler = "step_#{type}"

  # A missing or misspelled type used to surface as "undefined method `step_'",
  # which says nothing about the node that caused it. The exception class stays
  # NoMethodError (what the bare `send` raised) so existing rescuers still match.
  unless type && respond_to?(handler, true)
    raise NoMethodError,
          "Cannot step efx: missing or unknown type #{type.inspect} (efx: #{efx.inspect})"
  end

  send(handler, efx, stack, steps)
end

#step_and_then(efx, stack, steps) ⇒ Object



83
84
85
86
87
88
89
# File 'lib/catena/scheduler.rb', line 83

# Handles an "and_then" node: pushes the node onto the stack and continues
# with the node's "side_effect" as the next efx.
#
# @param efx [Hash] the and_then node; must carry a non-nil "side_effect"
# @param stack [Array] stack the node is pushed onto (mutated in place)
# @param steps [Integer] number of steps taken so far
# @return whatever #step returns for the side effect
# @raise [RuntimeError] when efx["side_effect"] is nil
def step_and_then(efx, stack, steps)
  side_effect = efx["side_effect"]
  # Validate before touching the stack so a malformed node is rejected without
  # leaving it pushed onto the caller's stack.
  raise "step_and_then new_efx is nil" if side_effect.nil?

  new_stack = stack.push(efx) # push returns the same (now longer) array
  logger.info "Processing and_then. Stack.len = #{new_stack.length}"

  step(side_effect, new_stack, steps + 1)
end

#step_binding(efx, stack, steps) ⇒ Object



65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/catena/scheduler.rb', line 65

# Handles a "binding" node: looks up the callback named by the node and calls
# it with the node's stored arguments followed by +evaluator(stack)+.
#
# @param efx [Hash] binding node; reads "callback_name" and "callback_args"
# @param stack the stack handed to +evaluator+
# @param steps [Integer] number of steps taken so far
# @return [Integer] steps + 1 (the callback's own return value is discarded)
def step_binding(efx, stack, steps)
  # TODO canceling the entire process should happen at binding
  name = efx["callback_name"]
  logger.info "Processing binding of #{name}"

  # FIXME shouldn't need to know explicitly the tasks are on Deployment class
  handler = find_callback(name)
  call_args = efx["callback_args"] + [evaluator(stack)]
  logger.debug "  Calling '#{name}' with args: #{call_args.inspect}"

  # FIXME check the arity and note if we're short? At the end of a chain a
  # call that is short on arguments happily executes, returns a lambda, and
  # finishes silently.
  handler.call(*call_args)

  steps + 1
end

#step_failure(efx, stack, steps) ⇒ Object



47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/catena/scheduler.rb', line 47

# Handles a "failure" node: flushes "and_then" entries from the stack via
# +flush+, then, if anything is left, pops the next node and chains its
# "binding_callback" with the failure's "error" value.
#
# @param efx [Hash] failure node; reads "error"
# @param stack the current stack (passed to +flush+)
# @param steps [Integer] number of steps taken so far
# @return +steps+ when the flushed stack is empty, otherwise whatever #step
#   returns for the chained efx
# @raise [RuntimeError] when +chain+ returns nil
def step_failure(efx, stack, steps)
  logger.info "Processing failure: #{efx}. Stack.len = #{stack.length}"
  remaining = flush("and_then", stack)

  # Nothing left to receive the error: stop here.
  return steps if remaining.empty?

  # NOTE(review): the popped node is presumably an on_error node — confirm
  # against flush.
  handler_node = remaining.pop["binding_callback"]
  logger.debug "  Popped #{handler_node}. Stack.len = #{remaining.length}"

  next_efx = chain(handler_node, efx["error"])
  raise "step_failure new_efx is nil. failure value: #{efx["error"]}" if next_efx.nil?

  step(next_efx, remaining, steps + 1)
end

#step_on_error(efx, stack, steps) ⇒ Object



91
92
93
94
# File 'lib/catena/scheduler.rb', line 91

# Placeholder for "on_error" nodes: not implemented. Logs a warning and
# leaves the step count untouched; +efx+ and +stack+ are ignored.
#
# @param efx unused
# @param stack unused
# @param steps [Integer] number of steps taken so far
# @return [Integer] +steps+, unchanged
def step_on_error(efx, stack, steps)
  logger.warn "Processing on_error...not implemented"
  steps
end

#step_pmap(efx, stack, steps) ⇒ Object



96
97
98
99
# File 'lib/catena/scheduler.rb', line 96

# Placeholder for "pmap" (parallel map) nodes: not implemented. Logs a
# warning and leaves the step count untouched; +efx+ and +stack+ are ignored.
#
# @param efx unused
# @param stack unused
# @param steps [Integer] number of steps taken so far
# @return [Integer] +steps+, unchanged
def step_pmap(efx, stack, steps)
  logger.warn "Processing parallel map...not implemented"
  steps
end

#step_succeed(efx, stack, steps) ⇒ Object



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/catena/scheduler.rb', line 29

# Handles a "succeed" node: flushes "on_error" entries from the stack via
# +flush+, then, if anything is left, pops the next node and chains its
# "binding_callback" with the success "value".
#
# @param efx [Hash] succeed node; reads "value"
# @param stack the current stack (passed to +flush+)
# @param steps [Integer] number of steps taken so far
# @return +steps+ when the flushed stack is empty, otherwise whatever #step
#   returns for the chained efx
# @raise [RuntimeError] when +chain+ returns nil
def step_succeed(efx, stack, steps)
  logger.debug "Processing succeed: #{efx}. Stack.len = #{stack.length}"
  remaining = flush("on_error", stack)
  logger.debug "  Flushed on_error. Stack.len = #{remaining.length}"

  # No continuation is waiting for the value: stop here.
  return steps if remaining.empty?

  # The node under "binding_callback" is a binding-type node.
  binding_node = remaining.pop["binding_callback"]
  logger.debug "  Popped #{binding_node}. Stack.len = #{remaining.length}"

  next_efx = chain(binding_node, efx["value"])
  raise "step_succeed new_efx is nil. succeed value: #{efx["value"]}" if next_efx.nil?

  step(next_efx, remaining, steps + 1)
end