Class: Lifeguard::Threadpool
- Inherits:
-
Object
- Object
- Lifeguard::Threadpool
- Defined in:
- lib/lifeguard/threadpool.rb
Direct Known Subclasses
Constant Summary collapse
- DEFAULT_REAPING_INTERVAL =
in seconds
5
- DEFAULT_POOL_SIZE =
2
Instance Attribute Summary collapse
-
#name ⇒ Object
Returns the value of attribute name.
-
#options ⇒ Object
Returns the value of attribute options.
-
#pool_size ⇒ Object
Returns the value of attribute pool_size.
Instance Method Summary collapse
- #async(*args, &block) ⇒ Object
-
#busy? ⇒ Boolean
Public Instance Methods.
- #busy_size ⇒ Object
-
#initialize(opts = {}) ⇒ Threadpool
constructor
Constructor.
- #kill! ⇒ Object
- #shutdown(shutdown_timeout = 3) ⇒ Object
- #timeout! ⇒ Object
- #timeout? ⇒ Boolean
Constructor Details
#initialize(opts = {}) ⇒ Threadpool
Constructor
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
# File 'lib/lifeguard/threadpool.rb', line 14
# Builds a pool from an options hash.
#
# Keys read from +opts+:
#   :name      - pool name; a random UUID is generated when absent
#   :pool_size - thread limit; DEFAULT_POOL_SIZE when absent
#   :timeout   - compared against thread age (in seconds) by #timeout!;
#                nil leaves timeouts disabled
#
# @param opts [Hash] pool options, kept as-is in @options
def initialize(opts = {})
  @mutex = ::Mutex.new
  @busy_threads = ThreadGroup.new

  @options = opts
  @name = opts[:name] || ::SecureRandom.uuid
  @pool_size = opts[:pool_size] || DEFAULT_POOL_SIZE

  # Important: the timeout is enforced by the reaper, so your threads
  # will not time out immediately; they wait for the next reaper run.
  # Account for the reaper interval when choosing a timeout, because
  # threads may be killed somewhat later than the configured value.
  @timeout = opts[:timeout]

  restart_reaper_unless_alive
end
Instance Attribute Details
#name ⇒ Object
Returns the value of attribute name.
9 10 11 |
# File 'lib/lifeguard/threadpool.rb', line 9
# Reader for the name attribute.
#
# @return [Object] the current value of @name
def name
  @name
end
#options ⇒ Object
Returns the value of attribute options.
9 10 11 |
# File 'lib/lifeguard/threadpool.rb', line 9
# Reader for the options attribute.
#
# @return [Object] the current value of @options
def options
  @options
end
#pool_size ⇒ Object
Returns the value of attribute pool_size.
9 10 11 |
# File 'lib/lifeguard/threadpool.rb', line 9
# Reader for the pool_size attribute.
#
# @return [Object] the current value of @pool_size
def pool_size
  @pool_size
end
Instance Method Details
#async(*args, &block) ⇒ Object
49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/lifeguard/threadpool.rb', line 49
# Runs the given block on a new thread when fewer than pool_size threads
# are busy; otherwise does nothing.
#
# @param args [Array] arguments passed to the block when it is called
# @yield the work to run on the new thread
# @return [Boolean] true when a thread was started for the block,
#   false when the pool was already at capacity
# @raise [RuntimeError] when no block is given
def async(*args, &block)
  restart_reaper_unless_alive

  raise "Threadpool#async must be passed a block" unless block

  # The capacity check and the registration of the new thread happen
  # under the same lock so two callers cannot both take the last slot.
  @mutex.synchronize do
    if busy_size < pool_size
      worker = ::Thread.new(block, args) do |callable, call_args|
        ::Thread.current.abort_on_exception = false
        # Start stamp (whole seconds) that #timeout! compares against.
        ::Thread.current[:__start_time_in_seconds__] = Time.now.to_i
        callable.call(*call_args) # should we check the args? pass args?
      end
      @busy_threads.add(worker)
      true
    else
      false
    end
  end
end
#busy? ⇒ Boolean
Public Instance Methods
35 36 37 |
# File 'lib/lifeguard/threadpool.rb', line 35
# Tells whether the pool is at (or over) capacity.
#
# @return [Boolean] false while busy_size is below pool_size, true otherwise
def busy?
  !(busy_size < pool_size)
end
#busy_size ⇒ Object
39 40 41 |
# File 'lib/lifeguard/threadpool.rb', line 39
# Counts the threads currently listed in the busy thread group.
#
# @return [Integer] number of threads in @busy_threads
def busy_size
  running = @busy_threads.list
  running.length
end
#kill! ⇒ Object
43 44 45 46 47 |
# File 'lib/lifeguard/threadpool.rb', line 43
# Kills every thread in the busy thread group while holding the pool mutex.
#
# @return [Array<Thread>] the threads that were sent kill
def kill!
  @mutex.synchronize do
    @busy_threads.list.each(&:kill)
  end
end
#shutdown(shutdown_timeout = 3) ⇒ Object
72 73 74 75 76 77 78 79 |
# File 'lib/lifeguard/threadpool.rb', line 72
# Gives busy threads up to shutdown_timeout seconds to finish, then kills
# any that are still running.
#
# The deadline is taken from the monotonic clock, so adjustments to the
# system time during the wait neither shorten nor extend the grace period.
# The pool mutex is held for the whole wait and the final kill pass.
#
# @param shutdown_timeout [Numeric] grace period in seconds
# @return [Array<Thread>] the threads still listed when the wait ended
def shutdown(shutdown_timeout = 3)
  kill_at = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC) + shutdown_timeout

  @mutex.synchronize do
    # Poll in 10 ms steps until the pool drains or the deadline passes.
    sleep 0.01 while busy_size > 0 && ::Process.clock_gettime(::Process::CLOCK_MONOTONIC) < kill_at

    @busy_threads.list.each(&:kill)
  end
end
#timeout! ⇒ Object
81 82 83 84 85 86 87 88 89 90 91 |
# File 'lib/lifeguard/threadpool.rb', line 81
# Kills busy threads whose age exceeds @timeout seconds. Does nothing when
# no timeout is configured.
#
# A thread's age is measured from the :__start_time_in_seconds__ value the
# worker stores on itself when it begins running. A thread that has been
# added to the group but has not stored that value yet is skipped instead
# of raising on nil arithmetic.
#
# @return [Array<Thread>, nil] the threads that were inspected, or nil when
#   timeouts are disabled
def timeout!
  return unless timeout?

  @mutex.synchronize do
    @busy_threads.list.each do |busy_thread|
      started_at = busy_thread[:__start_time_in_seconds__]
      next unless started_at # not stamped yet, so it has no age to judge

      busy_thread.kill if Time.now.to_i - started_at > @timeout
    end
  end
end
#timeout? ⇒ Boolean
93 94 95 |
# File 'lib/lifeguard/threadpool.rb', line 93
# Tells whether a timeout has been configured.
#
# @return [Boolean] true when @timeout is neither nil nor false
def timeout?
  @timeout ? true : false
end