Class: Isono::ThreadPool
- Inherits:
-
Object
- Object
- Isono::ThreadPool
- Includes:
- Logger
- Defined in:
- lib/isono/thread_pool.rb
Defined Under Namespace
Classes: TimeoutError, WorkerTerminateError
Instance Attribute Summary collapse
-
#name ⇒ Object
readonly
Returns the value of attribute name.
-
#queue ⇒ Object
readonly
Returns the value of attribute queue.
Instance Method Summary collapse
-
#barrier(immediate = true, time_out = nil, &blk) ⇒ Object
Send a block to a worker thread, similar to pass().
- #clear ⇒ Object
- #graceful_shutdown2 ⇒ Object
-
#initialize(worker_num = 1, name = nil, opts = {}) ⇒ ThreadPool
constructor
A new instance of ThreadPool.
- #member_thread?(thread = Thread.current) ⇒ Boolean
-
#pass(immediate = true, &blk) ⇒ Object
Pass a block to a worker thread.
-
#shutdown ⇒ Object
Immediately shut down all the worker threads.
- #shutdown_graceful(timeout) ⇒ Object
Methods included from Logger
Constructor Details
#initialize(worker_num = 1, name = nil, opts = {}) ⇒ ThreadPool
Returns a new instance of ThreadPool.
13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 |
# File 'lib/isono/thread_pool.rb', line 13

# Creates the pool and starts +worker_num+ worker threads that consume jobs
# from a shared queue.
#
# @param worker_num [Integer] number of worker threads to start
# @param name [String, nil] pool name; when given, each worker stores
#   "name[index/worker_num]" in its thread-local :name slot
# @param opts [Hash] merged over the defaults; :stucked_queue_num (default 20)
#   is the queue length above which a warning is logged
def initialize(worker_num=1, name=nil, opts={})
  @queue = ::Queue.new
  @name = name
  @opts = {:stucked_queue_num=>20}.merge(opts)
  @last_stuck_warn_at = Time.now
  @worker_threads = {}

  worker_num.times { |wid|
    t = Thread.new {
      # Log4r::PatternFormatter can refer thread name as %h.
      Thread.current[:name] = "#{name}[#{wid}/#{worker_num}]" if name
      begin
        # A nil (or false) job ends the loop and therefore the worker.
        while op = @queue.pop
          # Warn about a long backlog, at most once every 30 seconds.
          if @queue.size > @opts[:stucked_queue_num] && Time.now - @last_stuck_warn_at > 30.0
            logger.warn("too many stucked jobs: #{@queue.size}")
            @last_stuck_warn_at = Time.now
          end

          op_start_at = Time.now
          op.call
          op_elapsed = Time.now - op_start_at
          on_task_end(op_start_at, op_elapsed)
        end
      rescue WorkerTerminateError
        # Someone asked this thread to terminate. Leaving the rescue clause
        # ends the thread body. No `break` here: this clause is outside the
        # while loop, so `break` would target the Thread.new block and raise
        # LocalJumpError on current Ruby versions.
      rescue ::Exception => e
        logger.error(e)
        # A worker thread should never die except through termination
        # (WorkerTerminateError). Any error thrown by op.call is caught here
        # and the worker goes back to @queue.pop.
        retry
      ensure
        tid = Thread.current.__id__
        tname = Thread.current[:name] || Thread.current.to_s
        # Deregistration is handed to EM.schedule rather than done inline.
        EM.schedule {
          @worker_threads.delete(tid)
          logger.info("Thread #{tname} is being terminated")
        }
      end
    }
    @worker_threads[t.__id__] = t
  }
end
Instance Attribute Details
#name ⇒ Object (readonly)
Returns the value of attribute name.
11 12 13 |
# File 'lib/isono/thread_pool.rb', line 11

# Reader for the @name instance variable.
#
# @return [Object] the current value of @name
def name
  @name
end
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
11 12 13 |
# File 'lib/isono/thread_pool.rb', line 11

# Reader for the @queue instance variable.
#
# @return [Object] the current value of @queue
def queue
  @queue
end
Instance Method Details
#barrier(immediate = true, time_out = nil, &blk) ⇒ Object
Send a block to a worker thread, similar to pass(), but make the caller thread wait until the block has been processed by a worker thread.
79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 |
# File 'lib/isono/thread_pool.rb', line 79

# Runs +blk+ through pass() and blocks the caller until its result is
# available.
#
# @param immediate [Boolean] when true and the caller is already a pool
#   member thread, the block is called inline instead of being queued
# @param time_out [Numeric, nil] seconds to wait before a TimeoutError is
#   delivered to the caller; nil (or false) means no timer is set
# @return [Object] the value returned by the block
# @raise [Exception] whatever the block raised, re-raised in the caller;
#   a block that *returns* an Exception instance is treated the same way
# @raise [TimeoutError] when the timer fires before the block's result arrives
def barrier(immediate=true, time_out=nil, &blk)
  if immediate && member_thread?
    return blk.call
  end

  q = ::Queue.new
  time_start = ::Time.now

  pass {
    begin
      q << blk.call
    rescue Exception => e
      # Deliberately broad: every failure is handed back to the waiting caller.
      q << e
    end
  }

  em_sig = nil
  if time_out
    em_sig = EventMachine.add_timer(time_out) {
      q << TimeoutError.new
    }
  end

  begin
    res = q.shift
  ensure
    # Only cancel a timer that was actually created, and do so even when the
    # wait itself is interrupted.
    EventMachine.cancel_timer(em_sig) if em_sig
  end

  time_elapsed = ::Time.now - time_start
  logger.debug("Elapsed time for #{blk}: #{time_elapsed} secs") if time_elapsed > 0.05

  if res.is_a?(Exception)
    raise res
  end
  res
end
#clear ⇒ Object
113 114 115 |
# File 'lib/isono/thread_pool.rb', line 113

# Removes every entry from @queue by delegating to its #clear.
#
# @return [Object] whatever @queue.clear returns
def clear
  @queue.clear
end
#graceful_shutdown2 ⇒ Object
163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 |
# File 'lib/isono/thread_pool.rb', line 163

# Swaps in a fresh queue for new jobs, waits for the previous queue to
# drain, then raises WorkerTerminateError in every registered worker thread.
#
# NOTE(review): an empty queue only shows that every job has been taken by a
# worker, not that the last jobs have finished running - confirm this is
# acceptable for callers.
def graceful_shutdown2
  # Make new jobs go to a dummy queue.
  old_queue = @queue
  @queue = ::Queue.new

  # Wait until the old queue becomes empty, polling once per second.
  if !old_queue.empty?
    logger.info("Waiting for #{old_queue.size} worker jobs in #{self}")
    while !old_queue.empty?
      sleep 1
    end
  end

  # @worker_threads is a Hash of id => Thread. Iterating with a single block
  # parameter would yield [id, thread] pairs, so iterate over the values.
  @worker_threads.each_value { |t|
    t.raise WorkerTerminateError
  }
end
#member_thread?(thread = Thread.current) ⇒ Boolean
125 126 127 |
# File 'lib/isono/thread_pool.rb', line 125

# Tells whether +thread+ is registered in @worker_threads, which is keyed by
# the thread's __id__.
#
# @param thread [Thread] thread to look up; defaults to the calling thread
# @return [Boolean] true when the thread's __id__ is a key of @worker_threads
def member_thread?(thread=Thread.current)
  @worker_threads.key?(thread.__id__)
end
#pass(immediate = true, &blk) ⇒ Object
Pass a block to a worker thread. The job stays queued until a worker thread is available to run it.
65 66 67 68 69 70 71 |
# File 'lib/isono/thread_pool.rb', line 65

# Hands +blk+ to the pool: it is run inline when the caller is already a
# pool member thread (and +immediate+ is true), otherwise it is appended to
# @queue.
#
# @param immediate [Boolean] allow running the block inline on a member thread
# @return [Object] the block's value when run inline, otherwise @queue
# @raise [ArgumentError] when no block is given
def pass(immediate=true, &blk)
  # A nil entry in the queue would end a worker's `while op = @queue.pop`
  # loop, so a missing block is rejected up front.
  raise ArgumentError, 'a block is required' unless blk

  return blk.call if immediate && member_thread?

  @queue << blk
end
#shutdown ⇒ Object
Immediately shut down all the worker threads.
118 119 120 121 122 123 |
# File 'lib/isono/thread_pool.rb', line 118

# Raises WorkerTerminateError in every thread registered in @worker_threads,
# without waiting for queued jobs.
def shutdown
  @worker_threads.each_value { |t|
    t.raise(WorkerTerminateError)
    # Yield the scheduler after each raise.
    Thread.pass
  }
end
#shutdown_graceful(timeout) ⇒ Object
129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 |
# File 'lib/isono/thread_pool.rb', line 129

# Queues one terminate job per registered worker, waits for each worker to
# acknowledge (or for the timer to fire), then always calls shutdown.
#
# @param timeout [Numeric, nil] seconds before waiting is abandoned; nil or a
#   value <= 0 sets no timer, so the call waits until every terminate job has
#   been picked up
def shutdown_graceful(timeout)
  term_sig_q = ::Queue.new
  worker_num = @worker_threads.size
  em_sig = nil

  # Enqueue the terminate jobs: each one signals back, then raises
  # WorkerTerminateError inside the thread that runs it.
  worker_num.times {
    @queue.push proc {
      term_sig_q.enq(1)
      raise WorkerTerminateError
    }
  }

  if timeout && timeout > 0.0
    em_sig = EventMachine.add_timer(timeout) {
      # Push enough markers to release every outstanding wait below.
      worker_num.times {
        term_sig_q << TimeoutError.new
      }
    }
  end

  # Take exactly one signal per worker and count how many were timeouts.
  timeout_workers = worker_num.times.count { term_sig_q.deq.is_a?(TimeoutError) }

  logger.error("#{timeout_workers} of worker threads timed out during the cleanup") if timeout_workers > 0
ensure
  shutdown
  EventMachine.cancel_timer(em_sig) if em_sig
end