Class: GrpcKit::RpcDispatcher

Inherits:
Object
  • Object
show all
Defined in:
lib/grpc_kit/rpc_dispatcher.rb,
lib/grpc_kit/rpc_dispatcher/auto_trimmer.rb

Defined Under Namespace

Classes: AutoTrimmer

Constant Summary collapse

DEFAULT_MAX =
20
DEFAULT_MIN =
5

Instance Method Summary collapse

Constructor Details

#initialize(rpcs, max: DEFAULT_MAX, min: DEFAULT_MIN, interval: 30) ⇒ RpcDispatcher

Returns a new instance of RpcDispatcher.

Parameters:

  • rpcs (Hash<String,GrpcKit::RpcDesc>)
  • min (Integer) (defaults to: DEFAULT_MIN)

    A mininum thread pool size

  • max (Integer) (defaults to: DEFAULT_MAX)

    A maximum thread pool size

  • interval (Integer) (defaults to: 30)

    An interval time of calling #trim



16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/grpc_kit/rpc_dispatcher.rb', line 16

def initialize(rpcs, max: DEFAULT_MAX, min: DEFAULT_MIN, interval: 30)
  @rpcs = rpcs
  @max_pool_size = max
  @min_pool_size = min
  unless max == min
    @auto_trimmer = AutoTrimmer.new(self, interval: interval).tap(&:start!)
  end

  @shutdown = false
  @tasks = Queue.new
  @spawned = 0
  @workers = []
  @mutex = Mutex.new

  @min_pool_size.times { spawn_thread }
end

Instance Method Details

#schedule(task) ⇒ Object

Parameters:

  • task (Object)

    task to dispatch



34
35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/grpc_kit/rpc_dispatcher.rb', line 34

def schedule(task)
  if task.nil?
    return
  end

  if @shutdown
    raise "scheduling new task isn't allowed during shutdown"
  end

  @tasks.push(task)
  if @tasks.size > 1 && @mutex.synchronize { @spawned < @max_pool_size }
    spawn_thread
  end
end

#shutdownObject



49
50
51
52
53
# File 'lib/grpc_kit/rpc_dispatcher.rb', line 49

def shutdown
  @shutdown = true
  @auto_trimmer.stop if @auto_trimmer
  @max_pool_size.times { @tasks.push(nil) }
end

#trim(force = false) ⇒ Object



55
56
57
58
59
60
# File 'lib/grpc_kit/rpc_dispatcher.rb', line 55

def trim(force = false)
  if (force || @tasks.empty?) && @mutex.synchronize { @spawned > @min_pool_size }
    GrpcKit.logger.debug("Decrease RpcDipatcher's worker. Next worker size is #{@spawned - 1}")
    @tasks.push(nil)
  end
end