Class: Griffin::ThreadPool

Inherits:
Object
  • Object
show all
Defined in:
lib/griffin/thread_pool.rb

Constant Summary collapse

DEFAULT_MAX =
5
DEFAULT_MIN =
1
QUEUE_SIZE =
128

Instance Method Summary collapse

Constructor Details

#initialize(interval: 60, max: DEFAULT_MAX, min: DEFAULT_MIN, &block) ⇒ ThreadPool

Returns a new instance of ThreadPool.



11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# File 'lib/griffin/thread_pool.rb', line 11

def initialize(interval: 60, max: DEFAULT_MAX, min: DEFAULT_MIN, &block)
  @max_pool_size = max
  @min_pool_size = min
  @block = block
  @shutdown = false
  @tasks = SizedQueue.new(QUEUE_SIZE)

  @spawned = 0
  @workers = []
  @mutex = Mutex.new
  @waiting = 0

  @min_pool_size.times { spawn_thread }
  @auto_trimmer = GrpcKit::RpcDispatcher::AutoTrimmer.new(self, interval: interval + rand(10)).tap(&:start!)
end

Instance Method Details

#resouce_available?Boolean

Returns:

  • (Boolean)


44
45
46
# File 'lib/griffin/thread_pool.rb', line 44

def resouce_available?
  (@waiting != 0) || (@spawned != @max_pool_size)
end

#schedule(task, &block) ⇒ Object



27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/griffin/thread_pool.rb', line 27

def schedule(task, &block)
  if task.nil?
    return
  end

  if @shutdown
    raise "scheduling new task isn't allowed during shutdown"
  end

  # TODO: blocking now..
  @tasks.push(block || task)

  if @mutex.synchronize { (@waiting < @tasks.size) && (@spawned < @max_pool_size) }
    spawn_thread
  end
end

#shutdownObject



48
49
50
51
52
53
54
55
56
# File 'lib/griffin/thread_pool.rb', line 48

def shutdown
  @shutdown = true
  @max_pool_size.times { @tasks.push(nil) }
  @auto_trimmer.stop
  until @workers.empty?
    Griffin.logger.debug("Shutdown waiting #{@waiting} workers")
    sleep 1
  end
end

#trim(force = false) ⇒ Object

For GrpcKit::ThreadPool::AutoTrimmer



59
60
61
62
63
64
# File 'lib/griffin/thread_pool.rb', line 59

def trim(force = false)
  if @mutex.synchronize { (force || (@waiting > 0)) && (@spawned > @min_pool_size) }
    GrpcKit.logger.info("Trim worker! Next worker size #{@spawned - 1}")
    @tasks.push(nil)
  end
end