Module: GirlFriday
- Defined in:
- lib/girl_friday/server.rb,
lib/girl_friday.rb,
lib/girl_friday/version.rb,
lib/girl_friday/work_queue.rb,
lib/girl_friday/persistence.rb,
lib/girl_friday/error_handler.rb
Overview
QUEUE1 = GirlFriday::Queue.new('ham_cannon', :size => 15) do |msg|
  puts msg
end

QUEUE2 = GirlFriday::Queue.new('image_crawler', :size => 5) do |msg|
  puts msg
end
Defined Under Namespace
Modules: Store
Classes: ErrorHandler, Server, WorkQueue
Constant Summary
- VERSION = "0.9.1"
- Queue = WorkQueue
- @@queues = []
Class Method Summary
- .queues ⇒ Object
- .shutdown!(timeout = 30) ⇒ Object
  Notify girl_friday to shut down ASAP.
- .status ⇒ Object
Class Method Details
.queues ⇒ Object
# File 'lib/girl_friday.rb', line 20
def self.queues
  @@queues
end
.shutdown!(timeout = 30) ⇒ Object
Notify girl_friday to shut down ASAP. Workers will not pick up any new work; any new work pushed onto the queues goes onto the backlog (and is persisted). This method blocks until all queues are quiet or the timeout has passed, and returns the number of queues that had not yet gone quiet (0 after a clean shutdown).
Note that shutdown! only affects queues that already exist. A queue created afterwards behaves as normal.
WeakRefs make this method full of race conditions with GC. :-(
# File 'lib/girl_friday.rb', line 38
def self.shutdown!(timeout=30)
  queues.delete_if { |q| !q.weakref_alive? }
  count = queues.size

  if count > 0
    m = Mutex.new
    var = ConditionVariable.new

    queues.each do |q|
      q.shutdown do |queue|
        m.synchronize do
          count -= 1
          var.signal if count == 0
        end
      end
    end

    m.synchronize do
      var.wait(m, timeout)
    end
    #puts "girl_friday shutdown complete"
  end
  count
end
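The countdown pattern used by shutdown! can be tried in isolation with only the standard library. The following is a minimal sketch, not girl_friday code: `FakeQueue` and `wait_for_shutdown` are hypothetical names, the queue simply sleeps before reporting that it is quiet, and the re-check loop around `wait` is an addition that guards against a callback finishing before the wait begins.

```ruby
# Hypothetical stand-in for a work queue: it reports "quiet" after a delay.
class FakeQueue
  def initialize(delay)
    @delay = delay
  end

  # Yields to the caller's block from a background thread once "quiet".
  def shutdown
    Thread.new do
      sleep @delay
      yield self
    end
  end
end

# Hypothetical helper: returns how many queues were still busy at timeout.
def wait_for_shutdown(queues, timeout)
  count = queues.size
  return count if count == 0

  m = Mutex.new
  var = ConditionVariable.new

  queues.each do |q|
    q.shutdown do |_queue|
      m.synchronize do
        count -= 1
        var.signal if count == 0
      end
    end
  end

  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  m.synchronize do
    # Re-check the counter so an early signal or spurious wakeup is harmless.
    while count > 0
      remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
      break if remaining <= 0
      var.wait(m, remaining)
    end
  end
  count
end

puts wait_for_shutdown([FakeQueue.new(0.01), FakeQueue.new(0.02)], 2)
```

A return value of 0 means every queue reported back before the timeout; a positive value is the number of queues that were still busy when the wait gave up.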
.status ⇒ Object
# File 'lib/girl_friday.rb', line 24
def self.status
  queues.delete_if { |q| !q.weakref_alive? }.inject({}) { |memo, queue| queue.weakref_alive? ? memo.merge(queue.status) : memo }
end
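The way .status folds per-queue hashes into one result can be illustrated with the standard library's WeakRef. This is a sketch under assumptions: `FakeQueue`, `combined_status`, and the shape of the status hash (queue name mapped to a small hash) are hypothetical, chosen only to show the merge; the real per-queue status contents are not shown in this page.

```ruby
require "weakref"

# Hypothetical stand-in for a work queue with a per-queue status hash.
class FakeQueue
  def initialize(name, backlog)
    @name = name
    @backlog = backlog
  end

  def status
    { @name => { :backlog => @backlog } }
  end
end

# Hypothetical helper: skip dead references, then merge each live status.
def combined_status(refs)
  refs.select { |q| q.weakref_alive? }.inject({}) do |memo, queue|
    memo.merge(queue.status)
  end
end

# Strong references keep the objects alive for the duration of the example.
strong = [FakeQueue.new("ham_cannon", 3), FakeQueue.new("image_crawler", 0)]
refs = strong.map { |q| WeakRef.new(q) }

p combined_status(refs)
```

Because the liveness check and the call to `status` are separate steps, a referenced object could in principle be collected between them when no strong reference is held, which is the kind of GC race the shutdown! notes complain about.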