Class: Temporal::Worker
- Inherits:
-
Object
- Object
- Temporal::Worker
- Defined in:
- lib/temporal/worker.rb
Instance Method Summary collapse
- #add_activity_middleware(middleware_class, *args) ⇒ Object
- #add_workflow_task_middleware(middleware_class, *args) ⇒ Object
-
#initialize(activity_thread_pool_size: Temporal::Activity::Poller::DEFAULT_OPTIONS[:thread_pool_size], workflow_thread_pool_size: Temporal::Workflow::Poller::DEFAULT_OPTIONS[:thread_pool_size]) ⇒ Worker
constructor
activity_thread_pool_size: number of threads that the poller can use to run activities.
- #register_activity(activity_class, options = {}) ⇒ Object
- #register_workflow(workflow_class, options = {}) ⇒ Object
- #start ⇒ Object
- #stop ⇒ Object
Constructor Details
#initialize(activity_thread_pool_size: Temporal::Activity::Poller::DEFAULT_OPTIONS[:thread_pool_size], workflow_thread_pool_size: Temporal::Workflow::Poller::DEFAULT_OPTIONS[:thread_pool_size]) ⇒ Worker
activity_thread_pool_size: number of threads that the poller can use to run activities.
can be set to 1 if you want no paralellism in your activities, at the cost of throughput.
12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
# File 'lib/temporal/worker.rb', line 12 def initialize( activity_thread_pool_size: Temporal::Activity::Poller::DEFAULT_OPTIONS[:thread_pool_size], workflow_thread_pool_size: Temporal::Workflow::Poller::DEFAULT_OPTIONS[:thread_pool_size] ) @workflows = Hash.new { |hash, key| hash[key] = ExecutableLookup.new } @activities = Hash.new { |hash, key| hash[key] = ExecutableLookup.new } @pollers = [] @workflow_task_middleware = [] @activity_middleware = [] @shutting_down = false @activity_poller_options = { thread_pool_size: activity_thread_pool_size, } @workflow_poller_options = { thread_pool_size: workflow_thread_pool_size, } end |
Instance Method Details
#add_activity_middleware(middleware_class, *args) ⇒ Object
48 49 50 |
# File 'lib/temporal/worker.rb', line 48 def add_activity_middleware(middleware_class, *args) @activity_middleware << Middleware::Entry.new(middleware_class, args) end |
#add_workflow_task_middleware(middleware_class, *args) ⇒ Object
44 45 46 |
# File 'lib/temporal/worker.rb', line 44 def add_workflow_task_middleware(middleware_class, *args) @workflow_task_middleware << Middleware::Entry.new(middleware_class, args) end |
#register_activity(activity_class, options = {}) ⇒ Object
37 38 39 40 41 42 |
# File 'lib/temporal/worker.rb', line 37 def register_activity(activity_class, = {}) = ExecutionOptions.new(activity_class, ) key = [.namespace, .task_queue] @activities[key].add(.name, activity_class) end |
#register_workflow(workflow_class, options = {}) ⇒ Object
30 31 32 33 34 35 |
# File 'lib/temporal/worker.rb', line 30 def register_workflow(workflow_class, = {}) = ExecutionOptions.new(workflow_class, ) key = [.namespace, .task_queue] @workflows[key].add(.name, workflow_class) end |
#start ⇒ Object
52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 |
# File 'lib/temporal/worker.rb', line 52 def start workflows.each_pair do |(namespace, task_queue), lookup| pollers << workflow_poller_for(namespace, task_queue, lookup) end activities.each_pair do |(namespace, task_queue), lookup| pollers << activity_poller_for(namespace, task_queue, lookup) end trap_signals pollers.each(&:start) # keep the main thread alive sleep 1 while !shutting_down? end |
#stop ⇒ Object
69 70 71 72 73 74 75 76 77 78 79 80 |
# File 'lib/temporal/worker.rb', line 69 def stop @shutting_down = true Thread.new do pollers.each(&:stop_polling) # allow workers to drain in-transit tasks. # https://github.com/temporalio/temporal/issues/1058 sleep 1 pollers.each(&:cancel_pending_requests) pollers.each(&:wait) end.join end |