Class: Knj::Threadpool

Inherits:
Object show all
Defined in:
lib/knj/threadpool.rb

Defined Under Namespace

Classes: Worker

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(args = {}) ⇒ Threadpool

Returns a new instance of Threadpool.



9
10
11
12
13
14
15
16
17
18
19
20
21
22
# File 'lib/knj/threadpool.rb', line 9

def initialize(args = {})
  @args = args
  @args[:sleep] = 0.2 if !@args.key?(:sleep)
  
  raise "Invalid number of threads: '#{@args[:threads]}'." if !@args[:threads] or @args[:threads].to_i <= 0
  
  @workers = []
  @blocks = []
  @mutex = Mutex.new
  @events = Knj::Event_handler.new
  @events.add_event(:name => :on_error)
  
  self.start
end

Instance Attribute Details

#argsObject (readonly)

Returns the value of attribute args.



7
8
9
# File 'lib/knj/threadpool.rb', line 7

def args
  @args
end

#blocksObject (readonly)

Returns the value of attribute blocks.



7
8
9
# File 'lib/knj/threadpool.rb', line 7

def blocks
  @blocks
end

#eventsObject (readonly)

Returns the value of attribute events.



7
8
9
# File 'lib/knj/threadpool.rb', line 7

def events
  @events
end

#mutexObject (readonly)

Returns the value of attribute mutex.



7
8
9
# File 'lib/knj/threadpool.rb', line 7

def mutex
  @mutex
end

#workersObject (readonly)

Returns the value of attribute workers.



7
8
9
# File 'lib/knj/threadpool.rb', line 7

def workers
  @workers
end

Class Method Details

.worker_dataObject



2
3
4
5
# File 'lib/knj/threadpool.rb', line 2

def self.worker_data
  raise "This thread is not running via the threadpool." if !Thread.current[:knj_threadpool]
  return Thread.current[:knj_threadpool]
end

Instance Method Details

#get_blockObject



83
84
85
86
87
88
89
90
91
92
93
94
95
96
# File 'lib/knj/threadpool.rb', line 83

def get_block
  return false if !@running
  
  @mutex.synchronize do
    @blocks.each do |blockdata|
      if blockdata and !blockdata[:running] and !blockdata[:runned]
        blockdata[:running] = true
        return blockdata
      end
    end
    
    return false
  end
end

#run(*args, &block) ⇒ Object



49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/knj/threadpool.rb', line 49

def run(*args, &block)
  raise "No block given." if !block_given?
  blockdata = {:block => block, :result => nil, :running => false, :runned => false, :args => args}
  @blocks << blockdata
  
  loop do
    sleep @args[:sleep]
    
    if blockdata[:runned]
      begin
        res = blockdata[:result]
        raise blockdata[:error] if blockdata.key?(:error)
      ensure
        @mutex.synchronize do
          blockdata.clear
          @blocks.delete(blockdata)
        end
      end
      
      return res
    end
  end
end

#run_async(*args, &block) ⇒ Object



73
74
75
76
77
78
79
80
81
# File 'lib/knj/threadpool.rb', line 73

def run_async(*args, &block)
  raise "No block given." if !block_given?
  
  @mutex.synchronize do
    blockdata = {:block => block, :running => false, :runned => false, :args => args}
    @blocks << blockdata
    return blockdata
  end
end

#startObject



24
25
26
27
28
29
30
31
32
33
34
# File 'lib/knj/threadpool.rb', line 24

def start
  @mutex.synchronize do
    if !@running
      @workers.length.upto(@args[:threads]) do |count|
        @workers << Knj::Threadpool::Worker.new(:threadpool => self, :id => count)
      end
      
      @running = true
    end
  end
end

#stopObject



36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/knj/threadpool.rb', line 36

def stop
  if @running
    @workers.each do |worker|
      if !worker.running
        worker.kill
        @workers.delete(worker)
      end
    end
    
    @running = false
  end
end