Class: Concur::ThreadPool

Inherits:
Executor::Base show all
Defined in:
lib/thread_pool.rb

Overview

Defined Under Namespace

Classes: UberThread

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(max_size) ⇒ ThreadPool

Returns a new instance of ThreadPool.



15
16
17
18
19
20
21
22
23
24
# File 'lib/thread_pool.rb', line 15

def initialize(max_size)
  @max_size = max_size
#      @thread_queue = SizedQueue.new(max_size)
  @running = true
  @mutex = Mutex.new
  @cv = ConditionVariable.new
  @queue = Queue.new
  @threads = []

end

Instance Attribute Details

#queueObject (readonly)

Returns the value of attribute queue.



13
14
15
# File 'lib/thread_pool.rb', line 13

def queue
  @queue
end

Instance Method Details

#execute(runnable = nil, channel = nil, &blk) ⇒ Object



44
45
46
47
48
# File 'lib/thread_pool.rb', line 44

def execute(runnable=nil, channel=nil, &blk)
  f = StandardFuture.new(runnable, channel, &blk)
  process(f)
  f
end

#process(callable, &blk) ⇒ Object



38
39
40
41
42
# File 'lib/thread_pool.rb', line 38

def process(callable, &blk)
  callable = blk if block_given?
  @queue.push(callable)
  start_thread
end

#queue_sizeObject



69
70
71
# File 'lib/thread_pool.rb', line 69

def queue_size
  @queue.size
end

#shutdownObject



26
27
28
# File 'lib/thread_pool.rb', line 26

def shutdown
  @running = false
end

#start_threadObject



51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/thread_pool.rb', line 51

def start_thread
  @mutex.synchronize do
    if !@queue.empty? && @threads.size <= @max_size
      t = UberThread.new do
        while @running
          f = @queue.pop
          f.thread = t
          f.call
        end
#            Concur.logger.info "Thread dying " + t.inspect
      end
      Concur.logger.debug "Created new thread " + t.inspect
      @threads << t
    end
  end
end

#update(changes) ⇒ Object

listen for config changes



31
32
33
34
35
36
# File 'lib/thread_pool.rb', line 31

def update(changes)
  if changes[:max_threads]
    @max_size = changes[:max_threads]
    puts "Changed max size to #{changes[:max_threads]}"
  end
end