Class: AWS::Flow::AsyncRetryingExecutor
- Inherits: Object
  - Object
    - AWS::Flow::AsyncRetryingExecutor
- Defined in:
- lib/aws/decider/async_retrying_executor.rb
Instance Method Summary
- #execute(command, options = nil) ⇒ Object
- #initialize(retrying_policy, clock, execution_id, return_on_start = false) ⇒ AsyncRetryingExecutor (constructor): A new instance of AsyncRetryingExecutor.
- #invoke(command, attempts, first_attempt_time) ⇒ Object
- #schedule_with_retry(command, failure, attempts, first_attempt_time, time_of_recorded_failure) ⇒ Object
Constructor Details
#initialize(retrying_policy, clock, execution_id, return_on_start = false) ⇒ AsyncRetryingExecutor
Returns a new instance of AsyncRetryingExecutor.
20 21 22 23 24 25 |
# File 'lib/aws/decider/async_retrying_executor.rb', line 20
#
# Stores the collaborators used by the other methods of this executor.
#
# @param retrying_policy policy object; elsewhere in this class it is asked
#   +isRetryable(failure)+ and +next_retry_delay_seconds(...)+.
# @param clock clock object; elsewhere in this class it is asked for
#   +current_time+ and +create_timer(delay, callable)+.
# @param execution_id identifier forwarded to the policy when a retry delay
#   is computed.
# @param return_on_start [Boolean] when true, #execute and #invoke hand back
#   their result object immediately instead of waiting on it.
def initialize(retrying_policy, clock, execution_id, return_on_start = false)
  @retrying_policy = retrying_policy
  @clock = clock
  @return_on_start = return_on_start
  @execution_id = execution_id
end
Instance Method Details
#execute(command, options = nil) ⇒ Object
26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
# File 'lib/aws/decider/async_retrying_executor.rb', line 26
#
# Starts +command+ through #schedule_with_retry with a fresh attempts table.
#
# When +@return_on_start+ is set, the value of #schedule_with_retry is
# returned directly. Otherwise the first attempt runs inside an
# +error_handler+ block: the result is stored in an AddressableFuture, any
# exception raised there is captured and re-raised to the caller, and the
# future is returned.
#
# @param command callable forwarded to #schedule_with_retry.
# @param options accepted for interface compatibility; not read here.
# @return [Object] see above.
# @raise [Exception] whatever the first attempt raised inside the handler.
def execute(command, options = nil)
  # Single definition of the first attempt, shared by both paths. The
  # attempts hash starts every unseen key at 1, and the clock is read at the
  # moment the lambda is called.
  first_attempt = lambda do
    schedule_with_retry(command, nil, Hash.new { |hash, key| hash[key] = 1 }, @clock.current_time, nil)
  end
  return first_attempt.call if @return_on_start

  output = Utilities::AddressableFuture.new
  result_lock = Utilities::AddressableFuture.new
  # Kept in a local (not an instance variable) so a failure captured by one
  # call cannot leak into a later call on the same executor.
  error_seen = nil

  error_handler do |t|
    t.begin { output.set(first_attempt.call) }
    t.rescue(Exception) { |error| error_seen = error }
    t.ensure do
      # Always resolve both futures so the waits below cannot block forever.
      output.set unless output.set?
      result_lock.set
    end
  end

  result_lock.get
  raise error_seen if error_seen
  output
end
#invoke(command, attempts, first_attempt_time) ⇒ Object
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/aws/decider/async_retrying_executor.rb', line 63
#
# Calls +command+ once and, if it raised, hands the failure to
# #schedule_with_retry.
#
# The call runs inside an +error_handler+ block. Any exception is recorded
# as the failure to retry; a CancellationException is additionally re-raised
# from the rescue block. A follow-up +task+ then either publishes the
# command's return value or bumps +attempts[failure.class]+ and publishes
# the value of #schedule_with_retry.
#
# @param command callable to run.
# @param attempts [Hash] per-exception-class attempt counters; mutated here.
# @param first_attempt_time forwarded unchanged to #schedule_with_retry.
# @return [Object] the output future when +@return_on_start+ is set,
#   otherwise the value obtained from that future.
def invoke(command, attempts, first_attempt_time)
  failure_to_retry = nil
  should_retry = Future.new
  return_value = Future.new
  output = Utilities::AddressableFuture.new

  error_handler do |t|
    t.begin { return_value.set(command.call) }
    t.rescue(Exception) do |error|
      failure_to_retry = error
      raise error if error.is_a?(CancellationException)
    end
    # Set even on success (with nil) so the task below can always proceed.
    t.ensure { should_retry.set(failure_to_retry) }
  end

  task do
    failure = should_retry.get
    if failure.nil?
      output.set(return_value.get)
    else
      attempts[failure.class] += 1
      output.set(schedule_with_retry(command, failure, attempts, first_attempt_time, @clock.current_time))
    end
  end

  return output if @return_on_start
  output.get
end
#schedule_with_retry(command, failure, attempts, first_attempt_time, time_of_recorded_failure) ⇒ Object
47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/aws/decider/async_retrying_executor.rb', line 47
#
# Decides whether +command+ runs now, runs after a timer, or is abandoned.
#
# The retry policy is consulted only when the counters in +attempts+ sum to
# more than 1. In that case +failure+ is raised if the policy says it is not
# retryable or returns a negative delay. A positive delay schedules #invoke
# through a clock timer inside a +task+; any other delay calls #invoke
# directly.
#
# @param command callable forwarded to #invoke.
# @param failure exception recorded for the previous attempt, or nil.
# @param attempts [Hash] per-exception-class attempt counters.
# @param first_attempt_time forwarded to the policy and to #invoke.
# @param time_of_recorded_failure forwarded to the policy.
# @return [Object] the value of #invoke, or of the +task+ call when a timer
#   is used.
# @raise [Exception] +failure+, when the policy refuses a retry.
def schedule_with_retry(command, failure, attempts, first_attempt_time, time_of_recorded_failure)
  delay = -1
  if attempts.values.reduce(0, :+) > 1
    raise failure unless @retrying_policy.isRetryable(failure)

    delay = @retrying_policy.next_retry_delay_seconds(
      first_attempt_time, time_of_recorded_failure, attempts, failure, @execution_id
    )
    raise failure if delay < 0
  end

  # Covers both the first attempt (delay still -1) and a zero-second retry.
  return invoke(command, attempts, first_attempt_time) unless delay > 0

  task do
    @clock.create_timer(delay, lambda { invoke(command, attempts, first_attempt_time) })
  end
end