Class: Sbmt::Outbox::V2::ThreadPool
- Inherits: Object
- Inheritance hierarchy:
  - Object
  - Sbmt::Outbox::V2::ThreadPool
- Defined in:
- lib/sbmt/outbox/v2/thread_pool.rb
Constant Summary collapse
- BREAK = Object.new.freeze
- SKIPPED = Object.new.freeze
- PROCESSED = Object.new.freeze
Instance Method Summary collapse
- #alive?(timeout) ⇒ Boolean
- #initialize(concurrency:, name: "thread_pool", random_startup_delay: true, start_async: true, &block) ⇒ ThreadPool (constructor): A new instance of ThreadPool.
- #next_task ⇒ Object
- #running? ⇒ Boolean
- #start ⇒ Object
- #stop ⇒ Object
Constructor Details
#initialize(concurrency:, name: "thread_pool", random_startup_delay: true, start_async: true, &block) ⇒ ThreadPool
Returns a new instance of ThreadPool.
13 14 15 16 17 18 19 20 21 22 |
# File 'lib/sbmt/outbox/v2/thread_pool.rb', line 13

# Builds a pool in the stopped state with an empty thread list; no threads
# are started here.
#
# @param concurrency [Integer] presumably the number of worker threads (it is
#   interpolated into the "starting N threads" log line in #start) — confirm
# @param name [String] label used as the prefix of this pool's log messages
# @param random_startup_delay [Boolean] stored as-is; not used by any method
#   visible in this listing
# @param start_async [Boolean] when truthy, #start logs "async" mode and
#   #stop joins the threads
# @param block [Proc] task source; #next_task calls it to obtain each task
# @return [ThreadPool]
def initialize(concurrency:, name: "thread_pool", random_startup_delay: true, start_async: true, &block)
  self.concurrency = concurrency
  self.name = name
  self.random_startup_delay = random_startup_delay
  self.start_async = start_async
  self.task_source = block
  # Guards calls to the task source in #next_task.
  self.task_mutex = Mutex.new
  # The pool only counts as running after #start flips this flag.
  self.stopped = true
  self.threads = Concurrent::Array.new
end
Instance Method Details
#alive?(timeout) ⇒ Boolean
69 70 71 72 73 74 75 76 77 78 79 |
# File 'lib/sbmt/outbox/v2/thread_pool.rb', line 69

# Reports whether the pool is running and every thread has been active
# since the cutoff `Time.current - timeout`.
#
# A stopped pool is never alive. A thread with no recorded activity makes the
# whole pool not alive. With no threads at all the check is vacuously true.
#
# @param timeout [Numeric] amount subtracted from `Time.current` to obtain the
#   cutoff (presumably seconds or a duration — confirm against callers)
# @return [Boolean]
def alive?(timeout)
  return false if stopped

  deadline = Time.current - timeout

  threads.all? do |thread|
    # Use a distinct local name so the helper method is not shadowed.
    active_at = last_active_at(thread)
    active_at && deadline < active_at
  end
end
#next_task ⇒ Object
24 25 26 27 28 29 30 31 32 33 34 35 36 |
# File 'lib/sbmt/outbox/v2/thread_pool.rb', line 24

# Fetches the next task from the task source while holding the task mutex,
# so only one caller at a time invokes the source.
#
# Returns nil without calling the source once the pool is stopped. When the
# source returns BREAK, the pool is marked stopped and nil is returned.
#
# @return [Object, nil] the task produced by the source, or nil as described
def next_task
  task_mutex.synchronize do
    return if stopped

    task = task_source.call
    if task == BREAK
      # The source signalled there is nothing more to hand out.
      self.stopped = true
      return
    end

    task
  end
end
#running? ⇒ Boolean
63 64 65 66 67 |
# File 'lib/sbmt/outbox/v2/thread_pool.rb', line 63

# Reports whether the pool is currently marked as not stopped.
#
# @return [Boolean] false when `stopped` is truthy, true otherwise
def running?
  !stopped
end
#start ⇒ Object
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/sbmt/outbox/v2/thread_pool.rb', line 38

# Marks the pool as running, launches the worker threads via `run_threads`,
# and hands every task to the caller's block together with the worker number.
#
# @yield [worker_number, task] invoked for each task passed in by `run_threads`
# @return [nil]
# @raise [Exception] re-raises the value returned by `run_threads` when that
#   value is an Exception instance
def start
  self.stopped = false

  mode = start_async ? "async" : "sync"
  logger.log_info("#{name}: starting #{concurrency} threads in #{mode} mode")

  result = run_threads do |task|
    # NOTE(review): `logger.(...)` is shorthand for `logger.call(...)`. It reads
    # as if a method name may be missing after `logger.` — confirm against the
    # real file before relying on this line.
    logger.(worker: worker_number) do
      yield worker_number, task
    end
  end

  logger.log_info("#{name}: threads started")

  # `run_threads` reports failure by returning the exception rather than raising it.
  raise result if result.is_a?(Exception)
end
#stop ⇒ Object
55 56 57 58 59 60 61 |
# File 'lib/sbmt/outbox/v2/thread_pool.rb', line 55

# Marks the pool as stopped and, in async mode, waits for every thread to
# finish. `stop_threads` always runs afterwards, even if joining raises.
#
# @return [Object] incidental; callers should not rely on it
def stop
  self.stopped = true

  # `each` rather than `map`: join is called only for its side effect.
  threads.each(&:join) if start_async
ensure
  stop_threads
end