Class: ZK::Threadpool

Inherits:
Object
  • Object
show all
Includes:
Logging
Defined in:
lib/z_k/threadpool.rb

Overview

a simple threadpool for running blocks of code off the main thread

Constant Summary collapse

DEFAULT_SIZE =
5

Class Attribute Summary collapse

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(size = nil) ⇒ Threadpool

Returns a new instance of Threadpool.



17
18
19
20
21
22
23
24
25
26
# File 'lib/z_k/threadpool.rb', line 17

def initialize(size=nil)
  @size = size || self.class.default_size

  @threadpool = []
  @threadqueue = ::Queue.new

  @mutex = Mutex.new

  start!
end

Class Attribute Details

.default_sizeObject

size of the ZK.defer threadpool (defaults to 5)



10
11
12
# File 'lib/z_k/threadpool.rb', line 10

def default_size
  @default_size
end

Instance Attribute Details

#sizeObject (readonly)

the size of this threadpool



15
16
17
# File 'lib/z_k/threadpool.rb', line 15

def size
  @size
end

Instance Method Details

#defer(callable = nil, &blk) ⇒ Object

Queue an operation to be run on an internal threadpool. You may either provide an object that responds_to?(:call) or pass a block. There is no mechanism for retrieving the result of the operation, it is purely fire-and-forget, so the user is expected to make arrangements for this in their code.



34
35
36
37
38
39
40
41
42
43
# File 'lib/z_k/threadpool.rb', line 34

def defer(callable=nil, &blk)
  callable ||= blk

  # XXX(slyphon): do we care if the threadpool is not running?
  raise Exceptions::ThreadpoolIsNotRunningException unless running?
  raise ArgumentError, "Argument to Threadpool#defer must respond_to?(:call)" unless callable.respond_to?(:call)

  @threadqueue << callable
  nil
end

#running?Boolean

Returns:

  • (Boolean)


45
46
47
# File 'lib/z_k/threadpool.rb', line 45

def running?
  @mutex.synchronize { @running }
end

#shutdown(timeout = 2) ⇒ Object

join all threads in this threadpool, they will be given a maximum of timeout seconds to exit before they are considered hung and will be ignored (this is an issue with threads in general: see blog.headius.com/2008/02/rubys-threadraise-threadkill-timeoutrb.html for more info)

the default timeout is 2 seconds per thread



66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib/z_k/threadpool.rb', line 66

def shutdown(timeout=2)
  @mutex.synchronize do
    return unless @running

    @running = false

    @threadqueue.clear
    @size.times { @threadqueue << KILL_TOKEN }

    threads, @threadpool = @threadpool, []

    while th = threads.shift
      begin
        th.join(timeout)
      rescue Exception => e
        logger.error { "Caught exception shutting down threadpool" }
        logger.error { e.to_std_format }
      end
    end

    @threadqueue = ::Queue.new
  end

  nil
end

#start!Object

starts the threadpool if not already running



50
51
52
53
54
55
56
57
# File 'lib/z_k/threadpool.rb', line 50

def start!
  @mutex.synchronize do
    return false if @running
    @running = true
    spawn_threadpool
  end
  true
end