Class: Griffin::ThreadPool
- Inherits:
-
Object
- Object
- Griffin::ThreadPool
- Defined in:
- lib/griffin/thread_pool.rb
Constant Summary collapse
- DEFAULT_MAX =
5
- DEFAULT_MIN =
1
- QUEUE_SIZE =
128
Instance Method Summary collapse
-
#initialize(interval: 60, max: DEFAULT_MAX, min: DEFAULT_MIN, &block) ⇒ ThreadPool
constructor
A new instance of ThreadPool.
- #resouce_available? ⇒ Boolean
- #schedule(task, &block) ⇒ Object
- #shutdown ⇒ Object
-
#trim(force = false) ⇒ Object
For GrpcKit::RpcDispatcher::AutoTrimmer (the trimmer the constructor creates).
Constructor Details
#initialize(interval: 60, max: DEFAULT_MAX, min: DEFAULT_MIN, &block) ⇒ ThreadPool
Returns a new instance of ThreadPool.
11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
# File 'lib/griffin/thread_pool.rb', line 11
#
# Sets up the pool state, spawns the initial workers and starts the
# auto-trimmer.
#
# @param interval [Integer] base interval handed to the auto-trimmer; a random
#   0..9 is added to it (units are defined by AutoTrimmer -- presumably
#   seconds, confirm there)
# @param max [Integer] stored as the upper bound on the pool size
# @param min [Integer] number of workers spawned up front; stored as the
#   lower bound on the pool size
# @param block [Proc] stored in @block (its use is not visible in this method)
def initialize(interval: 60, max: DEFAULT_MAX, min: DEFAULT_MIN, &block)
  @shutdown = false
  @block = block
  @min_pool_size = min
  @max_pool_size = max

  @mutex = Mutex.new
  @tasks = SizedQueue.new(QUEUE_SIZE)
  @workers = []
  @spawned = 0
  @waiting = 0

  min.times { spawn_thread }

  # Randomise the interval a little before handing it to the trimmer.
  jittered_interval = interval + rand(10)
  trimmer = GrpcKit::RpcDispatcher::AutoTrimmer.new(self, interval: jittered_interval)
  trimmer.start!
  @auto_trimmer = trimmer
end
Instance Method Details
#resouce_available? ⇒ Boolean
44 45 46 |
# File 'lib/griffin/thread_pool.rb', line 44
#
# Reports whether the pool can take more work: true when @waiting is non-zero
# or when @spawned differs from the configured maximum pool size.
#
# @return [Boolean]
def resouce_available?
  # Unavailable only when nothing is waiting AND the pool is at its maximum.
  !(@waiting == 0 && @spawned == @max_pool_size)
end
#schedule(task, &block) ⇒ Object
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
# File 'lib/griffin/thread_pool.rb', line 27
#
# Queues a unit of work and spawns an extra worker when needed.
#
# @param task [Object, nil] the work item; when nil the call is a no-op
# @param block [Proc] when given, the block is queued instead of +task+
# @return [Object, nil] nil when +task+ is nil or no worker was spawned,
#   otherwise whatever spawn_thread returns
# @raise [RuntimeError] if the pool has been shut down
def schedule(task, &block)
  return if task.nil?
  raise "scheduling new task isn't allowed during shutdown" if @shutdown

  # TODO: blocking now..
  job = block || task
  @tasks.push(job)

  # Spawn only if more jobs are queued than @waiting covers and the pool is
  # still below its maximum size.
  needs_worker = @mutex.synchronize do
    @waiting < @tasks.size && @spawned < @max_pool_size
  end
  spawn_thread if needs_worker
end
#shutdown ⇒ Object
48 49 50 51 52 53 54 55 56 |
# File 'lib/griffin/thread_pool.rb', line 48
#
# Marks the pool as shut down, queues one nil per potential worker, stops the
# auto-trimmer and then polls once a second until @workers is empty.
#
# @return [nil]
def shutdown
  @shutdown = true

  # NOTE(review): nil presumably tells a worker to exit -- the worker loop
  # (spawn_thread) is not shown here, confirm there.
  @max_pool_size.times do
    @tasks << nil
  end

  @auto_trimmer.stop

  until @workers.empty?
    Griffin.logger.debug("Shutdown waiting #{@waiting} workers")
    sleep(1)
  end
end
#trim(force = false) ⇒ Object
For GrpcKit::RpcDispatcher::AutoTrimmer (the trimmer the constructor creates)
59 60 61 62 63 64 |
# File 'lib/griffin/thread_pool.rb', line 59
#
# Asks the pool to drop one worker by queueing a nil, provided the pool is
# above its minimum size and either +force+ is set or @waiting is positive.
#
# Logs through Griffin.logger, matching #shutdown in this class.
#
# @param force [Boolean] trim even when @waiting is zero
# @return [Object, nil] the result of the queue push when a trim was
#   requested, nil otherwise
def trim(force = false)
  # Decide and capture the size under the lock so the logged number is the
  # one the decision was based on.
  next_size = @mutex.synchronize do
    @spawned - 1 if (force || @waiting > 0) && @spawned > @min_pool_size
  end
  return unless next_size

  Griffin.logger.info("Trim worker! Next worker size #{next_size}")
  @tasks.push(nil)
end