Class: Temporalio::Worker::ActivityWorker (Private)

Inherits:
Object
  • Object
show all
Defined in:
lib/temporalio/worker/activity_worker.rb

This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.

Instance Method Summary collapse

Constructor Details

#initialize(task_queue, core_worker, activities, converter, executor, graceful_timeout) ⇒ ActivityWorker

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns a new instance of ActivityWorker.



10
11
12
13
14
15
16
17
18
19
20
# File 'lib/temporalio/worker/activity_worker.rb', line 10

# Builds an activity worker bound to a single task queue.
#
# No type information is visible here, so the parameters are described only
# by how this constructor uses them.
#
# @param task_queue       stored unchanged in @task_queue
# @param core_worker      wrapped in a SyncWorker, which is stored in @worker
# @param activities       handed to #prepare_activities; the result is stored
#                         in @activities
# @param converter        stored unchanged in @converter
# @param executor         stored unchanged in @executor
# @param graceful_timeout stored unchanged in @graceful_timeout
def initialize(task_queue, core_worker, activities, converter, executor, graceful_timeout)
  # Collaborators handed in by the caller. The assignment order is kept as-is
  # because #prepare_activities runs with only @task_queue and @worker set.
  @task_queue = task_queue
  @worker = SyncWorker.new(core_worker)
  @activities = prepare_activities(activities)
  @converter = converter
  @executor = executor
  @graceful_timeout = graceful_timeout

  # Per-instance bookkeeping; every container starts out empty.
  @running_activities = {}
  @cancellations = []
  @drain_queue = Queue.new
end

Instance Method Details

#drain ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



58
59
60
# File 'lib/temporalio/worker/activity_worker.rb', line 58

# Blocks the caller on the drain queue.
#
# Pops from +drain_queue+, so the call returns once an item is available or
# the queue has been closed. #run closes the queue after it has waited for its
# outstanding tasks, which makes this a way to wait for shutdown to finish.
#
# @return [Object, nil] whatever the pop yields; presumably always nil, since
#   no code visible here pushes onto the queue (a closed, empty Queue pops nil)
def drain
  drain_queue.pop
end

#run(reactor) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/temporalio/worker/activity_worker.rb', line 22

# Polls the core worker for activity tasks until it shuts down, handling each
# polled task in its own async task, then waits for in-flight work to finish.
#
# Each polled task is dispatched to #handle_start_activity when it carries a
# +start+ payload, or to #handle_cancel_activity when it carries a +cancel+
# payload; a task with neither is ignored.
#
# The poll loop ends when Temporalio::Bridge::Error::WorkerShutdown is raised,
# which is swallowed as part of a normal shutdown. Any other error propagates
# to the caller. In both cases the +ensure+ section spawns a shutdown task that:
#
# 1. when +graceful_timeout+ is set, starts a timer that cancels every entry
#    in +running_activities+ once the timeout elapses;
# 2. waits for every task that was still outstanding;
# 3. stops the timer and closes +drain_queue+, which releases #drain.
#
# @param reactor an object responding to +async+ that yields the spawned task
#   to its block (an Async reactor, judging by the type annotation below)
def run(reactor)
  # @type var outstanding_tasks: Array[Async::Task]
  outstanding_tasks = []

  loop do
    activity_task = worker.poll_activity_task
    reactor.async do |async_task|
      # The task registers itself before doing any work. Async runs the block
      # eagerly, so a task that finishes without suspending would otherwise
      # run its `ensure` before an outer `<<` could record it, leaving a
      # finished task in the list forever.
      outstanding_tasks << async_task

      if activity_task.start
        handle_start_activity(activity_task.task_token, activity_task.start)
      elsif activity_task.cancel
        handle_cancel_activity(activity_task.task_token, activity_task.cancel)
      end
    ensure
      outstanding_tasks.delete(async_task)
    end
  end
rescue Temporalio::Bridge::Error::WorkerShutdown
  # No need to re-raise this error, it's a part of a normal shutdown
ensure
  reactor.async do |async_task|
    cancelation_task =
      if graceful_timeout
        async_task.async do
          sleep graceful_timeout
          running_activities.each_value do |activity_runner|
            activity_runner.cancel('Worker is shutting down', by_request: false)
          end
        end
      end

    # Wait on a snapshot: every task removes itself from outstanding_tasks as
    # it completes, and iterating the live array while it shrinks skips
    # elements, so some tasks would never be waited on.
    outstanding_tasks.dup.each(&:wait)
    cancelation_task&.stop # all tasks completed, stop cancellations
    drain_queue.close
  end
end