Class: Temporalio::Worker::ActivityWorker Private
- Inherits: Object
  - Object
  - Temporalio::Worker::ActivityWorker
- Defined in: lib/temporalio/worker/activity_worker.rb
This class is part of a private API. You should avoid using this class if possible, as it may be removed or changed in the future.
Instance Method Summary
- #drain ⇒ Object private
- #initialize(task_queue, worker, activities, converter, interceptors, executor, graceful_timeout) ⇒ ActivityWorker constructor private
  A new instance of ActivityWorker.
- #run(reactor) ⇒ Object private
- #setup_graceful_shutdown_timer(reactor) ⇒ Object private
Constructor Details
#initialize(task_queue, worker, activities, converter, interceptors, executor, graceful_timeout) ⇒ ActivityWorker
This method is part of a private API. You should avoid using this method if possible, as it may be removed or changed in the future.
Returns a new instance of ActivityWorker.
# File 'lib/temporalio/worker/activity_worker.rb', line 11

def initialize(
  task_queue,
  worker,
  activities,
  converter,
  interceptors,
  executor,
  graceful_timeout
)
  @task_queue = task_queue
  @worker = worker
  @activities = prepare_activities(activities)
  @converter = converter
  @inbound_interceptors = Temporalio::Interceptor::Chain.new(
    Temporalio::Interceptor.filter(interceptors, :activity_inbound),
  )
  @outbound_interceptors = Temporalio::Interceptor::Chain.new(
    Temporalio::Interceptor.filter(interceptors, :activity_outbound),
  )
  @executor = executor
  @graceful_timeout = graceful_timeout
  @running_activities = {}
  @cancellations = []
  @drain_queue = Queue.new
end
Instance Method Details
#drain ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or changed in the future.
# File 'lib/temporalio/worker/activity_worker.rb', line 74

def drain
  drain_queue.pop
end
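#drain blocks on drain_queue.pop, and #run closes that queue in its ensure clause. In Ruby, popping from an empty Queue blocks until an item arrives or the queue is closed, at which point pop returns nil. The following is a minimal sketch of that signalling pattern using only the standard library; DrainSignal and its method names are hypothetical stand-ins, not part of the Temporal SDK.

```ruby
# Hypothetical stand-in for the drain_queue handshake between #run and #drain.
class DrainSignal
  def initialize
    @queue = Queue.new
  end

  # Called by the side that finishes the work (what #run does in its ensure clause).
  def finish
    @queue.close
  end

  # Called by the side that waits (what #drain does). Blocks until the queue
  # is closed, then returns nil because the queue is empty.
  def wait
    @queue.pop
  end
end

signal = DrainSignal.new
events = []

worker = Thread.new do
  events << :work_done
  signal.finish
end

wait_result = signal.wait # returns only after the worker thread closed the queue
events << :drained
worker.join
```

Closing the queue rather than pushing a sentinel value means any number of waiters are released, and later calls to wait return immediately.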
#run(reactor) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or changed in the future.
# File 'lib/temporalio/worker/activity_worker.rb', line 37

def run(reactor)
  # @type var outstanding_tasks: Array[Async::Task]
  outstanding_tasks = []

  loop do
    activity_task = worker.poll_activity_task
    outstanding_tasks << reactor.async do |async_task|
      if activity_task.start
        handle_start_activity(activity_task.task_token, activity_task.start)
      elsif activity_task.cancel
        handle_cancel_activity(activity_task.task_token, activity_task.cancel)
      end
    ensure
      outstanding_tasks.delete(async_task)
    end
  end
rescue Temporalio::Bridge::Error::WorkerShutdown
  # No need to re-raise this error, it's a part of a normal shutdown
ensure
  outstanding_tasks.each(&:wait)
  @cancelation_task&.wait
  drain_queue.close
end
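The control flow of #run can be hard to see at a glance: it polls in an endless loop, hands each task to a concurrent unit of work, treats the shutdown error as the normal way out of the loop, and waits for everything still in flight before returning. The sketch below reproduces that shape with plain Thread objects instead of Async tasks. ShutdownSignal, FakePoller, and run_poll_loop are hypothetical names introduced for illustration, and the per-task cleanup of the outstanding list is omitted for brevity.

```ruby
# Hypothetical error standing in for Temporalio::Bridge::Error::WorkerShutdown.
class ShutdownSignal < StandardError; end

# Hypothetical poller: returns queued tasks, then raises ShutdownSignal.
class FakePoller
  def initialize(tasks)
    @tasks = tasks.dup
  end

  def poll
    raise ShutdownSignal if @tasks.empty?

    @tasks.shift
  end
end

def run_poll_loop(poller, results)
  outstanding = []
  lock = Mutex.new

  loop do
    task = poller.poll
    # Each task is handled concurrently; here "handling" just doubles the number.
    outstanding << Thread.new do
      lock.synchronize { results << task * 2 }
    end
  end
rescue ShutdownSignal
  # Expected: the poller raises this to end the loop during a normal shutdown.
ensure
  # Wait for every in-flight task before the method returns.
  outstanding.each(&:join)
end

results = []
run_poll_loop(FakePoller.new([1, 2, 3]), results)
```

Because the wait happens in ensure, in-flight work is awaited whether the loop ends through the shutdown error or through an unexpected exception.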
#setup_graceful_shutdown_timer(reactor) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or changed in the future.
# File 'lib/temporalio/worker/activity_worker.rb', line 61

def setup_graceful_shutdown_timer(reactor)
  if graceful_timeout
    reactor.async do |async_task|
      @cancelation_task = async_task.async do
        sleep graceful_timeout
        @running_activities.each_value do |activity_runner|
          activity_runner.cancel('Worker is shutting down', by_request: false)
        end
      end
    end
  end
end
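The graceful shutdown timer follows a simple pattern: if a timeout is configured, sleep for that long in the background, then cancel every activity that is still running. A minimal sketch of the same idea with a plain Thread in place of the Async reactor is shown here. FakeRunner and start_graceful_timer are hypothetical; only the cancel call shape (a reason string plus by_request: false) is taken from the source above.

```ruby
# Hypothetical runner that records how it was cancelled.
class FakeRunner
  attr_reader :cancel_reason, :by_request

  def cancel(reason, by_request:)
    @cancel_reason = reason
    @by_request = by_request
  end
end

# Starts a background timer that cancels all runners after `timeout` seconds.
# Returns nil when no timeout is configured, mirroring the `if graceful_timeout` guard.
def start_graceful_timer(timeout, running)
  return nil unless timeout

  Thread.new do
    sleep timeout
    running.each_value do |runner|
      runner.cancel('Worker is shutting down', by_request: false)
    end
  end
end

running = { 'token-1' => FakeRunner.new, 'token-2' => FakeRunner.new }
timer = start_graceful_timer(0.05, running)
timer.join # the shutdown path waits for the timer, as #run does with @cancelation_task

reasons = running.values.map(&:cancel_reason)
no_timer = start_graceful_timer(nil, running)
```

Keeping a handle to the timer lets the shutdown path wait for it, so cancellations have been delivered before the worker reports that it has drained.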