Class: Temporalio::Worker::ActivityWorker Private
- Inherits: Object
  - Object
  - Temporalio::Worker::ActivityWorker
- Defined in: lib/temporalio/worker/activity_worker.rb
This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.
Instance Method Summary
- #drain ⇒ Object (private)
- #initialize(task_queue, core_worker, activities, converter, executor, graceful_timeout) ⇒ ActivityWorker (constructor, private): A new instance of ActivityWorker.
- #run(reactor) ⇒ Object (private)
Constructor Details
#initialize(task_queue, core_worker, activities, converter, executor, graceful_timeout) ⇒ ActivityWorker
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns a new instance of ActivityWorker.
    # File 'lib/temporalio/worker/activity_worker.rb', line 10
    def initialize(task_queue, core_worker, activities, converter, executor, graceful_timeout)
      @task_queue = task_queue
      @worker = SyncWorker.new(core_worker)
      @activities = prepare_activities(activities)
      @converter = converter
      @executor = executor
      @graceful_timeout = graceful_timeout
      @running_activities = {}
      @cancellations = []
      @drain_queue = Queue.new
    end
Instance Method Details
#drain ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
    # File 'lib/temporalio/worker/activity_worker.rb', line 58
    def drain
      drain_queue.pop
    end
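The listing above relies on standard Ruby Queue behavior: pop blocks while the queue is open and empty, and returns nil once the queue has been closed. The following is a minimal sketch of that signalling pattern in isolation. DrainSignal and its finish method are hypothetical names used for illustration only and are not part of the Temporal SDK.

```ruby
# Hypothetical stand-in for the drain mechanism; DrainSignal is not an SDK class.
class DrainSignal
  def initialize
    @drain_queue = Queue.new
  end

  # Blocks until #finish closes the queue, then returns nil.
  def drain
    @drain_queue.pop
  end

  # Closing an empty queue wakes every thread blocked in #pop.
  def finish
    @drain_queue.close
  end
end

signal = DrainSignal.new
waiter = Thread.new { signal.drain } # parks until the queue is closed
signal.finish
waiter.join                          # returns once #drain has unblocked
```

Using a closed queue as a one-shot signal means any number of callers can wait in drain, and callers that arrive after the close return immediately.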
#run(reactor) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
    # File 'lib/temporalio/worker/activity_worker.rb', line 22
    def run(reactor)
      # @type var outstanding_tasks: Array[Async::Task]
      outstanding_tasks = []

      loop do
        activity_task = worker.poll_activity_task
        outstanding_tasks << reactor.async do |async_task|
          if activity_task.start
            handle_start_activity(activity_task.task_token, activity_task.start)
          elsif activity_task.cancel
            handle_cancel_activity(activity_task.task_token, activity_task.cancel)
          end
        ensure
          outstanding_tasks.delete(async_task)
        end
      end
    rescue Temporalio::Bridge::Error::WorkerShutdown
      # No need to re-raise this error, it's a part of a normal shutdown
    ensure
      reactor.async do |async_task|
        cancelation_task =
          if graceful_timeout
            async_task.async do
              sleep graceful_timeout
              running_activities.each_value do |activity_runner|
                activity_runner.cancel('Worker is shutting down', by_request: false)
              end
            end
          end

        outstanding_tasks.each(&:wait)
        cancelation_task&.stop # all tasks completed, stop cancellations
        drain_queue.close
      end
    end
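The ensure block in the listing above follows a common graceful-shutdown shape: start a timer that cancels remaining work after graceful_timeout, wait for all outstanding tasks, then stop the timer if it has not fired. The sketch below reproduces only that shape, with plain Ruby threads swapped in for Async tasks so that it runs without extra gems. shutdown_gracefully, on_timeout, and gate are hypothetical names and do not exist in the SDK.

```ruby
# Hypothetical sketch of the shutdown sequence, using threads instead of Async tasks.
def shutdown_gracefully(outstanding, graceful_timeout, on_timeout)
  # Timer that triggers cancellation if tasks outlive the timeout.
  # With a nil timeout no timer is started and tasks are simply awaited.
  canceller =
    if graceful_timeout
      Thread.new do
        sleep graceful_timeout
        on_timeout.call
      end
    end

  outstanding.each(&:join) # wait for every in-flight task
  canceller&.kill          # all tasks finished, the timer is no longer needed
  canceller&.join
end

gate = Queue.new
stuck = [Thread.new { gate.pop }] # a task that only ends once it is cancelled
shutdown_gracefully(stuck, 0.05, -> { gate.close })
```

Here closing gate plays the role of activity_runner.cancel: the stuck task is released only after the timeout elapses, whereas tasks that finish on their own never see the cancellation.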