Class: Sqskiq::Manager
Instance Method Summary
collapse
#shutting_down, #subscribe_for_shutdown
Constructor Details
#initialize(empty_queue_throttle) ⇒ Manager
Returns a new instance of Manager.
12
13
14
15
|
# File 'lib/sqskiq/manager.rb', line 12
def initialize(empty_queue_throttle)
@empty_queue_throttle = empty_queue_throttle
subscribe_for_shutdown
end
|
Instance Method Details
#batch_done(messages) ⇒ Object
30
31
32
33
|
# File 'lib/sqskiq/manager.rb', line 30
def batch_done(messages)
@deleter.async.delete(messages)
new_fetch(1)
end
|
#bootstrap ⇒ Object
17
18
19
20
21
22
23
|
# File 'lib/sqskiq/manager.rb', line 17
def bootstrap
@fetcher = Celluloid::Actor[:fetcher]
@batcher = Celluloid::Actor[:batcher]
@deleter = Celluloid::Actor[:deleter]
new_fetch(@fetcher.size)
end
|
#fetch_done(messages) ⇒ Object
25
26
27
28
|
# File 'lib/sqskiq/manager.rb', line 25
def fetch_done(messages)
@empty_queue = messages.empty?
@batcher.async.process(messages) unless @shutting_down
end
|
#new_fetch(num) ⇒ Object
35
36
37
38
39
|
# File 'lib/sqskiq/manager.rb', line 35
def new_fetch(num)
after(throttle) do
num.times { @fetcher.async.fetch unless @shutting_down }
end
end
|
#running? ⇒ Boolean
41
42
43
|
# File 'lib/sqskiq/manager.rb', line 41
def running?
not (@shutting_down and @deleter.busy_size == 0 and @batcher.busy_size == 0)
end
|
#throttle ⇒ Object
45
46
47
|
# File 'lib/sqskiq/manager.rb', line 45
def throttle
@empty_queue ? @empty_queue_throttle : 0
end
|