Class: Bunny::ConsumerWorkPool

Inherits:
Object
  • Object
show all
Defined in:
lib/bunny/consumer_work_pool.rb

Overview

Thread pool that dispatches consumer deliveries. Not supposed to be shared between channels or threads.

Every channel gets its own consumer pool.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(size = 1, abort_on_exception = false, shutdown_timeout = 60) ⇒ ConsumerWorkPool

Returns a new instance of ConsumerWorkPool.



20
21
22
23
24
25
26
27
28
29
# File 'lib/bunny/consumer_work_pool.rb', line 20

# Sets up a pool in the stopped state. Only configuration and synchronization
# primitives are created here; worker threads are spawned later by #start.
#
# @param size [Integer] how many worker threads #start will spawn
# @param abort_on_exception [Boolean] when truthy, #start sets
#   Thread#abort_on_exception on every worker
# @param shutdown_timeout [Numeric, nil] timeout handed to the condition
#   variable wait in #shutdown; a falsy value makes #shutdown skip the wait
def initialize(size = 1, abort_on_exception = false, shutdown_timeout = 60)
  # Caller-supplied configuration.
  @size, @abort_on_exception, @shutdown_timeout = size, abort_on_exception, shutdown_timeout

  # Work queue and the primitives #shutdown waits on.
  @queue = ::Queue.new
  @shutdown_mutex = ::Mutex.new
  @shutdown_conditional = ::ConditionVariable.new

  # Lifecycle flags: a fresh pool is neither paused nor running.
  @paused = false
  @running = false
end

Instance Attribute Details

#abort_on_exception ⇒ Object (readonly)

Returns the value of attribute abort_on_exception.



18
19
20
# File 'lib/bunny/consumer_work_pool.rb', line 18

# Reader for the abort_on_exception value given to the constructor.
# #start consults it to decide whether to set Thread#abort_on_exception
# on each worker thread it spawns.
#
# @return [Boolean] the flag as passed to #initialize (false by default)
def abort_on_exception
  @abort_on_exception
end

#size ⇒ Object (readonly)

Returns the value of attribute size.



17
18
19
# File 'lib/bunny/consumer_work_pool.rb', line 17

# Reader for the pool size given to the constructor. #start spawns this many
# worker threads and #shutdown enqueues this many terminate requests.
#
# @return [Integer] the size as passed to #initialize (1 by default)
def size
  @size
end

#threads ⇒ Object (readonly)

API



16
17
18
# File 'lib/bunny/consumer_work_pool.rb', line 16

# Reader for the worker threads created by #start.
#
# @return [Array<Thread>, nil] the workers, or nil when #start has not been
#   called yet (the constructor does not assign @threads)
def threads
  @threads
end

Instance Method Details

#backlog ⇒ Object



52
53
54
# File 'lib/bunny/consumer_work_pool.rb', line 52

# Reports how many submitted callables are still sitting in the queue.
#
# @return [Integer] current queue length
def backlog
  @queue.size
end

#busy? ⇒ Boolean

Returns:

  • (Boolean)


56
57
58
# File 'lib/bunny/consumer_work_pool.rb', line 56

# Tells whether any submitted callables are still queued. Only the queue is
# inspected; this method does not look at what the workers are doing.
#
# @return [Boolean] true when the queue holds at least one entry
def busy?
  @queue.size.positive?
end

#join(timeout = nil) ⇒ Object



77
78
79
# File 'lib/bunny/consumer_work_pool.rb', line 77

# Joins every worker thread in turn. The timeout is passed to each
# Thread#join call individually, so it bounds the wait per thread.
#
# @param timeout [Numeric, nil] per-thread join limit; nil waits indefinitely
# @return [Array<Thread>] the joined workers (empty when #start never ran)
def join(timeout = nil)
  workers = @threads || []

  workers.each do |worker|
    worker.join(timeout)
  end
end

#kill ⇒ Object



93
94
95
96
97
# File 'lib/bunny/consumer_work_pool.rb', line 93

# Marks the pool as not running, then kills every worker thread outright.
#
# @return [Array<Thread>] the killed workers (empty when #start never ran)
def kill
  @running = false

  workers = @threads || []
  workers.each(&:kill)
end

#pause ⇒ Object



81
82
83
84
# File 'lib/bunny/consumer_work_pool.rb', line 81

# Flags the pool as paused and not running. Only the two state flags are
# changed here; this method does not touch the worker threads or the queue.
#
# @return [true] the value assigned to the paused flag
def pause
  @running = false
  @paused = true
end

#resume ⇒ Object



86
87
88
89
90
91
# File 'lib/bunny/consumer_work_pool.rb', line 86

# Flags the pool as running (and no longer paused), then wakes the workers.
#
# Safe to call before #start: a missing thread list is treated as empty,
# matching how #join and #kill handle it. Workers that have already finished
# are skipped, because Thread#run raises ThreadError on a dead thread and
# would otherwise abort the loop before later live workers are woken.
#
# @return [Array<Thread>] the worker list (empty when #start never ran)
def resume
  @running = true
  @paused = false

  (@threads || []).each { |t| t.run if t.alive? }
end

#running? ⇒ Boolean

Returns:

  • (Boolean)


48
49
50
# File 'lib/bunny/consumer_work_pool.rb', line 48

# Reports the running flag. It is false after construction, set to true by
# #start and #resume, and set back to false by #pause, #kill and #shutdown.
#
# @return [Boolean] current value of the running flag
def running?
  @running
end

#shutdown(wait_for_workers = false) ⇒ Object



60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/bunny/consumer_work_pool.rb', line 60

# Stops the pool by enqueueing one terminate request per worker and,
# optionally, waiting for the queue to drain.
#
# Each enqueued block throws :terminate when called; presumably the worker
# loop (not shown here) catches it and exits - confirm against run_loop.
#
# The wait only happens when all of the following hold: the caller asked for
# it, a shutdown timeout is configured, and the pool was running on entry.
#
# @param wait_for_workers [Boolean] whether to block until workers are done
#   (bounded by the configured shutdown timeout)
# @return [Object, nil] nil when no wait was performed; otherwise whatever
#   the condition variable wait returned
def shutdown(wait_for_workers = false)
  pool_was_active = running?
  @running = false

  # One terminate request per worker thread.
  @size.times do
    submit { |*_args| throw :terminate }
  end

  return unless wait_for_workers && @shutdown_timeout && pool_was_active

  @shutdown_mutex.synchronize do
    # Nothing left in the queue means there is nothing to wait for.
    next unless busy?

    @shutdown_conditional.wait(@shutdown_mutex, @shutdown_timeout)
  end
end

#start ⇒ Object



36
37
38
39
40
41
42
43
44
45
46
# File 'lib/bunny/consumer_work_pool.rb', line 36

# Spawns the worker threads and flags the pool as running.
#
# The thread list is replaced with a fresh array, then one thread per
# configured slot is started with run_loop as its body. When
# abort_on_exception is truthy, each worker gets Thread#abort_on_exception set.
#
# @return [true] the value assigned to the running flag
def start
  @threads = []

  @size.times do
    worker = Thread.new(&method(:run_loop))
    worker.abort_on_exception = true if abort_on_exception
    @threads.push(worker)
  end

  @running = true
end

#submit(callable = nil, &block) ⇒ Object



32
33
34
# File 'lib/bunny/consumer_work_pool.rb', line 32

# Enqueues a unit of work for the workers. The work may be given either as
# an explicit callable or as a block; the explicit callable wins when both
# are supplied.
#
# @param callable [#call, nil] work item; falls back to the block when nil
# @return [Queue] the internal queue the item was pushed onto
def submit(callable = nil, &block)
  work = callable || block
  @queue << work
end