Class: Floe::Workflow::States::Map

Inherits:
Floe::Workflow::State show all
Includes:
ChildWorkflowMixin, InputOutputMixin, NonTerminalMixin, RetryCatchMixin
Defined in:
lib/floe/workflow/states/map.rb

Instance Attribute Summary collapse

Attributes inherited from Floe::Workflow::State

#comment, #name, #payload, #type

Instance Method Summary collapse

Methods included from RetryCatchMixin

#catch_error!, #fail_workflow!, #find_catcher, #find_retrier, #retry_state!

Methods included from NonTerminalMixin

#finish, #validate_state_next!

Methods included from InputOutputMixin

#process_output

Methods included from ChildWorkflowMixin

#each_child_context, #ended?, #finish, #ready?, #run_nonblock!, #running?, #wait_until, #waiting?

Methods inherited from Floe::Workflow::State

build!, #finish, #finished?, #long_name, #mark_error, #mark_finished, #mark_started, #ready?, #run_nonblock!, #running?, #short_name, #started?, #wait, #wait_until, #waiting?

Methods included from ValidationMixin

included, #invalid_field_error!, #missing_field_error!, #parser_error!, #runtime_field_error!, #workflow_state?, #wrap_parser_error

Constructor Details

#initialize(workflow, name, payload) ⇒ Map

Returns a new instance of Map.



17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/floe/workflow/states/map.rb', line 17

def initialize(workflow, name, payload)
  super

  missing_field_error!("InputProcessor") if payload["ItemProcessor"].nil?

  @next            = payload["Next"]
  @end             = !!payload["End"]
  @parameters      = PayloadTemplate.new(payload["Parameters"]) if payload["Parameters"]
  @input_path      = Path.new(payload.fetch("InputPath", "$"))
  @output_path     = Path.new(payload.fetch("OutputPath", "$"))
  @result_path     = ReferencePath.new(payload.fetch("ResultPath", "$"))
  @result_selector = PayloadTemplate.new(payload["ResultSelector"]) if payload["ResultSelector"]
  @retry           = payload["Retry"].to_a.map { |retrier| Retrier.new(retrier) }
  @catch           = payload["Catch"].to_a.map { |catcher| Catcher.new(catcher) }
  @item_processor  = ItemProcessor.new(payload["ItemProcessor"], name)
  @items_path      = ReferencePath.new(payload.fetch("ItemsPath", "$"))
  @item_reader     = payload["ItemReader"]
  @item_selector   = payload["ItemSelector"]
  @item_batcher    = payload["ItemBatcher"]
  @result_writer   = payload["ResultWriter"]
  @max_concurrency = payload["MaxConcurrency"]&.to_i
  @tolerated_failure_percentage = payload["ToleratedFailurePercentage"]&.to_i
  @tolerated_failure_count      = payload["ToleratedFailureCount"]&.to_i

  validate_state!(workflow)
end

Instance Attribute Details

#catchObject (readonly)

Returns the value of attribute catch.



12
13
14
# File 'lib/floe/workflow/states/map.rb', line 12

def catch
  @catch
end

#endObject (readonly)

Returns the value of attribute end.



12
13
14
# File 'lib/floe/workflow/states/map.rb', line 12

def end
  @end
end

#input_pathObject (readonly)

Returns the value of attribute input_path.



12
13
14
# File 'lib/floe/workflow/states/map.rb', line 12

def input_path
  @input_path
end

#item_batcherObject (readonly)

Returns the value of attribute item_batcher.



12
13
14
# File 'lib/floe/workflow/states/map.rb', line 12

def item_batcher
  @item_batcher
end

#item_processorObject (readonly)

Returns the value of attribute item_processor.



12
13
14
# File 'lib/floe/workflow/states/map.rb', line 12

def item_processor
  @item_processor
end

#item_readerObject (readonly)

Returns the value of attribute item_reader.



12
13
14
# File 'lib/floe/workflow/states/map.rb', line 12

def item_reader
  @item_reader
end

#item_selectorObject (readonly)

Returns the value of attribute item_selector.



12
13
14
# File 'lib/floe/workflow/states/map.rb', line 12

def item_selector
  @item_selector
end

#items_pathObject (readonly)

Returns the value of attribute items_path.



12
13
14
# File 'lib/floe/workflow/states/map.rb', line 12

def items_path
  @items_path
end

#max_concurrencyObject (readonly)

Returns the value of attribute max_concurrency.



12
13
14
# File 'lib/floe/workflow/states/map.rb', line 12

def max_concurrency
  @max_concurrency
end

#nextObject (readonly)

Returns the value of attribute next.



12
13
14
# File 'lib/floe/workflow/states/map.rb', line 12

def next
  @next
end

#output_pathObject (readonly)

Returns the value of attribute output_path.



12
13
14
# File 'lib/floe/workflow/states/map.rb', line 12

def output_path
  @output_path
end

#parametersObject (readonly)

Returns the value of attribute parameters.



12
13
14
# File 'lib/floe/workflow/states/map.rb', line 12

def parameters
  @parameters
end

#result_pathObject (readonly)

Returns the value of attribute result_path.



12
13
14
# File 'lib/floe/workflow/states/map.rb', line 12

def result_path
  @result_path
end

#result_selectorObject (readonly)

Returns the value of attribute result_selector.



12
13
14
# File 'lib/floe/workflow/states/map.rb', line 12

def result_selector
  @result_selector
end

#result_writerObject (readonly)

Returns the value of attribute result_writer.



12
13
14
# File 'lib/floe/workflow/states/map.rb', line 12

def result_writer
  @result_writer
end

#retryObject (readonly)

Returns the value of attribute retry.



12
13
14
# File 'lib/floe/workflow/states/map.rb', line 12

def retry
  @retry
end

#tolerated_failure_countObject (readonly)

Returns the value of attribute tolerated_failure_count.



12
13
14
# File 'lib/floe/workflow/states/map.rb', line 12

def tolerated_failure_count
  @tolerated_failure_count
end

#tolerated_failure_percentageObject (readonly)

Returns the value of attribute tolerated_failure_percentage.



12
13
14
# File 'lib/floe/workflow/states/map.rb', line 12

def tolerated_failure_percentage
  @tolerated_failure_percentage
end

Instance Method Details

#end?Boolean

Returns:

  • (Boolean)


57
58
59
# File 'lib/floe/workflow/states/map.rb', line 57

def end?
  @end
end

#process_input(context) ⇒ Object



44
45
46
47
# File 'lib/floe/workflow/states/map.rb', line 44

def process_input(context)
  input = super
  items_path.value(context, input)
end

#start(context) ⇒ Object



49
50
51
52
53
54
55
# File 'lib/floe/workflow/states/map.rb', line 49

def start(context)
  super

  input = process_input(context)

  context.state["ItemProcessorContext"] = input.map { |item| Context.new({"Execution" => {"Id" => context.execution["Id"]}}, :input => item.to_json).to_h }
end

#success?(context) ⇒ Boolean

Returns:

  • (Boolean)


61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/floe/workflow/states/map.rb', line 61

def success?(context)
  contexts   = each_child_context(context)
  num_failed = contexts.count(&:failed?)
  total      = contexts.count

  return true if num_failed.zero? || total.zero?
  return false if tolerated_failure_count.nil? && tolerated_failure_percentage.nil?

  # Some have failed, check the tolerated_failure thresholds to see if
  # we should fail the whole state.
  #
  # If either ToleratedFailureCount or ToleratedFailurePercentage are breached
  # then the whole state is considered failed.
  count_tolerated = tolerated_failure_count.nil?      || num_failed < tolerated_failure_count
  pct_tolerated   = tolerated_failure_percentage.nil? || tolerated_failure_percentage == 100 ||
                    ((100 * num_failed / total.to_f) < tolerated_failure_percentage)

  count_tolerated && pct_tolerated
end