Class: Lifeguard::Threadpool

Inherits:
Object
  • Object
show all
Defined in:
lib/lifeguard/threadpool.rb

Direct Known Subclasses

InfiniteThreadpool

Constant Summary collapse

DEFAULT_REAPING_INTERVAL =

in seconds

5
DEFAULT_POOL_SIZE =
2

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(opts = {}) ⇒ Threadpool

Constructor



14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/lifeguard/threadpool.rb', line 14

def initialize(opts = {})
  @options = opts
  @name = opts[:name] || ::SecureRandom.uuid
  @pool_size = opts[:pool_size] || DEFAULT_POOL_SIZE

  # Important info about "timeout", it is controlled by the reaper
  # so you're threads won't timeout immediately, they will wait for
  # the reaper to run.  Make sure you account for reaper interval
  # in how you want timeout to work, it may be a bit unexpected in 
  # it's tardiness of timing out threads
  #
  @timeout = opts[:timeout]
  @mutex = ::Mutex.new
  @busy_threads = []

  restart_reaper_unless_alive
end

Instance Attribute Details

#nameObject

Returns the value of attribute name.



9
10
11
# File 'lib/lifeguard/threadpool.rb', line 9

def name
  @name
end

#optionsObject

Returns the value of attribute options.



9
10
11
# File 'lib/lifeguard/threadpool.rb', line 9

def options
  @options
end

#pool_sizeObject

Returns the value of attribute pool_size.



9
10
11
# File 'lib/lifeguard/threadpool.rb', line 9

def pool_size
  @pool_size
end

Instance Method Details

#async(*args, &block) ⇒ Object



57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/lifeguard/threadpool.rb', line 57

def async(*args, &block)
  queued_the_work = false
  restart_reaper_unless_alive

  unless block
    raise "Threadpool#async must be passed a block"
  end

  @mutex.synchronize do
    prune_busy_threads_without_mutex

    if busy_size < pool_size
      queued_the_work = true

      @busy_threads << ::Thread.new(block, args, self) do |callable, call_args, parent|
        begin
          ::Thread.current[:__start_time_in_seconds__] = Time.now.to_i
          ::Thread.current.abort_on_exception = false
          callable.call(*call_args) # should we check the args? pass args?
        ensure
          parent.on_thread_exit(::Thread.current)
        end
      end
    end

    prune_busy_threads_without_mutex
    queued_the_work
  end
end

#busy?Boolean

Public Instance Methods

Returns:

  • (Boolean)


35
36
37
# File 'lib/lifeguard/threadpool.rb', line 35

def busy?
  busy_size >= pool_size
end

#busy_sizeObject



39
40
41
# File 'lib/lifeguard/threadpool.rb', line 39

def busy_size
  @busy_threads.size
end

#kill!Object



43
44
45
46
47
48
49
# File 'lib/lifeguard/threadpool.rb', line 43

def kill!
  @mutex.synchronize do
    prune_busy_threads_without_mutex
    @busy_threads.each { |busy_thread| busy_thread.kill }
    prune_busy_threads_without_mutex
  end
end

#on_thread_exit(thread) ⇒ Object



51
52
53
54
55
# File 'lib/lifeguard/threadpool.rb', line 51

def on_thread_exit(thread)
  @mutex.synchronize do
    @busy_threads.delete(thread)
  end
end

#prune_busy_threadsObject



87
88
89
90
91
# File 'lib/lifeguard/threadpool.rb', line 87

def prune_busy_threads
  @mutex.synchronize do
    prune_busy_threads_without_mutex
  end
end

#shutdown(shutdown_timeout = 0) ⇒ Object



93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/lifeguard/threadpool.rb', line 93

def shutdown(shutdown_timeout = 0)
  @mutex.synchronize do
    prune_busy_threads_without_mutex

    if @busy_threads.size > 0
      # Cut the shutdown_timeout by 10 and prune while things finish before the kill
      (shutdown_timeout/10).times do 
        sleep (shutdown_timeout / 10.0)
        prune_busy_threads_without_mutex
        break if busy_size == 0
      end

      sleep(shutdown_timeout/10)
      @busy_threads.each { |busy_thread| busy_thread.kill }
    end

    prune_busy_threads_without_mutex
  end
end

#timeout!Object



113
114
115
116
117
118
119
120
121
122
123
124
125
# File 'lib/lifeguard/threadpool.rb', line 113

def timeout!
  return unless timeout?

  @mutex.synchronize do
    @busy_threads.each do |busy_thread|
      if (Time.now.to_i - busy_thread[:__start_time_in_seconds__] > @timeout)
        busy_thread.kill
      end
    end

    prune_busy_threads_without_mutex
  end
end

#timeout?Boolean

Returns:

  • (Boolean)


127
128
129
# File 'lib/lifeguard/threadpool.rb', line 127

def timeout?
  !!@timeout
end