Class: GRPC::Pool
- Inherits: Object
  - Object
    - GRPC::Pool
- Defined in: lib/grpc/generic/rpc_server.rb
Overview
Pool is a simple thread pool.
Constant Summary
- DEFAULT_KEEP_ALIVE = 1
  Default keep alive period is 1s.
Instance Method Summary
- #initialize(size, keep_alive: DEFAULT_KEEP_ALIVE) ⇒ Pool (constructor)
  A new instance of Pool.
- #jobs_waiting ⇒ Object
  Returns the number of jobs waiting.
- #schedule(*args, &blk) ⇒ Object
  Runs the given block on the queue with the provided args.
- #start ⇒ Object
  Starts running the jobs in the thread pool.
- #stop ⇒ Object
  Stops the jobs in the pool.
Constructor Details
#initialize(size, keep_alive: DEFAULT_KEEP_ALIVE) ⇒ Pool
Returns a new instance of Pool. Fails with 'pool size must be positive' unless size is greater than 0.
# File 'lib/grpc/generic/rpc_server.rb', line 74
def initialize(size, keep_alive: DEFAULT_KEEP_ALIVE)
  fail 'pool size must be positive' unless size > 0
  @jobs = Queue.new
  @size = size
  @stopped = false
  @stop_mutex = Mutex.new # needs to be held when accessing @stopped
  @stop_cond = ConditionVariable.new
  @workers = []
  @keep_alive = keep_alive
end
Instance Method Details
#jobs_waiting ⇒ Object
Returns the number of jobs waiting.
# File 'lib/grpc/generic/rpc_server.rb', line 86
def jobs_waiting
  @jobs.size
end
#schedule(*args, &blk) ⇒ Object
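Runs the given block on the queue with the provided args. Does nothing if no block is given; if the pool has already been stopped, the job is not scheduled and a warning is logged.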
# File 'lib/grpc/generic/rpc_server.rb', line 94
def schedule(*args, &blk)
  return if blk.nil?
  @stop_mutex.synchronize do
    if @stopped
      GRPC.logger.warn('did not schedule job, already stopped')
      return
    end
    GRPC.logger.info('schedule another job')
    @jobs << [blk, args]
  end
end
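The scheduling pattern above (a queue of [block, args] pairs guarded by a stopped flag and a mutex) can be tried in isolation. The following is a minimal sketch, not GRPC::Pool itself: the class name TinyScheduler and the methods stop! and run_next are hypothetical, and it returns true/false instead of logging so that the outcome is easy to observe.

```ruby
# Hypothetical stand-in that mirrors the shape of Pool#schedule.
# TinyScheduler, stop! and run_next are illustration-only names.
class TinyScheduler
  def initialize
    @jobs = Queue.new
    @stopped = false
    @stop_mutex = Mutex.new # held whenever @stopped is read or written
  end

  # Enqueues the block with its args; refuses once stopped.
  def schedule(*args, &blk)
    return false if blk.nil?
    @stop_mutex.synchronize do
      return false if @stopped
      @jobs << [blk, args]
    end
    true
  end

  def stop!
    @stop_mutex.synchronize { @stopped = true }
  end

  def jobs_waiting
    @jobs.size
  end

  # Pops one job and runs it on the calling thread.
  def run_next
    blk, args = @jobs.pop
    blk.call(*args)
  end
end

scheduler = TinyScheduler.new
scheduler.schedule(2, 3) { |a, b| a + b }
result = scheduler.run_next
```

Checking the flag and enqueuing under the same mutex means a job cannot slip into the queue after the stopped flag has been set.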
#start ⇒ Object
Starts running the jobs in the thread pool.
# File 'lib/grpc/generic/rpc_server.rb', line 107
def start
  fail 'already stopped' if @stopped
  until @workers.size == @size.to_i
    next_thread = Thread.new do
      catch(:exit) do # allows { throw :exit } to kill a thread
        loop_execute_jobs
      end
      remove_current_thread
    end
    @workers << next_thread
  end
end
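The catch(:exit) wrapper is what lets a scheduled job end its worker: a job that throws :exit unwinds out of the job loop on that thread. The sketch below illustrates the mechanism with plain Ruby threads. The body of loop_execute_jobs is not shown on this page, so the loop here is an assumption about its general shape, and the variable names are illustrative only.

```ruby
jobs = Queue.new
results = Queue.new

worker = Thread.new do
  catch(:exit) do
    # Assumed shape of the job loop: pop a [block, args] pair and call it.
    loop do
      blk, args = jobs.pop # blocks until a job is available
      blk.call(*args)
    end
  end
  # Reached only after some job threw :exit (stand-in for cleanup).
  results << :worker_exited
end

jobs << [->(x) { results << x * 2 }, [21]]
jobs << [-> { throw :exit }, []] # "poison pill" that ends the worker
worker.join

first = results.pop
second = results.pop
```

Because the throw happens inside the worker thread, only the thread that picks up that job exits; other workers keep running until they receive their own.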
#stop ⇒ Object
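Stops the jobs in the pool. Schedules one exit job per worker, waits up to keep_alive seconds for the workers to finish, then forcibly stops any that remain.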
# File 'lib/grpc/generic/rpc_server.rb', line 121
def stop
  GRPC.logger.info('stopping, will wait for all the workers to exit')
  @workers.size.times { schedule { throw :exit } }
  @stop_mutex.synchronize do # wait @keep_alive for workers to stop
    @stopped = true
    @stop_cond.wait(@stop_mutex, @keep_alive) if @workers.size > 0
  end
  forcibly_stop_workers
  GRPC.logger.info('stopped, all workers are shutdown')
end
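The bounded wait in stop relies on ConditionVariable#wait with a timeout: the caller sleeps until it is signalled or until the timeout elapses, whichever comes first. The sketch below shows that handshake with plain threads. The bodies of remove_current_thread and forcibly_stop_workers are not shown on this page, so the worker-side bookkeeping and the final kill loop are assumptions about what they might do, not the library's code.

```ruby
stop_mutex = Mutex.new
stop_cond = ConditionVariable.new
workers = []
keep_alive = 1 # seconds, mirroring DEFAULT_KEEP_ALIVE

# Register workers under the mutex so none can deregister early.
stop_mutex.synchronize do
  2.times do
    workers << Thread.new do
      sleep 0.05 # pretend to finish an in-flight job
      # Assumed equivalent of remove_current_thread:
      stop_mutex.synchronize do
        workers.delete(Thread.current)
        stop_cond.signal if workers.empty?
      end
    end
  end
end

started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
stop_mutex.synchronize do
  # Returns on signal or after keep_alive seconds, whichever is first.
  stop_cond.wait(stop_mutex, keep_alive) if workers.size > 0
end
elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started

# Assumed equivalent of forcibly_stop_workers: kill any stragglers.
leftover = stop_mutex.synchronize { workers.dup }
leftover.each { |t| t.kill if t.alive? }
```

The size check before waiting matters: if every worker has already deregistered, no further signal will arrive, and waiting would only burn the full timeout.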