Class: ASIR::ThreadPool

Inherits:
Object
  • Object
show all
Includes:
AdditionalData, Initialization
Defined in:
lib/asir/thread_pool.rb

Defined Under Namespace

Classes: Work, Worker

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from AdditionalData

#[], #[]=, #_additional_data, #additional_data, #additional_data!, #additional_data=, included

Constructor Details

#initialize(*args) ⇒ ThreadPool

Returns a new instance of ThreadPool.



15
16
17
18
19
20
21
22
23
24
25
# File 'lib/asir/thread_pool.rb', line 15

def initialize *args
  super
  @thread_class ||= ::Thread
  @workers_mutex  = Mutex.new
  @work_mutex     = Mutex.new
  @workers      ||= [ ]
  @work_queue   ||= Queue.new
  @run = false
  @work_id = @worker_id = 0
  @time_0 ||= Time.now
end

Instance Attribute Details

#auto_start_workersObject

Returns the value of attribute auto_start_workers.



10
11
12
# File 'lib/asir/thread_pool.rb', line 10

def auto_start_workers
  @auto_start_workers
end

#n_workersObject

Returns the value of attribute n_workers.



9
10
11
# File 'lib/asir/thread_pool.rb', line 9

def n_workers
  @n_workers
end

#runObject

Returns the value of attribute run.



13
14
15
# File 'lib/asir/thread_pool.rb', line 13

def run
  @run
end

#thread_classObject

Returns the value of attribute thread_class.



9
10
11
# File 'lib/asir/thread_pool.rb', line 9

def thread_class
  @thread_class
end

#verboseObject

Returns the value of attribute verbose.



12
13
14
# File 'lib/asir/thread_pool.rb', line 12

def verbose
  @verbose
end

#work_queueObject

Returns the value of attribute work_queue.



11
12
13
# File 'lib/asir/thread_pool.rb', line 11

def work_queue
  @work_queue
end

#workersObject

Returns the value of attribute workers.



9
10
11
# File 'lib/asir/thread_pool.rb', line 9

def workers
  @workers
end

Instance Method Details

#join(*args) ⇒ Object



131
132
133
134
135
136
137
# File 'lib/asir/thread_pool.rb', line 131

def join *args
  until @workers.empty?
    @workers.each do | worker |
      worker && worker.join(*args)
    end
  end
end

#kill!(*args) ⇒ Object



120
121
122
123
124
125
126
127
128
129
# File 'lib/asir/thread_pool.rb', line 120

def kill! *args
  log! "kill!"
  @run = false
  @workers_mutex.synchronize do
    @workers.each do | worker |
      worker.kill! *args
    end
  end
  self
end

#log!(msg = nil) ⇒ Object



98
99
100
101
102
103
104
# File 'lib/asir/thread_pool.rb', line 98

def log! msg = nil
  return unless @verbose
  msg ||= yield
  @time_1 = Time.now
  $stderr.puts "  #{@time_1 - @time_0} #{$$} #{Thread.current.object_id} #{self} #{msg}"
  self
end

#new(&blk) ⇒ Object

Returns a new Work object.



28
29
30
31
32
33
34
35
36
37
38
# File 'lib/asir/thread_pool.rb', line 28

def new &blk
  work_id = @work_mutex.synchronize do
    @work_id += 1
  end
  work = Work.new(:block => blk, :work_id => work_id)
  work_created! work
  @work_queue.enq(work)
  @run = true
  start_workers! if @auto_start_workers
  work
end

#start_workers!Object

Keep a list of workers busy.



41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/asir/thread_pool.rb', line 41

def start_workers!
  return nil unless @run
  workers_size = @workers.size
  want_n = [ n_workers, @work_queue.size ].min
  want_n = n_workers if want_n > n_workers
  start_n = want_n - workers_size
  start_n = 0 if start_n < 0
  return unless start_n > 0
  log! { "start_workers! #{start_n}" }
  start_n.times do
    thread_class.new do
      worker_id = @workers_mutex.synchronize do
        @worker_id += 1
      end
      worker = Worker.new(:thread_pool => self, :worker_id => worker_id)
      worker_created! worker
      begin
        worker_starting! worker
        @workers_mutex.synchronize do
          @workers << worker
        end
        worker.run!
      ensure
        @workers_mutex.synchronize do
          @workers.delete(worker)
        end
        worker_stopping! worker
      end
    end
  end
  self
end

#stop!Object



106
107
108
109
110
111
112
113
114
115
116
117
118
# File 'lib/asir/thread_pool.rb', line 106

def stop!
  log! "stop!"
  @run = false
  # Ask each current worker to :stop!
  @workers_mutex.synchronize do
    @workers.each do | w |
      @work_queue.enq :stop!
    end
  end
  # Just incase.
  @work_queue.enq :stop!
  self
end

#work_created!(work) ⇒ Object



86
87
88
# File 'lib/asir/thread_pool.rb', line 86

def work_created! work
  log! { "work_created! #{work.inspect}" }
end

#work_starting!(work) ⇒ Object



90
91
92
# File 'lib/asir/thread_pool.rb', line 90

def work_starting! work
  log! { "work_starting! #{work.inspect} #{work.worker.inspect}" }
end

#work_stopping!(work) ⇒ Object



94
95
96
# File 'lib/asir/thread_pool.rb', line 94

def work_stopping! work
  log! { "work_stopping! #{work.inspect}" }
end

#worker_created!(worker) ⇒ Object



74
75
76
# File 'lib/asir/thread_pool.rb', line 74

def worker_created! worker
  log! { "worker_created! #{worker.inspect}" }
end

#worker_starting!(worker) ⇒ Object



78
79
80
# File 'lib/asir/thread_pool.rb', line 78

def worker_starting! worker
  log! { "worker_starting! #{worker}" }
end

#worker_stopping!(worker) ⇒ Object



82
83
84
# File 'lib/asir/thread_pool.rb', line 82

def worker_stopping! worker
  log! { "worker_stopping! #{worker}" }
end