Class: GRPC::Pool

Inherits:
Object
  • Object
show all
Defined in:
src/ruby/lib/grpc/generic/rpc_server.rb

Overview

Pool is a simple thread pool.

Constant Summary collapse

DEFAULT_KEEP_ALIVE =

Default keep alive period is 1s

1

Instance Method Summary collapse

Constructor Details

#initialize(size, keep_alive: DEFAULT_KEEP_ALIVE) ⇒ Pool

Returns a new instance of Pool.



27
28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'src/ruby/lib/grpc/generic/rpc_server.rb', line 27

def initialize(size, keep_alive: DEFAULT_KEEP_ALIVE)
  fail 'pool size must be positive' unless size > 0
  @jobs = Queue.new
  @size = size
  @stopped = false
  @stop_mutex = Mutex.new # needs to be held when accessing @stopped
  @stop_cond = ConditionVariable.new
  @workers = []
  @keep_alive = keep_alive

  # Each worker thread has its own queue to push and pull jobs
  # these queues are put into @ready_queues when that worker is idle
  @ready_workers = Queue.new
end

Instance Method Details

#jobs_waitingObject

Returns the number of jobs waiting



43
44
45
# File 'src/ruby/lib/grpc/generic/rpc_server.rb', line 43

def jobs_waiting
  @jobs.size
end

#ready_for_work?Boolean

Returns:

  • (Boolean)


47
48
49
50
51
52
# File 'src/ruby/lib/grpc/generic/rpc_server.rb', line 47

def ready_for_work?
  # Busy worker threads are either doing work, or have a single job
  # waiting on them. Workers that are idle with no jobs waiting
  # have their "queues" in @ready_workers
  !@ready_workers.empty?
end

#schedule(*args, &blk) ⇒ Object

Runs the given block on the queue with the provided args.

Parameters:

  • args

    the args passed blk when it is called

  • blk

    the block to call



58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'src/ruby/lib/grpc/generic/rpc_server.rb', line 58

def schedule(*args, &blk)
  return if blk.nil?
  @stop_mutex.synchronize do
    if @stopped
      GRPC.logger.warn('did not schedule job, already stopped')
      return
    end
    GRPC.logger.info('schedule another job')
    fail 'No worker threads available' if @ready_workers.empty?
    worker_queue = @ready_workers.pop

    fail 'worker already has a task waiting' unless worker_queue.empty?
    worker_queue << [blk, args]
  end
end

#startObject

Starts running the jobs in the thread pool.



75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# File 'src/ruby/lib/grpc/generic/rpc_server.rb', line 75

def start
  @stop_mutex.synchronize do
    fail 'already stopped' if @stopped
  end
  until @workers.size == @size.to_i
    new_worker_queue = Queue.new
    @ready_workers << new_worker_queue
    next_thread = Thread.new(new_worker_queue) do |jobs|
      catch(:exit) do  # allows { throw :exit } to kill a thread
        loop_execute_jobs(jobs)
      end
      remove_current_thread
    end
    @workers << next_thread
  end
end

#stopObject

Stops the jobs in the pool



93
94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'src/ruby/lib/grpc/generic/rpc_server.rb', line 93

def stop
  GRPC.logger.info('stopping, will wait for all the workers to exit')
  @stop_mutex.synchronize do  # wait @keep_alive seconds for workers to stop
    @stopped = true
    loop do
      break unless ready_for_work?
      worker_queue = @ready_workers.pop
      worker_queue << [proc { throw :exit }, []]
    end
    @stop_cond.wait(@stop_mutex, @keep_alive) if @workers.size > 0
  end
  forcibly_stop_workers
  GRPC.logger.info('stopped, all workers are shutdown')
end