Class: Lossfully::ThreadPool

Inherits:
Object
  • Object
show all
Defined in:
lib/lossfully/thread_pool.rb

Overview

There’s (at least) two ways to do this: 1) the Ruby Recipes way, which is to make a thread for every incoming task, but put it to sleep until there’s room in the pool for another task, or 2) have a pool of threads that eat tasks from a queue. This implements (2), mainly because it seemed more fun to me. But also because it doesn’t require the explicit use of Mutexes at all; it uses them, for the sake of sending signals with ConditionVaribles, but if those signals aren’t received there would be a delay of at most 1 second.

Another useful thing about this implementation is for the case when every task you anticipate adding to the ThreadPool of the same general form. Then then ThreadPool can be initialized with a block and you can just add objects to the task queue.

Constant Summary collapse

DEFAULT_BLOCK =
lambda {|block, &blk| block = blk if block_given? ; block.call}

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(max_size = 1, block = nil, &blk) ⇒ ThreadPool

Returns a new instance of ThreadPool.



44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/lossfully/thread_pool.rb', line 44

def initialize(max_size = 1, block=nil, &blk)
  @running = true
  @joining = false
  
  @mutex = Mutex.new
  @cv = ConditionVariable.new
  @max_size = max_size
  block = blk if block_given?
  @block = block.nil? ? DEFAULT_BLOCK : block
  @queue = Queue.new
  @workers = []
  @master = master_thread
  @completed = 0
  @total = 0
end

Instance Attribute Details

#completedObject (readonly)

Returns the value of attribute completed.



76
77
78
# File 'lib/lossfully/thread_pool.rb', line 76

def completed
  @completed
end

#max_sizeObject

Returns the value of attribute max_size.



76
77
78
# File 'lib/lossfully/thread_pool.rb', line 76

def max_size
  @max_size
end

#mutexObject (readonly)

Returns the value of attribute mutex.



76
77
78
# File 'lib/lossfully/thread_pool.rb', line 76

def mutex
  @mutex
end

#totalObject (readonly)

Returns the value of attribute total.



76
77
78
# File 'lib/lossfully/thread_pool.rb', line 76

def total
  @total
end

Instance Method Details

#<<(block_or_item) ⇒ Object



85
86
87
# File 'lib/lossfully/thread_pool.rb', line 85

def << block_or_item
  process block_or_item
end

#currentObject



72
73
74
# File 'lib/lossfully/thread_pool.rb', line 72

def current
  @total - @queue.size
end

#joinObject



89
90
91
92
93
94
95
96
97
# File 'lib/lossfully/thread_pool.rb', line 89

def join
  @running = false
  @joining = true
  signal_master
  # A weird bug happens on this next line if you don't test
  # @master.alive?, but only sometimes.  I don't care enough to
  # figure it out right now.
  @master.join if @master.alive?
end

#killObject



112
113
114
115
116
# File 'lib/lossfully/thread_pool.rb', line 112

def kill
  @queue.clear
  @workers.each(&:kill)
  join
end

#process(block_or_item = nil, &blk) ⇒ Object Also known as: enq, dispatch



60
61
62
63
64
65
66
67
68
69
70
# File 'lib/lossfully/thread_pool.rb', line 60

def process (block_or_item=nil, &blk)
  block_or_item = blk if block_given?
  if block_or_item.respond_to?(:call) 
    @queue << block_or_item
  else
    @queue << lambda { @block.call(block_or_item) }
  end
#      @mutex.synchronize { @total +=1 }
  @total += 1
  signal_master
end

#queue_sizeObject



103
104
105
# File 'lib/lossfully/thread_pool.rb', line 103

def queue_size
  @queue.size
end

#sizeObject



99
100
101
# File 'lib/lossfully/thread_pool.rb', line 99

def size
  @workers.size
end

#stopObject



107
108
109
110
# File 'lib/lossfully/thread_pool.rb', line 107

def stop
  @queue.clear
  join
end