Class: Knj::Threadpool

Inherits:
Object show all
Defined in:
lib/knj/threadpool.rb

Overview

A small threadpool framework.

Defined Under Namespace

Classes: Asynced, Worker

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(args = {}) ⇒ Threadpool

Constructor.

Examples

tp = Knj::Threadpool.new(:threads => 5)



13
14
15
16
17
18
19
20
21
22
23
24
25
26
# File 'lib/knj/threadpool.rb', line 13

# Constructor. Sets up the pool state and starts the pool.
#
# args - Hash of options:
#   :threads - number of workers; required, and its #to_i must be greater than zero.
#   :sleep   - value passed to Kernel#sleep between polls in #run; defaults to 0.2
#              when the key is absent.
#
# Raises a RuntimeError when :threads is missing or not a positive number.
def initialize(args = {})
  # Merge into a fresh hash so the :sleep default is applied without writing
  # into the caller's hash (which would also fail if that hash is frozen).
  @args = {:sleep => 0.2}.merge(args)

  raise "Invalid number of threads: '#{@args[:threads]}'." if !@args[:threads] || @args[:threads].to_i <= 0

  @workers = []
  @blocks = []
  @mutex = Mutex.new
  @events = Knj::Event_handler.new
  @events.add_event(:name => :on_error)

  start
end

Instance Attribute Details

#args ⇒ Object (readonly)

Returns the value of attribute args.



8
9
10
# File 'lib/knj/threadpool.rb', line 8

# Options the pool was constructed with, plus the :sleep entry the constructor
# adds (0.2) when the caller did not supply one.
def args
  @args
end

#blocks ⇒ Object (readonly)

Returns the value of attribute blocks.



8
9
10
# File 'lib/knj/threadpool.rb', line 8

# Array of queued block entries. #run and #run_async append one hash per
# submitted block (:block, :args, :running, :runned, ...); #get_block scans it
# for the next entry to hand out.
def blocks
  @blocks
end

#events ⇒ Object (readonly)

Returns the value of attribute events.



8
9
10
# File 'lib/knj/threadpool.rb', line 8

# The Knj::Event_handler created by the constructor, which registers an
# :on_error event on it.
def events
  @events
end

#mutex ⇒ Object (readonly)

Returns the value of attribute mutex.



8
9
10
# File 'lib/knj/threadpool.rb', line 8

# The Mutex created by the constructor. #start, #get_block and #run_async
# wrap their work on the pool state in it.
def mutex
  @mutex
end

#workers ⇒ Object (readonly)

Returns the value of attribute workers.



8
9
10
# File 'lib/knj/threadpool.rb', line 8

# Array of Knj::Threadpool::Worker objects: filled by #start, pruned by #stop.
def workers
  @workers
end

Class Method Details

.worker_data ⇒ Object



3
4
5
6
# File 'lib/knj/threadpool.rb', line 3

# Returns whatever is stored under the :knj_threadpool key of the current
# thread. Raises a RuntimeError when that key holds nothing (nil or false).
def self.worker_data
  data = Thread.current[:knj_threadpool]
  raise "This thread is not running via the threadpool." unless data

  data
end

Instance Method Details

#get_block ⇒ Object

Returns the next block to be run, if there is one; otherwise false.



92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/knj/threadpool.rb', line 92

# Hands out the first queued entry that is neither running nor already run,
# flagging it as running before returning it. Returns false when the pool is
# not running or when no such entry exists.
def get_block
  return false unless @running

  @mutex.synchronize do
    # Falsy slots in the list are skipped.
    candidate = @blocks.find { |entry| entry && !entry[:running] && !entry[:runned] }
    return false unless candidate

    candidate[:running] = true
    candidate
  end
end

#run(*args, &block) ⇒ Object

Runs the given block, waits for the result and returns the result.



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/knj/threadpool.rb', line 56

# Queues the given block, polls until it has been marked as run, and returns
# its result.
#
# args  - arguments stored alongside the block in its queue entry.
# block - the block to execute; required.
#
# Returns the value stored under :result in the entry.
# Raises a RuntimeError ("No block given.") when called without a block, and
# re-raises whatever was stored under :error in the entry.
def run(*args, &block)
  raise "No block given." if !block_given?
  blockdata = {:block => block, :result => nil, :running => false, :runned => false, :args => args}

  # Append under the mutex, as #run_async does, because #get_block iterates
  # the same array under it.
  @mutex.synchronize { @blocks << blockdata }

  loop do
    sleep @args[:sleep]
    next unless blockdata[:runned]

    begin
      res = blockdata[:result]
      raise blockdata[:error] if blockdata.key?(:error)
    ensure
      @mutex.synchronize do
        # Remove by identity: Array#delete compares with ==, and a cleared
        # hash equals every other empty hash in the list.
        @blocks.delete_if { |queued| queued.equal?(blockdata) }
        blockdata.clear
      end
    end

    return res
  end
end

#run_async(*args, &block) ⇒ Object

Runs the given block asynchronously in the threadpool. Returns a ‘Knj::Threadpool::Asynced’ object that can be used to get the result and more.



81
82
83
84
85
86
87
88
89
# File 'lib/knj/threadpool.rb', line 81

# Queues the given block without waiting for it.
#
# args  - arguments stored alongside the block in its queue entry.
# block - the block to execute; required.
#
# Returns a Knj::Threadpool::Asynced built from the queue entry.
# Raises a RuntimeError ("No block given.") when called without a block.
def run_async(*args, &block)
  raise "No block given." unless block_given?

  @mutex.synchronize do
    entry = {:block => block, :running => false, :runned => false, :args => args}
    @blocks.push(entry)
    Knj::Threadpool::Asynced.new(entry)
  end
end

#start ⇒ Object

Starts the threadpool. This is automatically called from the constructor.



29
30
31
32
33
34
35
36
37
38
39
# File 'lib/knj/threadpool.rb', line 29

# Starts the threadpool; does nothing when it is already running.
#
# Creates Knj::Threadpool::Worker objects until the pool holds :threads of
# them, then marks the pool as running. Workers still present in the list
# (for example after #stop) are kept and only the missing ones are added.
def start
  @mutex.synchronize do
    unless @running
      # Exclusive range: ids go from the current worker count to :threads - 1,
      # so the pool ends up with exactly :threads workers. An inclusive upto
      # would create one too many. #to_i matches the constructor's validation
      # of :threads.
      (@workers.length...@args[:threads].to_i).each do |count|
        @workers << Knj::Threadpool::Worker.new(:threadpool => self, :id => count)
      end

      @running = true
    end
  end
end

#stop ⇒ Object

Stops the threadpool.



42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/knj/threadpool.rb', line 42

# Stops the threadpool; does nothing when it is not running.
#
# Every worker whose #running is false is killed and removed from the worker
# list. Workers that report running are left in the list and are not killed.
# Afterwards the pool is marked as not running.
def stop
  if @running
    # reject! prunes in place. Deleting from the array inside #each shifts the
    # remaining elements and skips the worker after each removed one.
    @workers.reject! do |worker|
      next false if worker.running

      worker.kill
      true
    end

    @running = false
  end
end