Class: Messaging::ConsumerSupervisor
- Inherits:
-
Object
- Object
- Messaging::ConsumerSupervisor
show all
- Includes:
- Instrumentation
- Defined in:
- lib/messaging/consumer_supervisor.rb
Constant Summary
Instrumentation::NAMESPACE
Instance Method Summary
collapse
#instrument, subscribe, unsubscribe
Constructor Details
Returns a new instance of ConsumerSupervisor.
6
7
8
9
|
# File 'lib/messaging/consumer_supervisor.rb', line 6
# Builds a supervisor with an empty thread registry and asks the routing
# table to define its consumers.
#
# @return [ConsumerSupervisor] a new instance of ConsumerSupervisor
def initialize
  # Registry of the worker threads recorded by #start.
  @threads = Concurrent::Array.new

  routes = Messaging.routes
  routes.define_consumers!
end
|
Instance Method Details
#consumers ⇒ Object
47
48
49
|
# File 'lib/messaging/consumer_supervisor.rb', line 47
# Consumers registered on the global routing table.
#
# @return [Object] whatever `Messaging.routes.consumers` returns; the other
#   methods of this class call `size`, `each` and `map` on it
def consumers
  routes = Messaging.routes
  routes.consumers
end
|
#start ⇒ Object
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
# File 'lib/messaging/consumer_supervisor.rb', line 11
# Starts every consumer on its own worker of a fixed-size thread pool.
#
# Resets the stop flag and the thread registry, creates a pool with one
# slot per consumer, and posts one job per consumer that registers the
# executing thread before handing control to `run_consumer`.
#
# @return [true] always
def start
  Concurrent.use_simple_logger
  Messaging.logger.info('Consumers starting')

  @signal_to_stop = false
  @threads.clear

  pool_size = consumers.size
  @thread_pool = Concurrent::FixedThreadPool.new(pool_size, auto_terminate: false)

  consumers.each do |worker|
    @thread_pool.post do
      # Record the pool thread so #stop can wake it up later, and make an
      # unhandled exception in it abort the process.
      Thread.current.tap do |current|
        current.abort_on_exception = true
        @threads << current
      end
      run_consumer(worker)
    end
  end

  true
end
|
#status ⇒ Object
43
44
45
|
# File 'lib/messaging/consumer_supervisor.rb', line 43
# Asks every consumer to log its current status.
#
# @return [Array] the value returned by each consumer's
#   `log_current_status`, in consumer order
def status
  consumers.map { |consumer| consumer.log_current_status }
end
|
#stop ⇒ Object
30
31
32
33
34
35
36
37
38
39
40
41
|
# File 'lib/messaging/consumer_supervisor.rb', line 30
# Stops all consumers and shuts the worker pool down.
#
# A second call after the stop flag has been set is a no-op. Otherwise the
# work runs inside the 'consumer_supervisor.stop' instrumentation block:
# every consumer is stopped on its own thread, the supervisor waits for
# those threads, wakes any registered worker thread that is still alive,
# and then shuts the pool down, waiting up to 60 seconds for termination.
#
# @return [nil] when a stop was already signalled; otherwise whatever
#   `instrument` returns
# @raise [StandardError] an exception raised by a consumer's `stop` is
#   re-raised by `Thread#join`, after the pool has been asked to shut down
def stop
  return if @signal_to_stop

  instrument('consumer_supervisor.stop') do
    @signal_to_stop = true
    stoppers = consumers.map { |consumer| Thread.new { consumer.stop } }
    begin
      # Wait for every stopper thread. Array#join would only concatenate the
      # threads' string forms and return immediately without waiting.
      stoppers.each(&:join)
    ensure
      # Tear the pool down even if a consumer failed to stop cleanly.
      @threads.select(&:alive?).each(&:wakeup)
      @thread_pool&.shutdown
      @thread_pool&.wait_for_termination(60)
    end
    Messaging.logger.info 'Consumers stopped'
  end
end
|