Class: Sbmt::Outbox::V2::Worker
- Inherits:
-
Object
- Object
- Sbmt::Outbox::V2::Worker
- Defined in:
- lib/sbmt/outbox/v2/worker.rb
Instance Method Summary collapse
- #alive? ⇒ Boolean
-
#initialize(boxes:, poll_tactic: nil, processor_concurrency: nil, poller_partitions_count: nil, poller_threads_count: nil) ⇒ Worker
constructor
A new instance of Worker.
- #ready? ⇒ Boolean
- #start ⇒ Object
- #start_async ⇒ Object
- #stop ⇒ Object
Constructor Details
#initialize(boxes:, poll_tactic: nil, processor_concurrency: nil, poller_partitions_count: nil, poller_threads_count: nil) ⇒ Worker
Returns a new instance of Worker.
11 12 13 14 |
# File 'lib/sbmt/outbox/v2/worker.rb', line 11 def initialize(boxes:, poll_tactic: nil, processor_concurrency: nil, poller_partitions_count: nil, poller_threads_count: nil) @poller = Poller.new(boxes, throttler_tactic: poll_tactic, threads_count: poller_threads_count, partitions_count: poller_partitions_count) @processor = Processor.new(boxes, threads_count: processor_concurrency) end |
Instance Method Details
#alive? ⇒ Boolean
44 45 46 47 48 |
# File 'lib/sbmt/outbox/v2/worker.rb', line 44 def alive? return false unless ready? @poller.alive?(@poller.lock_timeout) && @processor.alive?(@processor.lock_timeout) end |
#ready? ⇒ Boolean
40 41 42 |
# File 'lib/sbmt/outbox/v2/worker.rb', line 40 def ready? @poller.ready? && @processor.ready? end |
#start ⇒ Object
16 17 18 19 20 21 22 23 |
# File 'lib/sbmt/outbox/v2/worker.rb', line 16 def start start_async loop do sleep 0.1 break unless @poller.started && @processor.started end end |
#start_async ⇒ Object
25 26 27 28 29 30 31 32 33 |
# File 'lib/sbmt/outbox/v2/worker.rb', line 25 def start_async @poller.start @processor.start loop do sleep(0.1) break if ready? end end |
#stop ⇒ Object
35 36 37 38 |
# File 'lib/sbmt/outbox/v2/worker.rb', line 35 def stop @poller.stop @processor.stop end |