Class: Sbmt::Outbox::V2::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/sbmt/outbox/v2/worker.rb

Instance Method Summary collapse

Constructor Details

#initialize(boxes:, poll_tactic: nil, processor_concurrency: nil, poller_partitions_count: nil, poller_threads_count: nil) ⇒ Worker

Returns a new instance of Worker.



11
12
13
14
# File 'lib/sbmt/outbox/v2/worker.rb', line 11

def initialize(boxes:, poll_tactic: nil, processor_concurrency: nil, poller_partitions_count: nil, poller_threads_count: nil)
  @poller = Poller.new(boxes, throttler_tactic: poll_tactic, threads_count: poller_threads_count, partitions_count: poller_partitions_count)
  @processor = Processor.new(boxes, threads_count: processor_concurrency)
end

Instance Method Details

#alive?Boolean

Returns:

  • (Boolean)


44
45
46
47
48
# File 'lib/sbmt/outbox/v2/worker.rb', line 44

def alive?
  return false unless ready?

  @poller.alive?(@poller.lock_timeout) && @processor.alive?(@processor.lock_timeout)
end

#ready?Boolean

Returns:

  • (Boolean)


40
41
42
# File 'lib/sbmt/outbox/v2/worker.rb', line 40

def ready?
  @poller.ready? && @processor.ready?
end

#startObject



16
17
18
19
20
21
22
23
# File 'lib/sbmt/outbox/v2/worker.rb', line 16

def start
  start_async

  loop do
    sleep 0.1
    break unless @poller.started && @processor.started
  end
end

#start_asyncObject



25
26
27
28
29
30
31
32
33
# File 'lib/sbmt/outbox/v2/worker.rb', line 25

def start_async
  @poller.start
  @processor.start

  loop do
    sleep(0.1)
    break if ready?
  end
end

#stopObject



35
36
37
38
# File 'lib/sbmt/outbox/v2/worker.rb', line 35

def stop
  @poller.stop
  @processor.stop
end