Class: Sbmt::Outbox::V2::ThreadPool

Inherits:
Object
  • Object
show all
Defined in:
lib/sbmt/outbox/v2/thread_pool.rb

Constant Summary collapse

# Sentinel: when the task source returns this object, the pool marks itself
# stopped and hands out no further tasks.
BREAK = Object.new.freeze
# Unique frozen marker object. Its consumers are not visible here; presumably
# it reports that a task was skipped (confirm against callers).
SKIPPED = Object.new.freeze
# Unique frozen marker object. Its consumers are not visible here; presumably
# it reports that a task was processed (confirm against callers).
PROCESSED = Object.new.freeze

Instance Method Summary collapse

Constructor Details

#initialize(concurrency:, name: "thread_pool", random_startup_delay: true, start_async: true, &block) ⇒ ThreadPool

Returns a new instance of ThreadPool.



13
14
15
16
17
18
19
20
21
22
# File 'lib/sbmt/outbox/v2/thread_pool.rb', line 13

# Builds a pool that starts out stopped with an empty thread list.
#
# @param concurrency the thread count reported when the pool starts
# @param name [String] label used as the prefix of the pool's log messages
# @param random_startup_delay stored as-is; its use is not visible in this method
# @param start_async whether the pool runs in "async" mode; also decides
#   whether #stop joins the threads
# @param block the task source; #next_task calls it to obtain each task
def initialize(concurrency:, name: "thread_pool", random_startup_delay: true, start_async: true, &block)
  # Caller-supplied configuration.
  self.name = name
  self.concurrency = concurrency
  self.start_async = start_async
  self.random_startup_delay = random_startup_delay

  # Task production: the source block and the mutex that serializes calls to it.
  self.task_mutex = Mutex.new
  self.task_source = block

  # Runtime state: no threads yet, and the pool counts as stopped until #start.
  self.threads = Concurrent::Array.new
  self.stopped = true
end

Instance Method Details

#alive?(timeout) ⇒ Boolean

Returns:

  • (Boolean)


69
70
71
72
73
74
75
76
77
78
79
# File 'lib/sbmt/outbox/v2/thread_pool.rb', line 69

# Reports whether the pool is running and every thread was active recently.
#
# A thread counts as recently active when last_active_at(thread) is later
# than Time.current - timeout. A thread with no recorded activity makes the
# whole pool not alive. With an empty thread list a non-stopped pool is
# reported as alive.
#
# @param timeout amount subtracted from Time.current to get the cutoff
#   (presumably seconds or a duration; confirm against callers)
# @return [Boolean]
def alive?(timeout)
  return false if stopped

  cutoff = Time.current - timeout
  threads.all? do |worker|
    seen_at = last_active_at(worker)
    # A missing timestamp is falsy, so `all?` stops here and yields false.
    seen_at && cutoff < seen_at
  end
end

#next_task ⇒ Object



24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/sbmt/outbox/v2/thread_pool.rb', line 24

# Pulls the next task from the task source while holding task_mutex.
#
# Returns nil without calling the source when the pool is already stopped.
# When the source returns BREAK, the pool is marked stopped and nil is
# returned.
#
# @return the task produced by the source, or nil
def next_task
  task_mutex.synchronize do
    next if stopped

    candidate = task_source.call
    if candidate == BREAK
      # The source signalled the end of work: stop handing out tasks.
      self.stopped = true
      next
    end

    candidate
  end
end

#running? ⇒ Boolean

Returns:

  • (Boolean)


63
64
65
66
67
# File 'lib/sbmt/outbox/v2/thread_pool.rb', line 63

# Reports whether the pool is currently not marked as stopped.
#
# @return [Boolean]
def running?
  !stopped
end

#start ⇒ Object



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/sbmt/outbox/v2/thread_pool.rb', line 38

# Marks the pool as running and hands control to run_threads.
#
# For each task that run_threads passes in, the logger is tagged with the
# current worker number and the caller's block receives (worker_number, task).
# worker_number is not a block parameter; it is resolved on the pool itself.
#
# @yield [worker_number, task]
# @raise [Exception] re-raises whatever Exception instance run_threads returned
# @return [nil]
def start
  self.stopped = false

  mode =
    if start_async
      "async"
    else
      "sync"
    end
  logger.log_info("#{name}: starting #{concurrency} threads in #{mode} mode")

  outcome = run_threads do |task|
    logger.with_tags(worker: worker_number) { yield worker_number, task }
  end

  logger.log_info("#{name}: threads started")

  # run_threads reports failure by returning the exception rather than raising it.
  raise outcome if outcome.is_a?(Exception)
end

#stop ⇒ Object



55
56
57
58
59
60
61
# File 'lib/sbmt/outbox/v2/thread_pool.rb', line 55

# Marks the pool as stopped and, in async mode, waits for every thread to
# finish. stop_threads is always called afterwards, even if joining raises.
#
# @return [Array, nil] the result of joining each thread in async mode, otherwise nil
def stop
  self.stopped = true
  return unless start_async

  threads.map(&:join)
ensure
  stop_threads
end