Class: ASIR::ThreadPool
- Inherits:
-
Object
- Object
- ASIR::ThreadPool
- Includes:
- AdditionalData, Initialization
- Defined in:
- lib/asir/thread_pool.rb
Defined Under Namespace
Instance Attribute Summary collapse
-
#auto_start_workers ⇒ Object
Returns the value of attribute auto_start_workers.
-
#n_workers ⇒ Object
Returns the value of attribute n_workers.
-
#run ⇒ Object
Returns the value of attribute run.
-
#thread_class ⇒ Object
Returns the value of attribute thread_class.
-
#verbose ⇒ Object
Returns the value of attribute verbose.
-
#work_queue ⇒ Object
Returns the value of attribute work_queue.
-
#workers ⇒ Object
Returns the value of attribute workers.
Instance Method Summary collapse
-
#initialize(*args) ⇒ ThreadPool
constructor
A new instance of ThreadPool.
- #join(*args) ⇒ Object
- #kill!(*args) ⇒ Object
- #log!(msg = nil) ⇒ Object
-
#new(&blk) ⇒ Object
Returns a new Work object.
-
#start_workers! ⇒ Object
Keep a list of workers busy.
- #stop! ⇒ Object
- #work_created!(work) ⇒ Object
- #work_starting!(work) ⇒ Object
- #work_stopping!(work) ⇒ Object
- #worker_created!(worker) ⇒ Object
- #worker_starting!(worker) ⇒ Object
- #worker_stopping!(worker) ⇒ Object
Methods included from AdditionalData
#[], #[]=, #_additional_data, #additional_data, #additional_data!, #additional_data=, included
Constructor Details
#initialize(*args) ⇒ ThreadPool
Returns a new instance of ThreadPool.
15 16 17 18 19 20 21 22 23 24 25 |
# File 'lib/asir/thread_pool.rb', line 15 def initialize *args super @thread_class ||= ::Thread @workers_mutex = Mutex.new @work_mutex = Mutex.new @workers ||= [ ] @work_queue ||= Queue.new @run = false @work_id = @worker_id = 0 @time_0 ||= Time.now end |
Instance Attribute Details
#auto_start_workers ⇒ Object
Returns the value of attribute auto_start_workers.
10 11 12 |
# File 'lib/asir/thread_pool.rb', line 10 def auto_start_workers @auto_start_workers end |
#n_workers ⇒ Object
Returns the value of attribute n_workers.
9 10 11 |
# File 'lib/asir/thread_pool.rb', line 9 def n_workers @n_workers end |
#run ⇒ Object
Returns the value of attribute run.
13 14 15 |
# File 'lib/asir/thread_pool.rb', line 13 def run @run end |
#thread_class ⇒ Object
Returns the value of attribute thread_class.
9 10 11 |
# File 'lib/asir/thread_pool.rb', line 9 def thread_class @thread_class end |
#verbose ⇒ Object
Returns the value of attribute verbose.
12 13 14 |
# File 'lib/asir/thread_pool.rb', line 12 def verbose @verbose end |
#work_queue ⇒ Object
Returns the value of attribute work_queue.
11 12 13 |
# File 'lib/asir/thread_pool.rb', line 11 def work_queue @work_queue end |
#workers ⇒ Object
Returns the value of attribute workers.
9 10 11 |
# File 'lib/asir/thread_pool.rb', line 9 def workers @workers end |
Instance Method Details
#join(*args) ⇒ Object
131 132 133 134 135 136 137 |
# File 'lib/asir/thread_pool.rb', line 131 def join *args until @workers.empty? @workers.each do | worker | worker && worker.join(*args) end end end |
#kill!(*args) ⇒ Object
120 121 122 123 124 125 126 127 128 129 |
# File 'lib/asir/thread_pool.rb', line 120 def kill! *args log! "kill!" @run = false @workers_mutex.synchronize do @workers.each do | worker | worker.kill! *args end end self end |
#log!(msg = nil) ⇒ Object
98 99 100 101 102 103 104 |
# File 'lib/asir/thread_pool.rb', line 98 def log! msg = nil return unless @verbose msg ||= yield @time_1 = Time.now $stderr.puts " #{@time_1 - @time_0} #{$$} #{Thread.current.object_id} #{self} #{msg}" self end |
#new(&blk) ⇒ Object
Returns a new Work object.
28 29 30 31 32 33 34 35 36 37 38 |
# File 'lib/asir/thread_pool.rb', line 28 def new &blk work_id = @work_mutex.synchronize do @work_id += 1 end work = Work.new(:block => blk, :work_id => work_id) work_created! work @work_queue.enq(work) @run = true start_workers! if @auto_start_workers work end |
#start_workers! ⇒ Object
Keep a list of workers busy.
41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 |
# File 'lib/asir/thread_pool.rb', line 41 def start_workers! return nil unless @run workers_size = @workers.size want_n = [ n_workers, @work_queue.size ].min want_n = n_workers if want_n > n_workers start_n = want_n - workers_size start_n = 0 if start_n < 0 return unless start_n > 0 log! { "start_workers! #{start_n}" } start_n.times do thread_class.new do worker_id = @workers_mutex.synchronize do @worker_id += 1 end worker = Worker.new(:thread_pool => self, :worker_id => worker_id) worker_created! worker begin worker_starting! worker @workers_mutex.synchronize do @workers << worker end worker.run! ensure @workers_mutex.synchronize do @workers.delete(worker) end worker_stopping! worker end end end self end |
#stop! ⇒ Object
106 107 108 109 110 111 112 113 114 115 116 117 118 |
# File 'lib/asir/thread_pool.rb', line 106 def stop! log! "stop!" @run = false # Ask each current worker to :stop! @workers_mutex.synchronize do @workers.each do | w | @work_queue.enq :stop! end end # Just incase. @work_queue.enq :stop! self end |
#work_created!(work) ⇒ Object
86 87 88 |
# File 'lib/asir/thread_pool.rb', line 86 def work_created! work log! { "work_created! #{work.inspect}" } end |
#work_starting!(work) ⇒ Object
90 91 92 |
# File 'lib/asir/thread_pool.rb', line 90 def work_starting! work log! { "work_starting! #{work.inspect} #{work.worker.inspect}" } end |
#work_stopping!(work) ⇒ Object
94 95 96 |
# File 'lib/asir/thread_pool.rb', line 94 def work_stopping! work log! { "work_stopping! #{work.inspect}" } end |
#worker_created!(worker) ⇒ Object
74 75 76 |
# File 'lib/asir/thread_pool.rb', line 74 def worker_created! worker log! { "worker_created! #{worker.inspect}" } end |
#worker_starting!(worker) ⇒ Object
78 79 80 |
# File 'lib/asir/thread_pool.rb', line 78 def worker_starting! worker log! { "worker_starting! #{worker}" } end |
#worker_stopping!(worker) ⇒ Object
82 83 84 |
# File 'lib/asir/thread_pool.rb', line 82 def worker_stopping! worker log! { "worker_stopping! #{worker}" } end |