Class: Sbmt::Outbox::V1::ThreadPool

Inherits:
Object
  • Object
show all
Defined in:
lib/sbmt/outbox/v1/thread_pool.rb

Constant Summary collapse

BREAK =
Object.new.freeze
SKIPPED =
Object.new.freeze
PROCESSED =
Object.new.freeze

Instance Method Summary collapse

Constructor Details

#initialize(&block) ⇒ ThreadPool

Returns a new instance of ThreadPool.



13
14
15
16
17
# File 'lib/sbmt/outbox/v1/thread_pool.rb', line 13

# Builds a pool that starts out in the stopped state.
#
# @yield invoked with no arguments by #next_task whenever a task is requested
# @yieldreturn [Object] the next task item, or BREAK to make #next_task stop the pool
def initialize(&block)
  # Stays stopped until #start clears the flag.
  self.stopped = true
  # Guards the task source so only one caller pulls from it at a time.
  self.task_mutex = Mutex.new
  self.task_source = block
end

Instance Method Details

#next_task ⇒ Object



19
20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/sbmt/outbox/v1/thread_pool.rb', line 19

# Pulls the next task from the task source while holding the task mutex.
#
# The source is not called at all once the pool is stopped. When the source
# returns the BREAK sentinel the pool is marked as stopped.
#
# @return [Object, nil] the item produced by the task source, or nil when the
#   pool is stopped or the source returned BREAK
def next_task
  task_mutex.synchronize do
    next if stopped

    item = task_source.call
    # BREAK is a sentinel, so compare by identity. Going through the item's own
    # #== would let an overridden (or raising) #== stop or crash the pool.
    next item unless BREAK.equal?(item)

    self.stopped = true
    nil
  end
end

#start(concurrency:) ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
# File 'lib/sbmt/outbox/v1/thread_pool.rb', line 33

# Marks the pool as running, hands every item from run_threads to the given
# block, and marks the pool as stopped again on the way out.
#
# @param concurrency forwarded to run_threads as `count:` (presumably the
#   number of worker threads — confirm against run_threads)
# @yieldparam worker_number [Object] the value of #worker_number at yield time
# @yieldparam item [Object] the item passed in by run_threads
# @return [nil]
# @raise [Exception] when run_threads returns an Exception instance
def start(concurrency:)
  self.stopped = false

  outcome = run_threads(count: concurrency) { |task| yield worker_number, task }
  # run_threads hands a failure back as a value; surface it to the caller.
  raise outcome if outcome.is_a?(Exception)

  nil
ensure
  # Leave the pool stopped on every exit path, normal or raising.
  self.stopped = true
end

#stop ⇒ Object



45
46
47
# File 'lib/sbmt/outbox/v1/thread_pool.rb', line 45

# Marks the pool as stopped. While the flag is set, #next_task returns nil
# without calling the task source; #start clears the flag again.
def stop
  self.stopped = true
end

#worker_number ⇒ Object



49
50
51
# File 'lib/sbmt/outbox/v1/thread_pool.rb', line 49

# Looks up the worker number recorded for this pool in `Thread.current`.
#
# The storage key embeds this pool's object_id, so each pool instance reads
# its own slot. The writer of the slot is not visible here (presumably
# run_threads — confirm).
#
# @return [Object, nil] the stored value, or nil when nothing was stored
def worker_number
  slot = "thread_pool_worker_number:#{object_id}"
  Thread.current[slot]
end