Class: Lifeguard::Threadpool

Inherits:
Object
  • Object
show all
Defined in:
lib/lifeguard/threadpool.rb

Direct Known Subclasses

InfiniteThreadpool

Constant Summary collapse

DEFAULT_REAPING_INTERVAL =

in seconds

5
DEFAULT_POOL_SIZE =
2

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(opts = {}) ⇒ Threadpool

Constructor



14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/lifeguard/threadpool.rb', line 14

# Builds a threadpool from an options hash.
#
# Recognised keys, as read below:
#   :name      - identifier for the pool; a random UUID is generated when absent
#   :pool_size - maximum number of busy threads; DEFAULT_POOL_SIZE when absent
#   :timeout   - per-thread time limit used by #timeout!; nil disables it
#
# The original hash is also kept untouched and exposed through #options.
def initialize(opts = {})
  # Bookkeeping for in-flight work: the lock guarding it and the group
  # that tracks the threads currently running.
  @mutex = ::Mutex.new
  @busy_threads = ThreadGroup.new

  @options = opts
  @pool_size = opts[:pool_size] || DEFAULT_POOL_SIZE
  @name = opts[:name] || ::SecureRandom.uuid

  # A note on :timeout. It is enforced by the reaper, not by the worker
  # threads themselves, so a thread does not time out the instant its
  # limit passes; it is only killed the next time the reaper runs.
  # Take the reaper interval into account when choosing a timeout,
  # because threads may be timed out later than you expect.
  @timeout = opts[:timeout]

  restart_reaper_unless_alive
end

Instance Attribute Details

#name ⇒ Object

Returns the value of attribute name.



9
10
11
# File 'lib/lifeguard/threadpool.rb', line 9

# Reader for @name, which the constructor sets to opts[:name] or, when
# that is absent, to a generated UUID.
def name
  @name
end

#options ⇒ Object

Returns the value of attribute options.



9
10
11
# File 'lib/lifeguard/threadpool.rb', line 9

# Reader for @options, the options hash exactly as it was handed to the
# constructor.
def options
  @options
end

#pool_size ⇒ Object

Returns the value of attribute pool_size.



9
10
11
# File 'lib/lifeguard/threadpool.rb', line 9

# Reader for @pool_size, the limit on concurrently busy threads. The
# constructor takes it from opts[:pool_size], falling back to
# DEFAULT_POOL_SIZE.
def pool_size
  @pool_size
end

Instance Method Details

#async(*args, &block) ⇒ Object



49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/lifeguard/threadpool.rb', line 49

# Tries to run the given block on a new thread, provided the pool has a
# free slot.
#
# The reaper is (re)started first, then the block is validated. The
# capacity check and the thread creation happen under the pool mutex.
#
# @param args [Array] arguments forwarded to the block when it runs
# @yield [*args] the work to perform on the new thread
# @return [Boolean] true when a thread was started, false when the pool
#   was already at pool_size and the work was dropped
# @raise [RuntimeError] when no block is given
def async(*args, &block)
  restart_reaper_unless_alive

  raise "Threadpool#async must be passed a block" unless block

  @mutex.synchronize do
    # Pool is full: report that nothing was queued.
    next false unless busy_size < pool_size

    worker = ::Thread.new(block, args, self) do |callable, call_args, _parent|
      ::Thread.current.abort_on_exception = false
      # Stamp the start time so #timeout! can measure how long this thread has run.
      ::Thread.current[:__start_time_in_seconds__] = Time.now.to_i
      callable.call(*call_args)
    end
    @busy_threads.add(worker)

    true
  end
end

#busy? ⇒ Boolean

Public Instance Methods

Returns:

  • (Boolean)


35
36
37
# File 'lib/lifeguard/threadpool.rb', line 35

# Reports whether the pool is at (or above) capacity.
#
# @return [Boolean] true when busy_size has reached pool_size
def busy?
  pool_size <= busy_size
end

#busy_size ⇒ Object



39
40
41
# File 'lib/lifeguard/threadpool.rb', line 39

# Counts the threads currently listed in the busy thread group.
#
# @return [Integer] number of threads in @busy_threads
def busy_size
  tracked = @busy_threads.list
  tracked.length
end

#kill! ⇒ Object



43
44
45
46
47
# File 'lib/lifeguard/threadpool.rb', line 43

# Kills every thread currently in the busy group, without waiting for
# any of them to finish. Runs under the pool mutex.
#
# @return [Array<Thread>] the threads that were sent #kill
def kill!
  @mutex.synchronize { @busy_threads.list.each(&:kill) }
end

#shutdown(shutdown_timeout = 3) ⇒ Object



72
73
74
75
76
77
78
79
# File 'lib/lifeguard/threadpool.rb', line 72

# Gives in-flight work a grace period to finish, then kills whatever is
# still running.
#
# Polls busy_size every 10ms until it reaches zero or the deadline
# passes. The deadline is measured on the monotonic clock so that a
# wall-clock adjustment during the wait can neither cut the grace period
# short nor stretch it.
#
# The pool mutex is held for the whole wait, so other methods that
# synchronize on it (#async, #kill!, #timeout!) block until shutdown
# has finished.
#
# @param shutdown_timeout [Numeric] grace period in seconds
# @return [Array<Thread>] the threads still listed in the busy group
#   when the wait ended (each of them has been sent #kill)
def shutdown(shutdown_timeout = 3)
  deadline = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC) + shutdown_timeout

  @mutex.synchronize do
    while busy_size > 0 && ::Process.clock_gettime(::Process::CLOCK_MONOTONIC) < deadline
      sleep 0.01
    end

    @busy_threads.list.each(&:kill)
  end
end

#timeout! ⇒ Object



81
82
83
84
85
86
87
88
89
90
91
# File 'lib/lifeguard/threadpool.rb', line 81

# Kills every busy thread that has been running for longer than the
# configured timeout. Does nothing when no timeout is configured.
#
# Each thread's age is computed from the :__start_time_in_seconds__
# thread-local that #async stores at the start of the thread body.
#
# @return [Array<Thread>, nil] the threads that were inspected, or nil
#   when no timeout is configured
def timeout!
  return unless timeout?

  @mutex.synchronize do
    now = Time.now.to_i

    @busy_threads.list.each do |busy_thread|
      started_at = busy_thread[:__start_time_in_seconds__]

      # A thread can already be in the group before it has run far enough
      # to record its start time. It cannot have timed out yet, and
      # subtracting nil would raise TypeError, so skip it.
      next unless started_at

      busy_thread.kill if now - started_at > @timeout
    end
  end
end

#timeout? ⇒ Boolean

Returns:

  • (Boolean)


93
94
95
# File 'lib/lifeguard/threadpool.rb', line 93

# Reports whether a timeout has been configured for this pool.
#
# @return [Boolean] true when @timeout is neither nil nor false
def timeout?
  @timeout ? true : false
end