Class: Temporalio::Worker::ActivityWorker Private

Inherits:
Object
Defined in:
lib/temporalio/worker/activity_worker.rb

This class is part of a private API. Avoid using it if possible, as it may be removed or changed in the future.

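Instance Method Summary

  • #drain ⇒ Object
  • #initialize(task_queue, worker, activities, converter, interceptors, executor, graceful_timeout) ⇒ ActivityWorker
  • #run(reactor) ⇒ Object
  • #setup_graceful_shutdown_timer(reactor) ⇒ Object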

Constructor Details

#initialize(task_queue, worker, activities, converter, interceptors, executor, graceful_timeout) ⇒ ActivityWorker

This method is part of a private API. Avoid using it if possible, as it may be removed or changed in the future.

Returns a new instance of ActivityWorker.



# File 'lib/temporalio/worker/activity_worker.rb', line 11

def initialize(
  task_queue,
  worker,
  activities,
  converter,
  interceptors,
  executor,
  graceful_timeout
)
  @task_queue = task_queue
  @worker = worker
  @activities = prepare_activities(activities)
  @converter = converter
  @inbound_interceptors = Temporalio::Interceptor::Chain.new(
    Temporalio::Interceptor.filter(interceptors, :activity_inbound),
  )
  @outbound_interceptors = Temporalio::Interceptor::Chain.new(
    Temporalio::Interceptor.filter(interceptors, :activity_outbound),
  )
  @executor = executor
  @graceful_timeout = graceful_timeout
  @running_activities = {}
  @cancellations = []
  @drain_queue = Queue.new
end

Instance Method Details

#drain ⇒ Object

This method is part of a private API. Avoid using it if possible, as it may be removed or changed in the future.



# File 'lib/temporalio/worker/activity_worker.rb', line 74

def drain
  drain_queue.pop
end
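
The behavior of `drain` follows from how Ruby's core `Queue` treats a closed queue: `pop` on an empty, open queue blocks, and closing the queue (as `#run` does in its `ensure` clause) wakes the waiter with `nil`. The following is a minimal standalone sketch of that behavior using a plain thread instead of the worker; the variable names are illustrative only.

```ruby
# Standalone illustration of Queue#pop and Queue#close; not part of the worker.
drain_queue = Queue.new

# The waiter blocks in pop because the queue is empty and still open.
waiter = Thread.new { drain_queue.pop }
sleep 0.05
still_waiting = waiter.alive?

# Closing an empty queue wakes blocked consumers; their pop returns nil.
drain_queue.close
result = waiter.value
```

A caller of `drain` therefore resumes once the run loop has finished and closed the queue, assuming nothing is pushed onto it beforehand.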

#run(reactor) ⇒ Object

This method is part of a private API. Avoid using it if possible, as it may be removed or changed in the future.



# File 'lib/temporalio/worker/activity_worker.rb', line 37

def run(reactor)
  # @type var outstanding_tasks: Array[Async::Task]
  outstanding_tasks = []

  loop do
    activity_task = worker.poll_activity_task
    outstanding_tasks << reactor.async do |async_task|
      if activity_task.start
        handle_start_activity(activity_task.task_token, activity_task.start)
      elsif activity_task.cancel
        handle_cancel_activity(activity_task.task_token, activity_task.cancel)
      end
    ensure
      outstanding_tasks.delete(async_task)
    end
  end
rescue Temporalio::Bridge::Error::WorkerShutdown
  # No need to re-raise this error, it's a part of a normal shutdown
ensure
  outstanding_tasks.each(&:wait)
  @cancelation_task&.wait
  drain_queue.close
end
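
The control flow of `run` (poll in an endless loop, treat a shutdown error as the normal exit, then wait for in-flight work and close the drain queue) can be sketched without the Temporal bridge or the Async reactor. This sketch substitutes threads for `Async::Task`; `ShutdownSignal`, `FakePoller`, and `run_loop` are hypothetical names invented for the illustration and do not exist in the SDK.

```ruby
# Hypothetical stand-in for Temporalio::Bridge::Error::WorkerShutdown.
class ShutdownSignal < StandardError; end

# Hypothetical poller: hands out tasks, then raises to signal shutdown.
class FakePoller
  def initialize(tasks)
    @tasks = tasks.dup
  end

  def poll
    raise ShutdownSignal if @tasks.empty?

    @tasks.shift
  end
end

def run_loop(poller, handled, drain_queue)
  outstanding = []

  loop do
    task = poller.poll
    # Each task is handled concurrently (threads here, Async tasks in the worker).
    outstanding << Thread.new(task) do |t|
      sleep 0.01
      handled << t
    end
  end
rescue ShutdownSignal
  # Expected during shutdown, so it is swallowed rather than re-raised.
ensure
  # Wait for in-flight work, then release anyone blocked on the drain queue.
  outstanding.each(&:join)
  drain_queue.close
end

handled = Queue.new
drain_queue = Queue.new
run_loop(FakePoller.new(%w[a b c]), handled, drain_queue)
```

Because the cleanup lives in `ensure`, the drain queue is closed whether the loop ends through the shutdown signal or through an unexpected error.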

#setup_graceful_shutdown_timer(reactor) ⇒ Object

This method is part of a private API. Avoid using it if possible, as it may be removed or changed in the future.



# File 'lib/temporalio/worker/activity_worker.rb', line 61

def setup_graceful_shutdown_timer(reactor)
  if graceful_timeout
    reactor.async do |async_task|
      @cancelation_task = async_task.async do
        sleep graceful_timeout
        @running_activities.each_value do |activity_runner|
          activity_runner.cancel('Worker is shutting down', by_request: false)
        end
      end
    end
  end
end
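
The timer above does nothing when no graceful timeout is configured. Otherwise it sleeps for the timeout and then cancels every activity that is still running. The sketch below reproduces that pattern with a plain thread in place of the Async reactor; `FakeRunner` and `start_shutdown_timer` are hypothetical names, and only the `cancel(reason, by_request:)` call shape is taken from the listing.

```ruby
# Hypothetical stand-in for an activity runner; records how it was cancelled.
class FakeRunner
  attr_reader :cancel_reason, :by_request

  def cancel(reason, by_request:)
    @cancel_reason = reason
    @by_request = by_request
  end
end

# Returns the timer thread, or nil when no timeout is configured.
def start_shutdown_timer(graceful_timeout, running_activities)
  return nil unless graceful_timeout

  Thread.new do
    sleep graceful_timeout
    # Anything still registered after the grace period is cancelled.
    running_activities.each_value do |runner|
      runner.cancel('Worker is shutting down', by_request: false)
    end
  end
end

running = { 'token-1' => FakeRunner.new }
timer = start_shutdown_timer(0.01, running)
timer.join
no_timer = start_shutdown_timer(nil, running)
```

Keeping a handle on the timer (the worker stores it in `@cancelation_task`) lets the run loop wait for it during cleanup, so cancellation is not cut short by shutdown.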