Class: GrpcKit::RpcDispatcher
- Inherits:
-
Object
- Object
- GrpcKit::RpcDispatcher
- Defined in:
- lib/grpc_kit/rpc_dispatcher.rb,
lib/grpc_kit/rpc_dispatcher/auto_trimmer.rb
Defined Under Namespace
Classes: AutoTrimmer
Constant Summary collapse
- DEFAULT_MAX =
20
- DEFAULT_MIN =
5
Instance Method Summary collapse
-
#initialize(rpcs, max: DEFAULT_MAX, min: DEFAULT_MIN, interval: 30) ⇒ RpcDispatcher
constructor
A new instance of RpcDispatcher.
- #schedule(task) ⇒ Object
- #shutdown ⇒ Object
- #trim(force = false) ⇒ Object
Constructor Details
#initialize(rpcs, max: DEFAULT_MAX, min: DEFAULT_MIN, interval: 30) ⇒ RpcDispatcher
Returns a new instance of RpcDispatcher.
16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
# File 'lib/grpc_kit/rpc_dispatcher.rb', line 16
#
# Sets up the task queue and bookkeeping state, spawns the initial workers,
# and (for a resizable pool) starts an AutoTrimmer.
#
# @param rpcs [Object] stored in @rpcs; not otherwise used by this method
# @param max [Integer] stored as @max_pool_size
# @param min [Integer] stored as @min_pool_size; this many workers are
#   spawned immediately via spawn_thread
# @param interval [Numeric] forwarded to AutoTrimmer.new
#   (NOTE(review): presumably seconds between trim attempts; confirm in AutoTrimmer)
def initialize(rpcs, max: DEFAULT_MAX, min: DEFAULT_MIN, interval: 30)
  @rpcs = rpcs
  @max_pool_size = max
  @min_pool_size = min

  @shutdown = false
  @tasks = Queue.new
  @spawned = 0
  @workers = []
  @mutex = Mutex.new

  @min_pool_size.times { spawn_thread }

  # The trimmer receives `self` and is started right away, so it must only be
  # created once every instance variable above exists; otherwise an early
  # callback into this object would see nil for @tasks / @mutex.
  # No trimmer is created when max == min.
  @auto_trimmer = AutoTrimmer.new(self, interval: interval).tap(&:start!) unless max == min
end
Instance Method Details
#schedule(task) ⇒ Object
34 35 36 37 38 39 40 41 42 43 44 45 46 47 |
# File 'lib/grpc_kit/rpc_dispatcher.rb', line 34
#
# Pushes a task onto the queue and, when more than one task is waiting and the
# pool is below @max_pool_size, spawns an additional worker.
#
# @param task [Object, nil] item to enqueue; a nil task is ignored
# @return [Object, nil] whatever spawn_thread returns when a worker is added,
#   otherwise nil
# @raise [RuntimeError] if @shutdown has been set
def schedule(task)
  return if task.nil?
  raise "scheduling new task isn't allowed during shutdown" if @shutdown

  @tasks.push(task)

  # Only consider growing the pool when there is a backlog; the mutex is not
  # taken otherwise.
  backlog = @tasks.size > 1
  return unless backlog

  has_headroom = @mutex.synchronize { @spawned < @max_pool_size }
  spawn_thread if has_headroom
end
#shutdown ⇒ Object
49 50 51 52 53 |
# File 'lib/grpc_kit/rpc_dispatcher.rb', line 49
#
# Marks the dispatcher as shut down, stops the trimmer if one was created, and
# pushes @max_pool_size nil entries onto the task queue.
# NOTE(review): nil presumably tells a worker to exit; confirm in the worker loop.
#
# @return [Integer] @max_pool_size (the result of Integer#times)
def shutdown
  @shutdown = true

  trimmer = @auto_trimmer
  trimmer.stop if trimmer

  @max_pool_size.times do
    @tasks.push(nil)
  end
end
#trim(force = false) ⇒ Object
55 56 57 58 59 60 |
# File 'lib/grpc_kit/rpc_dispatcher.rb', line 55
#
# Asks the pool to shrink by one worker when it is idle (or when forced) and
# currently larger than @min_pool_size. Shrinking is requested by pushing nil
# onto the task queue.
# NOTE(review): nil presumably makes one worker exit; confirm in the worker loop.
#
# @param force [Boolean] when truthy, trim even if tasks are still queued
# @return [Object, nil] the result of @tasks.push when a trim is requested,
#   otherwise nil
def trim(force = false)
  return unless force || @tasks.empty?

  # Take one snapshot under the lock and use it for both the decision and the
  # log line, so the reported size matches what was actually checked.
  spawned = @mutex.synchronize { @spawned }
  return unless spawned > @min_pool_size

  GrpcKit.logger.debug("Decrease RpcDispatcher's worker. Next worker size is #{spawned - 1}")
  @tasks.push(nil)
end