Class: GRPC::Pool
- Inherits:
-
Object
- Object
- GRPC::Pool
- Defined in:
- src/ruby/lib/grpc/generic/rpc_server.rb
Overview
Pool is a simple thread pool.
Constant Summary collapse
- DEFAULT_KEEP_ALIVE =
Default keep alive period is 1s
1
Instance Method Summary collapse
-
#initialize(size, keep_alive: DEFAULT_KEEP_ALIVE) ⇒ Pool
constructor
A new instance of Pool.
-
#jobs_waiting ⇒ Object
Returns the number of jobs waiting.
- #ready_for_work? ⇒ Boolean
-
#schedule(*args, &blk) ⇒ Object
Runs the given block on the queue with the provided args.
-
#start ⇒ Object
Starts running the jobs in the thread pool.
-
#stop ⇒ Object
Stops the jobs in the pool.
Constructor Details
#initialize(size, keep_alive: DEFAULT_KEEP_ALIVE) ⇒ Pool
Returns a new instance of Pool.
27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
# File 'src/ruby/lib/grpc/generic/rpc_server.rb', line 27
#
# Sets up the pool's bookkeeping. No worker threads are created here;
# that happens in #start.
#
# @param size [Integer] number of worker threads; must be greater than zero
# @param keep_alive [Numeric] seconds #stop waits on the stop condition
#   variable while workers are still registered
# @raise [RuntimeError] when size is not greater than zero
def initialize(size, keep_alive: DEFAULT_KEEP_ALIVE)
  raise 'pool size must be positive' unless size > 0

  @size = size
  @keep_alive = keep_alive
  @jobs = Queue.new
  @workers = []
  # Every worker thread owns a private queue that jobs are pushed to and
  # pulled from; that queue sits in @ready_workers while the worker is idle.
  @ready_workers = Queue.new
  # @stop_mutex has to be held whenever @stopped is accessed.
  @stopped = false
  @stop_mutex = Mutex.new
  @stop_cond = ConditionVariable.new
end
Instance Method Details
#jobs_waiting ⇒ Object
Returns the number of jobs waiting.
43 44 45 |
# File 'src/ruby/lib/grpc/generic/rpc_server.rb', line 43
#
# Reports how many entries are currently held in the pool's @jobs queue.
#
# @return [Integer] the length of @jobs
def jobs_waiting
  @jobs.length
end
#ready_for_work? ⇒ Boolean
47 48 49 50 51 52 |
# File 'src/ruby/lib/grpc/generic/rpc_server.rb', line 47
#
# Tells whether at least one worker is idle.
#
# A busy worker is either running a job or has exactly one job waiting on
# it. An idle worker with nothing waiting has its queue parked in
# @ready_workers, so any entry there means a worker can take a job.
#
# @return [Boolean] true when @ready_workers holds at least one queue
def ready_for_work?
  @ready_workers.size > 0
end
#schedule(*args, &blk) ⇒ Object
Runs the given block on the queue with the provided args.
58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 |
# File 'src/ruby/lib/grpc/generic/rpc_server.rb', line 58
#
# Hands the given block, together with args, to an idle worker's queue.
#
# Nothing happens when no block is given. When the pool has already been
# stopped, a warning is logged and the job is dropped.
#
# @param args [Array] arguments stored alongside the block
# @param blk [Proc] the job to run
# @raise [RuntimeError] when no idle worker queue is available, or when
#   the queue taken from @ready_workers already holds an entry
def schedule(*args, &blk)
  return unless blk

  # @stopped may only be read while @stop_mutex is held.
  @stop_mutex.synchronize do
    if @stopped
      GRPC.logger.warn('did not schedule job, already stopped')
      return
    end

    GRPC.logger.info('schedule another job')
    raise 'No worker threads available' if @ready_workers.empty?

    idle_queue = @ready_workers.pop
    raise 'worker already has a task waiting' unless idle_queue.empty?

    idle_queue.push([blk, args])
  end
end
#start ⇒ Object
Starts running the jobs in the thread pool.
75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 |
# File 'src/ruby/lib/grpc/generic/rpc_server.rb', line 75
#
# Spawns worker threads until @workers holds @size of them.
#
# Each new worker gets its own job queue, which is placed in
# @ready_workers before the thread is created.
#
# @raise [RuntimeError] when the pool has already been stopped
def start
  # @stopped may only be read while @stop_mutex is held.
  @stop_mutex.synchronize { raise 'already stopped' if @stopped }

  while @workers.size != @size.to_i
    job_queue = Queue.new
    @ready_workers.push(job_queue)
    @workers.push(
      Thread.new(job_queue) do |jobs|
        # A job can { throw :exit } to make its worker thread finish.
        catch(:exit) { loop_execute_jobs(jobs) }
        remove_current_thread
      end
    )
  end
end
#stop ⇒ Object
Stops the jobs in the pool.
93 94 95 96 97 98 99 100 101 102 103 104 105 106 |
# File 'src/ruby/lib/grpc/generic/rpc_server.rb', line 93
#
# Marks the pool as stopped, asks idle workers to exit, waits for the
# keep-alive period if workers remain, then calls forcibly_stop_workers.
def stop
  GRPC.logger.info('stopping, will wait for all the workers to exit')
  @stop_mutex.synchronize do
    @stopped = true
    # Give every idle worker a job that throws :exit.
    while ready_for_work?
      idle_queue = @ready_workers.pop
      idle_queue.push([proc { throw :exit }, []])
    end
    # Wait @keep_alive seconds for workers to stop.
    @stop_cond.wait(@stop_mutex, @keep_alive) unless @workers.empty?
  end
  forcibly_stop_workers
  GRPC.logger.info('stopped, all workers are shutdown')
end