Class: Temporal::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/temporal/worker.rb

Instance Method Summary collapse

Constructor Details

#initialize(activity_thread_pool_size: Temporal::Activity::Poller::DEFAULT_OPTIONS[:thread_pool_size], workflow_thread_pool_size: Temporal::Workflow::Poller::DEFAULT_OPTIONS[:thread_pool_size]) ⇒ Worker

activity_thread_pool_size: number of threads that the poller can use to run activities.

can be set to 1 if you want no paralellism in your activities, at the cost of throughput.


12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# File 'lib/temporal/worker.rb', line 12

def initialize(
  activity_thread_pool_size: Temporal::Activity::Poller::DEFAULT_OPTIONS[:thread_pool_size],
  workflow_thread_pool_size: Temporal::Workflow::Poller::DEFAULT_OPTIONS[:thread_pool_size]
)
  @workflows = Hash.new { |hash, key| hash[key] = ExecutableLookup.new }
  @activities = Hash.new { |hash, key| hash[key] = ExecutableLookup.new }
  @pollers = []
  @workflow_task_middleware = []
  @activity_middleware = []
  @shutting_down = false
  @activity_poller_options = {
    thread_pool_size: activity_thread_pool_size,
  }
  @workflow_poller_options = {
    thread_pool_size: workflow_thread_pool_size,
  }
end

Instance Method Details

#add_activity_middleware(middleware_class, *args) ⇒ Object



48
49
50
# File 'lib/temporal/worker.rb', line 48

def add_activity_middleware(middleware_class, *args)
  @activity_middleware << Middleware::Entry.new(middleware_class, args)
end

#add_workflow_task_middleware(middleware_class, *args) ⇒ Object



44
45
46
# File 'lib/temporal/worker.rb', line 44

def add_workflow_task_middleware(middleware_class, *args)
  @workflow_task_middleware << Middleware::Entry.new(middleware_class, args)
end

#register_activity(activity_class, options = {}) ⇒ Object



37
38
39
40
41
42
# File 'lib/temporal/worker.rb', line 37

def register_activity(activity_class, options = {})
  execution_options = ExecutionOptions.new(activity_class, options)
  key = [execution_options.namespace, execution_options.task_queue]

  @activities[key].add(execution_options.name, activity_class)
end

#register_workflow(workflow_class, options = {}) ⇒ Object



30
31
32
33
34
35
# File 'lib/temporal/worker.rb', line 30

def register_workflow(workflow_class, options = {})
  execution_options = ExecutionOptions.new(workflow_class, options)
  key = [execution_options.namespace, execution_options.task_queue]

  @workflows[key].add(execution_options.name, workflow_class)
end

#startObject



52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/temporal/worker.rb', line 52

def start
  workflows.each_pair do |(namespace, task_queue), lookup|
    pollers << workflow_poller_for(namespace, task_queue, lookup)
  end

  activities.each_pair do |(namespace, task_queue), lookup|
    pollers << activity_poller_for(namespace, task_queue, lookup)
  end

  trap_signals

  pollers.each(&:start)

  # keep the main thread alive
  sleep 1 while !shutting_down?
end

#stopObject



69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/temporal/worker.rb', line 69

def stop
  @shutting_down = true

  Thread.new do
    pollers.each(&:stop_polling)
    # allow workers to drain in-transit tasks.
    # https://github.com/temporalio/temporal/issues/1058
    sleep 1
    pollers.each(&:cancel_pending_requests)
    pollers.each(&:wait)
  end.join
end