Class: Temporalio::Testing::ActivityEnvironment

Inherits:
Object
  • Object
show all
Defined in:
lib/temporalio/testing/activity_environment.rb

Overview

Test environment for testing activities.

Users can create this environment and then use #run to execute activities on it. Often, since mutable things like cancellation can be set, users create this for each activity that is run. There is no real performance penalty for creating an environment for every run.

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(info: ActivityEnvironment.default_info, on_heartbeat: nil, cancellation: Cancellation.new, on_cancellation_details: nil, worker_shutdown_cancellation: Cancellation.new, payload_converter: Converters::PayloadConverter.default, logger: Logger.new(nil), activity_executors: Worker::ActivityExecutor.defaults, metric_meter: nil, client: nil) ⇒ ActivityEnvironment

Create a test environment for activities.



58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/temporalio/testing/activity_environment.rb', line 58

def initialize(
  info: ActivityEnvironment.default_info,
  on_heartbeat: nil,
  cancellation: Cancellation.new,
  on_cancellation_details: nil,
  worker_shutdown_cancellation: Cancellation.new,
  payload_converter: Converters::PayloadConverter.default,
  logger: Logger.new(nil),
  activity_executors: Worker::ActivityExecutor.defaults,
  metric_meter: nil,
  client: nil
)
  @info = info
  @on_heartbeat = on_heartbeat
  @cancellation = cancellation
  @on_cancellation_details = on_cancellation_details || proc do
    @_cancellation_details ||= Activity::CancellationDetails.new if @cancellation.canceled?
  end
  @worker_shutdown_cancellation = worker_shutdown_cancellation
  @payload_converter = payload_converter
  @logger = logger
  @activity_executors = activity_executors
  @metric_meter = metric_meter
  @client = client
end

Class Method Details

.default_infoActivity::Info



18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/temporalio/testing/activity_environment.rb', line 18

def self.default_info
  @default_info ||= Activity::Info.new(
    activity_id: 'test',
    activity_type: 'unknown',
    attempt: 1,
    current_attempt_scheduled_time: Time.at(0),
    heartbeat_timeout: nil,
    local?: false,
    priority: Temporalio::Priority.default,
    raw_heartbeat_details: [],
    retry_policy: RetryPolicy.new,
    schedule_to_close_timeout: 1.0,
    scheduled_time: Time.at(0),
    start_to_close_timeout: 1.0,
    started_time: Time.at(0),
    task_queue: 'test',
    task_token: String.new('test', encoding: Encoding::ASCII_8BIT),
    workflow_id: 'test',
    workflow_namespace: 'default',
    workflow_run_id: 'test-run',
    workflow_type: 'test'
  )
end

Instance Method Details

#run(activity, *args) ⇒ Object

Run an activity and returns its result or raises its exception.

Raises:

  • (ArgumentError)


89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# File 'lib/temporalio/testing/activity_environment.rb', line 89

def run(activity, *args)
  defn = Activity::Definition::Info.from_activity(activity)
  executor = @activity_executors[defn.executor]
  raise ArgumentError, "Unknown executor: #{defn.executor}" if executor.nil?

  queue = Queue.new
  executor.execute_activity(defn) do
    Activity::Context._current_executor = executor
    executor.set_activity_context(defn, Context.new(
                                          info: @info.dup,
                                          instance:
                                            defn.instance.is_a?(Proc) ? defn.instance.call : defn.instance,
                                          on_heartbeat: @on_heartbeat,
                                          cancellation: @cancellation,
                                          on_cancellation_details: @on_cancellation_details,
                                          worker_shutdown_cancellation: @worker_shutdown_cancellation,
                                          payload_converter: @payload_converter,
                                          logger: @logger,
                                          metric_meter: @metric_meter,
                                          client: @client
                                        ))
    queue.push([defn.proc.call(*args), nil])
  rescue Exception => e # rubocop:disable Lint/RescueException -- Intentionally capturing all exceptions
    queue.push([nil, e])
  ensure
    executor.set_activity_context(defn, nil)
    Activity::Context._current_executor = nil
  end

  result, err = queue.pop
  raise err unless err.nil?

  result
end