Class: Temporalio::Worker::WorkflowReplayer
- Inherits: Object
  - Object
  - Temporalio::Worker::WorkflowReplayer
- Defined in: lib/temporalio/worker/workflow_replayer.rb
Overview
Replayer to replay workflows from existing history.
Defined Under Namespace
Classes: Options, ReplayResult, ReplayWorker
Instance Attribute Summary
-
#options ⇒ Options
readonly
Options for this replayer, which have the same attributes as #initialize.
Instance Method Summary
-
#initialize(workflows:, namespace: 'ReplayNamespace', task_queue: 'ReplayTaskQueue', data_converter: Converters::DataConverter.default, workflow_executor: WorkflowExecutor::ThreadPool.default, plugins: [], interceptors: [], identity: nil, logger: Logger.new($stdout, level: Logger::WARN), illegal_workflow_calls: Worker.default_illegal_workflow_calls, workflow_failure_exception_types: [], workflow_payload_codec_thread_pool: nil, unsafe_workflow_io_enabled: false, debug_mode: %w[true 1].include?(ENV['TEMPORAL_DEBUG'].to_s.downcase), runtime: Runtime.default) { ... } ⇒ WorkflowReplayer
constructor
Create a new replayer.
-
#replay_workflow(history, raise_on_replay_failure: true) ⇒ ReplayResult
Replay a workflow history.
-
#replay_workflows(histories, raise_on_replay_failure: false) ⇒ Array<ReplayResult>
Replay multiple workflow histories.
-
#with_replay_worker {|Worker| ... } ⇒ Object
Run a block of code with a ReplayWorker to execute replays.
Constructor Details
#initialize(workflows:, namespace: 'ReplayNamespace', task_queue: 'ReplayTaskQueue', data_converter: Converters::DataConverter.default, workflow_executor: WorkflowExecutor::ThreadPool.default, plugins: [], interceptors: [], identity: nil, logger: Logger.new($stdout, level: Logger::WARN), illegal_workflow_calls: Worker.default_illegal_workflow_calls, workflow_failure_exception_types: [], workflow_payload_codec_thread_pool: nil, unsafe_workflow_io_enabled: false, debug_mode: %w[true 1].include?(ENV['TEMPORAL_DEBUG'].to_s.downcase), runtime: Runtime.default) { ... } ⇒ WorkflowReplayer
Create a new replayer. This combines some options from both Temporalio::Worker#initialize and Client#initialize.
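The `debug_mode` default in the signature is computed from the `TEMPORAL_DEBUG` environment variable. As a minimal sketch of how that expression evaluates, the helper below wraps the same expression; the name `debug_mode_default` and its `env` parameter are hypothetical and exist only so the expression can be tried without changing the real environment.

```ruby
# Mirrors the default expression for debug_mode from the signature above.
# debug_mode_default and its env parameter are hypothetical, for illustration.
def debug_mode_default(env = ENV)
  %w[true 1].include?(env['TEMPORAL_DEBUG'].to_s.downcase)
end

debug_mode_default({ 'TEMPORAL_DEBUG' => 'TRUE' }) # => true (case-insensitive)
debug_mode_default({ 'TEMPORAL_DEBUG' => '1' })    # => true
debug_mode_default({ 'TEMPORAL_DEBUG' => 'yes' })  # => false
debug_mode_default({})                             # => false (nil.to_s is "")
```

Only the values `true` (in any letter case) and `1` turn the default on; any other value, or an unset variable, leaves it off.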
# File 'lib/temporalio/worker/workflow_replayer.rb', line 84

def initialize(
  workflows:,
  namespace: 'ReplayNamespace',
  task_queue: 'ReplayTaskQueue',
  data_converter: Converters::DataConverter.default,
  workflow_executor: WorkflowExecutor::ThreadPool.default,
  plugins: [],
  interceptors: [],
  identity: nil,
  logger: Logger.new($stdout, level: Logger::WARN),
  illegal_workflow_calls: Worker.default_illegal_workflow_calls,
  workflow_failure_exception_types: [],
  workflow_payload_codec_thread_pool: nil,
  unsafe_workflow_io_enabled: false,
  debug_mode: %w[true 1].include?(ENV['TEMPORAL_DEBUG'].to_s.downcase),
  runtime: Runtime.default,
  &
)
  @options = Options.new(
    workflows:,
    namespace:,
    task_queue:,
    data_converter:,
    workflow_executor:,
    plugins:,
    interceptors:,
    identity:,
    logger:,
    illegal_workflow_calls:,
    workflow_failure_exception_types:,
    workflow_payload_codec_thread_pool:,
    unsafe_workflow_io_enabled:,
    debug_mode:,
    runtime:
  ).freeze
  # Apply plugins
  Worker._validate_plugins!(plugins)
  @options = plugins.reduce(@options) { |options, plugin| plugin.configure_workflow_replayer(options) }

  # Preload definitions and other settings
  @workflow_definitions = Internal::Worker::WorkflowWorker.workflow_definitions(
    @options.workflows, should_enforce_versioning_behavior: false
  )
  @nondeterminism_as_workflow_fail, @nondeterminism_as_workflow_fail_for_types =
    Internal::Worker::WorkflowWorker.(
      workflow_failure_exception_types: @options.workflow_failure_exception_types,
      workflow_definitions: @workflow_definitions
    )

  # If there is a block, we'll go ahead and assume it's for with_replay_worker
  with_replay_worker(&) if block_given? # steep:ignore
end
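The "Apply plugins" step in the constructor appears to fold the options through each plugin's `configure_workflow_replayer` in list order, so every plugin receives the result of the previous one. The following is a minimal sketch of that fold with stand-in classes. `DemoOptions`, `IdentityPlugin`, and `ExtraWorkflowPlugin` are hypothetical and not part of the library; the real `Options` class carries many more fields.

```ruby
# Hypothetical stand-in for the replayer options (the real class has more fields).
DemoOptions = Struct.new(:workflows, :identity, keyword_init: true)

# Hypothetical plugin that sets an identity on a copy of the options.
class IdentityPlugin
  def configure_workflow_replayer(options)
    updated = options.dup # dup is not frozen, unlike the input
    updated.identity = 'replayer-with-plugin'
    updated
  end
end

# Hypothetical plugin that appends one more workflow to a copy of the options.
class ExtraWorkflowPlugin
  def configure_workflow_replayer(options)
    updated = options.dup
    updated.workflows = options.workflows + [:ExtraWorkflow]
    updated
  end
end

initial = DemoOptions.new(workflows: [:MyWorkflow], identity: nil).freeze
plugins = [IdentityPlugin.new, ExtraWorkflowPlugin.new]

# Same shape as the constructor's plugin step: each plugin gets the previous result.
final_options = plugins.reduce(initial) do |options, plugin|
  plugin.configure_workflow_replayer(options)
end
```

Because the initial options are frozen, each stand-in plugin returns a modified copy instead of mutating its argument; `initial` is unchanged after the fold.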
Instance Attribute Details
#options ⇒ Options (readonly)
Returns Options for this replayer, which have the same attributes as #initialize.
# File 'lib/temporalio/worker/workflow_replayer.rb', line 44

def options
  @options
end
Instance Method Details
#replay_workflow(history, raise_on_replay_failure: true) ⇒ ReplayResult
Replay a workflow history.
When replaying multiple histories, prefer #replay_workflows or #with_replay_worker: each creates a replay worker once, whereas this method creates a new one on every call.
# File 'lib/temporalio/worker/workflow_replayer.rb', line 146

def replay_workflow(history, raise_on_replay_failure: true)
  with_replay_worker { |worker| worker.replay_workflow(history, raise_on_replay_failure:) }
end
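To illustrate why the batched methods are preferable for multiple histories, here is a minimal sketch that only counts how many workers get created. `CountingWorker` and `CountingReplayer` are hypothetical stand-ins that copy the delegation shape of the methods on this page; no Temporal code runs.

```ruby
# Hypothetical worker stand-in: "replays" by returning a string.
class CountingWorker
  def replay_workflow(history)
    "replayed #{history}"
  end

  def _shutdown; end
end

# Hypothetical replayer stand-in that counts worker creation.
class CountingReplayer
  attr_reader :workers_created

  def initialize
    @workers_created = 0
  end

  # One worker per call, shut down when the block finishes.
  def with_replay_worker
    @workers_created += 1
    worker = CountingWorker.new
    begin
      yield worker
    ensure
      worker._shutdown
    end
  end

  def replay_workflow(history)
    with_replay_worker { |worker| worker.replay_workflow(history) }
  end

  def replay_workflows(histories)
    with_replay_worker { |worker| histories.map { |h| worker.replay_workflow(h) } }
  end
end

histories = %w[h1 h2 h3]

one_at_a_time = CountingReplayer.new
histories.each { |h| one_at_a_time.replay_workflow(h) }

batched = CountingReplayer.new
batched_results = batched.replay_workflows(histories)

one_at_a_time.workers_created # => 3
batched.workers_created       # => 1
```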
#replay_workflows(histories, raise_on_replay_failure: false) ⇒ Array<ReplayResult>
Replay multiple workflow histories.
# File 'lib/temporalio/worker/workflow_replayer.rb', line 157

def replay_workflows(histories, raise_on_replay_failure: false)
  with_replay_worker do |worker|
    histories.map { |h| worker.replay_workflow(h, raise_on_replay_failure:) }
  end
end
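Per the signatures, `raise_on_replay_failure` defaults to `false` here but to `true` in #replay_workflow, so a batch run collects failures instead of stopping at the first one. The sketch below shows that difference with stand-ins. `SketchResult`, `SketchWorker`, `replay_all`, and the `replay_failure` field are assumptions for illustration, not the documented shape of ReplayResult.

```ruby
# Hypothetical result stand-in; the field names are assumptions.
SketchResult = Struct.new(:history, :replay_failure)

# Hypothetical worker stand-in: a history marked :bad fails to replay.
class SketchWorker
  def replay_workflow(history, raise_on_replay_failure: true)
    failure = history[:bad] ? RuntimeError.new("nondeterminism in #{history[:id]}") : nil
    raise failure if failure && raise_on_replay_failure

    SketchResult.new(history, failure)
  end
end

# Same mapping shape as replay_workflows, with the same default.
def replay_all(worker, histories, raise_on_replay_failure: false)
  histories.map do |h|
    worker.replay_workflow(h, raise_on_replay_failure: raise_on_replay_failure)
  end
end

histories = [{ id: 'a', bad: false }, { id: 'b', bad: true }, { id: 'c', bad: false }]

# Default: every history is replayed and failures are reported in the results.
results = replay_all(SketchWorker.new, histories)
failed_ids = results.select(&:replay_failure).map { |r| r.history[:id] }

# Opting in to raising stops the batch at the first failing history.
raised = begin
  replay_all(SketchWorker.new, histories, raise_on_replay_failure: true)
  false
rescue RuntimeError
  true
end
```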
#with_replay_worker {|Worker| ... } ⇒ Object
Run a block of code with a ReplayWorker to execute replays.
# File 'lib/temporalio/worker/workflow_replayer.rb', line 168

def with_replay_worker(&block)
  # Apply plugins
  run_block = proc do |options|
    # @type var options: Plugin::WithWorkflowReplayWorkerOptions
    block.call(options.worker)
  end
  run_block = @options.plugins.reverse_each.reduce(run_block) do |next_call, plugin|
    proc do |options|
      plugin.with_workflow_replay_worker(options, next_call) # steep:ignore
    end
  end

  worker = ReplayWorker.new(
    options:,
    workflow_definitions: @workflow_definitions,
    nondeterminism_as_workflow_fail: @nondeterminism_as_workflow_fail,
    nondeterminism_as_workflow_fail_for_types: @nondeterminism_as_workflow_fail_for_types
  )
  begin
    run_block.call(Plugin::WithWorkflowReplayWorkerOptions.new(worker:))
  ensure
    worker._shutdown
  end
end
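The `reverse_each.reduce` step builds a chain of procs in which the first plugin in the list ends up outermost, and the `ensure` clause shuts the worker down even if the block raises. A minimal sketch of that wrapping order follows. `ChainOptions`, `TracingPlugin`, and `run_with_plugins` are hypothetical stand-ins, and the shutdown is recorded in a log array instead of being performed on a worker.

```ruby
# Hypothetical stand-in for the options object handed through the chain.
ChainOptions = Struct.new(:worker, keyword_init: true)

# Hypothetical plugin that records when it runs around the next call.
class TracingPlugin
  def initialize(name, log)
    @name = name
    @log = log
  end

  def with_workflow_replay_worker(options, next_call)
    @log << "#{@name} before"
    result = next_call.call(options)
    @log << "#{@name} after"
    result
  end
end

def run_with_plugins(plugins, worker, log, &block)
  # Innermost call: hand the worker to the user's block.
  run_block = proc { |options| block.call(options.worker) }
  # Wrap from the last plugin to the first, so the first plugin is outermost.
  run_block = plugins.reverse_each.reduce(run_block) do |next_call, plugin|
    proc { |options| plugin.with_workflow_replay_worker(options, next_call) }
  end
  begin
    run_block.call(ChainOptions.new(worker: worker))
  ensure
    log << 'shutdown' # stands in for worker._shutdown
  end
end

log = []
plugins = [TracingPlugin.new('first', log), TracingPlugin.new('second', log)]
result = run_with_plugins(plugins, :demo_worker, log) do |worker|
  log << "block got #{worker}"
  :done
end

# log is now:
# ["first before", "second before", "block got demo_worker",
#  "second after", "first after", "shutdown"]
```

Reversing before reducing is what keeps the call order aligned with the plugin list order: the last plugin wraps the block first, so the first plugin wraps everything.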