Module: GirlFriday
- Defined in:
- lib/girl_friday/server.rb,
lib/girl_friday.rb,
lib/girl_friday/batch.rb,
lib/girl_friday/version.rb,
lib/girl_friday/work_queue.rb,
lib/girl_friday/persistence.rb,
lib/girl_friday/error_handler.rb
Overview
QUEUE1 = GirlFriday::Queue.new(‘ham_cannon’, :size => 15) do |msg|
puts msg
end QUEUE2 = GirlFriday::Queue.new(‘image_crawler’, :size => 5) do |msg|
puts msg
end
Defined Under Namespace
Modules: ErrorHandler, Store Classes: Batch, Server, WorkQueue
Constant Summary collapse
- VERSION =
"0.9.7"
- Queue =
WorkQueue
Class Method Summary collapse
- .add_queue(ref) ⇒ Object
- .queues ⇒ Object
- .remove_queue(ref) ⇒ Object
-
.shutdown!(timeout = 30) ⇒ Object
Notify girl_friday to shutdown ASAP.
- .status ⇒ Object
Class Method Details
.add_queue(ref) ⇒ Object
22 23 24 25 26 27 28 |
# File 'lib/girl_friday.rb', line 22 def self.add_queue(ref) @lock.synchronize do @queues ||= [] @queues.reject! { |q| !q.weakref_alive? } @queues << ref end end |
.queues ⇒ Object
36 37 38 |
# File 'lib/girl_friday.rb', line 36 def self.queues @queues || [] end |
.remove_queue(ref) ⇒ Object
30 31 32 33 34 |
# File 'lib/girl_friday.rb', line 30 def self.remove_queue(ref) @lock.synchronize do @queues.delete ref end end |
.shutdown!(timeout = 30) ⇒ Object
Notify girl_friday to shutdown ASAP. Workers will not pick up any new work; any new work pushed onto the queues will be pushed onto the backlog (and persisted). This method will block until all queues are quiet or the timeout has passed.
Note that shutdown! just works with existing queues. If you create a new queue, it will act as normal.
58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/girl_friday.rb', line 58 def self.shutdown!(timeout=30) qs = queues.select { |q| q.weakref_alive? } count = qs.size if count > 0 m = Mutex.new var = ConditionVariable.new qs.each do |q| next if !q.weakref_alive? begin q.__getobj__.shutdown do |queue| m.synchronize do count -= 1 var.signal if count == 0 end end rescue WeakRef::RefError m.synchronize do count -= 1 var.signal if count == 0 end end end m.synchronize do var.wait(m, timeout) if count != 0 end end count end |
.status ⇒ Object
40 41 42 43 44 45 46 47 48 |
# File 'lib/girl_friday.rb', line 40 def self.status queues.inject({}) do |memo, queue| begin memo = memo.merge(queue.__getobj__.status) rescue WeakRef::RefError end memo end end |