Class: Lifeguard::Threadpool
- Inherits:
-
Object
- Object
- Lifeguard::Threadpool
- Defined in:
- lib/lifeguard/threadpool.rb
Direct Known Subclasses
Constant Summary collapse
- DEFAULT_REAPING_INTERVAL =
in seconds
5
- DEFAULT_POOL_SIZE =
2
Instance Attribute Summary collapse
-
#name ⇒ Object
Returns the value of attribute name.
-
#options ⇒ Object
Returns the value of attribute options.
-
#pool_size ⇒ Object
Returns the value of attribute pool_size.
Instance Method Summary collapse
- #async(*args, &block) ⇒ Object
-
#busy? ⇒ Boolean
Public Instance Methods. Returns true when busy_size has reached pool_size.
- #busy_size ⇒ Object
-
#initialize(opts = {}) ⇒ Threadpool
constructor
Constructor.
- #kill! ⇒ Object
- #on_thread_exit(thread) ⇒ Object
- #prune_busy_threads ⇒ Object
- #shutdown(shutdown_timeout = 0) ⇒ Object
- #timeout! ⇒ Object
- #timeout? ⇒ Boolean
Constructor Details
#initialize(opts = {}) ⇒ Threadpool
Constructor
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
# File 'lib/lifeguard/threadpool.rb', line 14
# Sets up the pool state and then calls restart_reaper_unless_alive.
#
# Keys read from +opts+ (the hash itself is kept in @options):
#   :name      - pool name; a SecureRandom UUID is used when it is missing
#   :pool_size - maximum number of busy threads; DEFAULT_POOL_SIZE when missing
#   :timeout   - stored in @timeout unchanged
#
# About :timeout -- it is enforced by the reaper, so your threads will not
# time out immediately; they wait for the next reaper run. Account for the
# reaper interval when choosing a timeout, because threads may be timed out
# later than the configured value suggests.
def initialize(opts = {})
  @mutex = ::Mutex.new
  @busy_threads = []

  @options = opts
  @timeout = opts[:timeout]
  @pool_size = opts[:pool_size] || DEFAULT_POOL_SIZE
  @name = opts[:name] || ::SecureRandom.uuid

  restart_reaper_unless_alive
end
Instance Attribute Details
#name ⇒ Object
Returns the value of attribute name.
9 10 11 |
# File 'lib/lifeguard/threadpool.rb', line 9
# Reader for the name attribute.
#
# @return [Object] the current value of @name
def name
  @name
end
#options ⇒ Object
Returns the value of attribute options.
9 10 11 |
# File 'lib/lifeguard/threadpool.rb', line 9
# Reader for the options attribute.
#
# The listing previously read `def @options end`, which has no method name
# and is not valid Ruby; the name is restored here.
#
# @return [Object] the current value of @options
def options
  @options
end
#pool_size ⇒ Object
Returns the value of attribute pool_size.
9 10 11 |
# File 'lib/lifeguard/threadpool.rb', line 9
# Reader for the pool_size attribute.
#
# @return [Object] the current value of @pool_size
def pool_size
  @pool_size
end
Instance Method Details
#async(*args, &block) ⇒ Object
57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 |
# File 'lib/lifeguard/threadpool.rb', line 57
# Runs the given block on a new thread if the pool has a free slot.
#
# restart_reaper_unless_alive is called first, even when no block was given.
# The capacity check, the thread creation and both prune passes all happen
# while holding @mutex.
#
# @param args [Array] arguments forwarded to the block when it is called
# @param block [Proc] the work to run; required
# @return [Boolean] true when a thread was started, false when
#   busy_size had already reached pool_size
# @raise [RuntimeError] when no block is given
def async(*args, &block)
  restart_reaper_unless_alive
  raise "Threadpool#async must be passed a block" unless block

  @mutex.synchronize do
    prune_busy_threads_without_mutex

    has_capacity = busy_size < pool_size
    if has_capacity
      worker = ::Thread.new(block, args, self) do |job, job_args, pool|
        begin
          # The start time is what timeout! compares against.
          ::Thread.current[:__start_time_in_seconds__] = Time.now.to_i
          ::Thread.current.abort_on_exception = false
          job.call(*job_args) # should we check the args? pass args?
        ensure
          # Always deregister, whether the job returned or raised.
          pool.on_thread_exit(::Thread.current)
        end
      end
      @busy_threads << worker
    end

    prune_busy_threads_without_mutex
    has_capacity
  end
end
#busy? ⇒ Boolean
Public Instance Methods. Returns true when busy_size has reached pool_size.
35 36 37 |
# File 'lib/lifeguard/threadpool.rb', line 35
# Tells whether the pool is at capacity.
#
# @return [Boolean] true when busy_size is greater than or equal to pool_size
def busy?
  in_use = busy_size
  capacity = pool_size
  in_use >= capacity
end
#busy_size ⇒ Object
39 40 41 |
# File 'lib/lifeguard/threadpool.rb', line 39
# Number of entries currently tracked in @busy_threads.
# Reads the array without taking @mutex.
#
# @return [Integer]
def busy_size
  @busy_threads.length
end
#kill! ⇒ Object
43 44 45 46 47 48 49 |
# File 'lib/lifeguard/threadpool.rb', line 43
# Kills every thread still tracked by the pool.
#
# Under @mutex: prunes, sends #kill to each remaining entry of
# @busy_threads, then prunes again.
def kill!
  @mutex.synchronize do
    prune_busy_threads_without_mutex
    @busy_threads.each(&:kill)
    prune_busy_threads_without_mutex
  end
end
#on_thread_exit(thread) ⇒ Object
51 52 53 54 55 |
# File 'lib/lifeguard/threadpool.rb', line 51
# Removes +thread+ from @busy_threads while holding @mutex.
# Worker threads started by #async call this from their ensure clause.
#
# @param thread [Thread] the thread to stop tracking
# @return [Object, nil] whatever Array#delete returns: the removed thread,
#   or nil when it was not being tracked
def on_thread_exit(thread)
  @mutex.synchronize { @busy_threads.delete(thread) }
end
#prune_busy_threads ⇒ Object
87 88 89 90 91 |
# File 'lib/lifeguard/threadpool.rb', line 87
# Locking wrapper: runs prune_busy_threads_without_mutex while holding
# @mutex and returns its result.
def prune_busy_threads
  @mutex.synchronize { prune_busy_threads_without_mutex }
end
#shutdown(shutdown_timeout = 0) ⇒ Object
93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 |
# File 'lib/lifeguard/threadpool.rb', line 93
# Gives busy threads up to +shutdown_timeout+ seconds to finish, then kills
# whatever is still tracked.
#
# The grace period is split into ten equal waits with a prune after each,
# so shutdown returns as soon as the pool drains. A timeout of zero (the
# default) or less skips the waiting and kills immediately.
#
# Fixes over the previous version:
# * the number of waits was `shutdown_timeout / 10` (integer division), so
#   any integer timeout below 10 got no grace period at all, larger ones
#   waited far less than requested, and a Float timeout raised
#   NoMethodError (Float has no #times);
# * an extra `sleep(shutdown_timeout / 10)` ran before the kill even when
#   every thread had already finished.
#
# @param shutdown_timeout [Numeric] maximum seconds to wait before killing
def shutdown(shutdown_timeout = 0)
  @mutex.synchronize do
    prune_busy_threads_without_mutex

    unless @busy_threads.empty?
      wait_slice = shutdown_timeout / 10.0

      if wait_slice > 0
        10.times do
          # Mutex#sleep releases @mutex while waiting. Workers deregister
          # through on_thread_exit, which needs this same mutex, so a plain
          # sleep here would keep finished workers blocked for the whole
          # grace period. Mutex#sleep may wake early; that only means we
          # prune a little sooner.
          @mutex.sleep(wait_slice)
          prune_busy_threads_without_mutex
          break if @busy_threads.empty?
        end
      end

      @busy_threads.each(&:kill)
    end

    prune_busy_threads_without_mutex
  end
end
#timeout! ⇒ Object
113 114 115 116 117 118 119 120 121 122 123 124 125 |
# File 'lib/lifeguard/threadpool.rb', line 113
# Kills every tracked thread that has been running longer than @timeout
# seconds, then prunes. Does nothing when no timeout is configured.
#
# A worker records :__start_time_in_seconds__ itself as the first thing it
# does (see #async), so a thread that was just added to @busy_threads may not
# have a start time yet. Such threads are skipped; previously the nil value
# reached `Integer - nil` and raised TypeError.
def timeout!
  return unless timeout?

  @mutex.synchronize do
    now = Time.now.to_i

    @busy_threads.each do |busy_thread|
      started_at = busy_thread[:__start_time_in_seconds__]
      next unless started_at # not started yet, so it cannot have timed out

      busy_thread.kill if now - started_at > @timeout
    end

    prune_busy_threads_without_mutex
  end
end
#timeout? ⇒ Boolean
127 128 129 |
# File 'lib/lifeguard/threadpool.rb', line 127
# Tells whether a timeout is configured.
#
# @return [Boolean] false when @timeout is nil or false, true otherwise
def timeout?
  @timeout ? true : false
end