Class: BackgroundQueue::ServerLib::ThreadManager
- Inherits:
-
Object
- Object
- BackgroundQueue::ServerLib::ThreadManager
- Defined in:
- lib/background_queue/server_lib/thread_manager.rb
Overview
Makes sure threads are scheduled and that the maximum number of running threads is controlled.
Defined Under Namespace
Classes: ForcedStop
Instance Attribute Summary collapse
-
#max_threads ⇒ Object
Returns the value of attribute max_threads.
-
#running_threads ⇒ Object
readonly
Returns the value of attribute running_threads.
Instance Method Summary collapse
- #change_concurrency(max_threads) ⇒ Object
- #control_access(&block) ⇒ Object
-
#initialize(server, max_threads) ⇒ ThreadManager
constructor
A new instance of ThreadManager.
- #protect_access(&block) ⇒ Object
-
#signal_access ⇒ Object
Signals any waiting threads. This should only be called from within a protect_access/control_access block. Does nothing if there are already too many threads running.
- #start(clazz) ⇒ Object
- #wait(timeout_limit = 100) ⇒ Object
-
#wait_on_access ⇒ Object
Waits for the condition. Must be called from within a protect_access/control_access block.
Constructor Details
#initialize(server, max_threads) ⇒ ThreadManager
Returns a new instance of ThreadManager.
7 8 9 10 11 12 13 14 |
# File 'lib/background_queue/server_lib/thread_manager.rb', line 7

# Sets up a manager that has not started any threads yet.
#
# @param server the owning server object, stored for later use by the
#   other methods of this class
# @param max_threads the concurrency limit to start with
def initialize(server, max_threads)
  # Collaborator and configuration supplied by the caller.
  @server = server
  @max_threads = max_threads

  # Lock and condition variable shared by every access/wait helper.
  @mutex = Mutex.new
  @condvar = ConditionVariable.new

  # Bookkeeping: no thread handles and nothing counted as running yet.
  @threads = []
  @running_threads = 0
end
Instance Attribute Details
#max_threads ⇒ Object
Returns the value of attribute max_threads.
4 5 6 |
# File 'lib/background_queue/server_lib/thread_manager.rb', line 4

# Reader for the concurrency limit.
#
# @return the current value of +@max_threads+
def max_threads
  @max_threads
end
#running_threads ⇒ Object (readonly)
Returns the value of attribute running_threads.
5 6 7 |
# File 'lib/background_queue/server_lib/thread_manager.rb', line 5

# Reader for the running-thread counter.
#
# @return the current value of +@running_threads+
def running_threads
  @running_threads
end
Instance Method Details
#change_concurrency(max_threads) ⇒ Object
52 53 54 55 56 57 58 59 60 61 |
# File 'lib/background_queue/server_lib/thread_manager.rb', line 52

# Replaces the concurrency limit while holding the mutex.
#
# When the limit grows, the condition variable is signalled once for each
# additional slot. When it shrinks or stays the same, nothing is signalled.
#
# @param max_threads the new limit
# @return the new limit (the value of the final assignment)
def change_concurrency(max_threads)
  @mutex.synchronize do
    if max_threads > @max_threads
      # One signal per slot between the old limit and the new one.
      (@max_threads...max_threads).each { @condvar.signal }
    end
    @max_threads = max_threads
  end
end
#control_access(&block) ⇒ Object
22 23 24 25 26 27 28 29 30 31 |
# File 'lib/background_queue/server_lib/thread_manager.rb', line 22

# Runs the block under the mutex, first waiting on the condition variable
# if the running-thread count has reached the limit and the server reports
# that it is still running.
#
# While waiting, the caller is removed from +@running_threads+ and added
# back once woken. The capacity check is made once, before the wait; it is
# not re-evaluated after wake-up.
#
# @yield invoked with the mutex held
# @return whatever the block returns
def control_access(&block)
  @mutex.synchronize do
    at_capacity = @running_threads >= @max_threads
    # The server is only consulted when the limit has been reached.
    if at_capacity && @server.running?
      @running_threads -= 1
      @condvar.wait(@mutex)
      @running_threads += 1
    end
    block.call
  end
end
#protect_access(&block) ⇒ Object
16 17 18 19 20 |
# File 'lib/background_queue/server_lib/thread_manager.rb', line 16

# Runs the block while holding the manager's mutex, without any waiting
# or capacity check.
#
# @yield invoked with the mutex held
# @return whatever the block returns
def protect_access(&block)
  @mutex.synchronize do
    block.call
  end
end
#signal_access ⇒ Object
Signals any waiting threads. This should only be called from within a protect_access/control_access block. Does nothing if there are already too many threads running.
36 37 38 |
# File 'lib/background_queue/server_lib/thread_manager.rb', line 36

# Signals the condition variable once, but only while the running-thread
# count is below the limit. This method does not take the mutex itself;
# it is meant to be called from inside a protect_access/control_access block.
#
# @return the result of the signal call, or nil when no signal was sent
def signal_access
  @condvar.signal if @running_threads < @max_threads
end
#start(clazz) ⇒ Object
64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 |
# File 'lib/background_queue/server_lib/thread_manager.rb', line 64

# Spawns +@max_threads+ threads, each driving its own runner.
#
# For every slot a runner is built with +clazz.new(@server)+, counted in
# +@running_threads+, and run on a new thread whose handle is appended to
# +@threads+. Any exception escaping +run+ is logged through the server's
# logger (message at error level, backtrace at debug level). The counter is
# decremented when the thread's body finishes.
#
# @param clazz a class whose instances are built with the server and respond to +run+
# @return the range of slot indices that was iterated
def start(clazz)
  @mutex.synchronize do
    (0...@max_threads).each do
      runner = clazz.new(@server)
      @running_threads += 1
      # The runner is passed in as a thread argument so each thread gets its own reference.
      thread = Thread.new(runner) do |worker|
        begin
          worker.run
        rescue Exception => e
          # Deliberately broad: this is the top of the thread, so everything
          # that escapes the runner is logged instead of being lost.
          @server.logger.error("Error in thread: #{e.message}")
          @server.logger.debug(e.backtrace.join("\n"))
        end
        # The spawned threads block here until the spawning loop releases the mutex.
        @mutex.synchronize { @running_threads -= 1 }
      end
      @threads << thread
    end
  end
end
#wait(timeout_limit = 100) ⇒ Object
86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 |
# File 'lib/background_queue/server_lib/thread_manager.rb', line 86

# Waits for every managed thread to finish, escalating if they do not.
#
# 1. Broadcasts on the condition variable so threads parked on it wake up.
# 2. Joins all threads, allowing +timeout_limit+ seconds in total.
# 3. On timeout, raises ForcedStop inside each thread that is still alive
#    (errors from doing so are ignored) and joins again with the same limit.
# 4. If the second join also times out, logs an error via the server's logger.
#
# @param timeout_limit seconds allowed for each of the two join phases
# @return the thread list when a join phase completes, otherwise the
#   result of the logger call
def wait(timeout_limit = 100)
  @mutex.synchronize { @condvar.broadcast }

  begin
    Timeout.timeout(timeout_limit) do
      @threads.each { |worker| worker.join }
    end
  rescue Timeout::Error
    @threads.each do |worker|
      begin
        if worker.alive?
          worker.raise BackgroundQueue::ServerLib::ThreadManager::ForcedStop.new("Timeout when forcing threads to stop")
        end
      rescue Exception
        # Best effort: a failure to interrupt one thread must not stop the rest.
      end
    end

    begin
      Timeout.timeout(timeout_limit) do
        @threads.each { |worker| worker.join }
      end
    rescue Timeout::Error
      @server.logger.error("Timeout while waiting for forced stop threads to finish")
    end
  end
end
#wait_on_access ⇒ Object
Waits for the condition. Must be called from within a protect_access/control_access block.
42 43 44 45 46 47 48 49 50 |
# File 'lib/background_queue/server_lib/thread_manager.rb', line 42

# Parks the calling thread on the condition variable, provided the server
# reports that it is still running. The caller is removed from
# +@running_threads+ for the duration of the wait and added back afterwards.
#
# The mutex is not acquired here: it is handed straight to the condition
# variable, so the caller must already hold it (i.e. be inside a
# protect_access/control_access block).
#
# @return the updated running-thread count, or nil when the server is not running
def wait_on_access
  return unless @server.running?

  @running_threads -= 1
  @condvar.wait(@mutex)
  @running_threads += 1
end