Module: GirlFriday

Defined in:
lib/girl_friday/server.rb,
lib/girl_friday.rb,
lib/girl_friday/version.rb,
lib/girl_friday/work_queue.rb,
lib/girl_friday/persistence.rb,
lib/girl_friday/error_handler.rb

Overview

QUEUE1 = GirlFriday::Queue.new('ham_cannon', :size => 15) do |msg|
  puts msg
end

QUEUE2 = GirlFriday::Queue.new('image_crawler', :size => 5) do |msg|
  puts msg
end

Defined Under Namespace

Modules: Store

Classes: ErrorHandler, Server, WorkQueue

Constant Summary

VERSION = "0.9.1"
Queue = WorkQueue
@@queues = []

Class Method Summary

.queues ⇒ Object
.shutdown!(timeout = 30) ⇒ Object
.status ⇒ Object

Class Method Details

.queues ⇒ Object

# File 'lib/girl_friday.rb', line 20

def self.queues
  @@queues
end

.shutdown!(timeout = 30) ⇒ Object

Notify girl_friday to shut down as soon as possible. Workers will not pick up any new work; any new work pushed onto the queues will go to the backlog (and be persisted). This method blocks until all queues are quiet or the timeout has passed.

Note that shutdown! only affects queues that already exist. A queue created after the call will act as normal.

WeakRefs make this method full of race conditions with GC. :-(



# File 'lib/girl_friday.rb', line 38

def self.shutdown!(timeout=30)
  queues.delete_if { |q| !q.weakref_alive? }
  count = queues.size

  if count > 0
    m = Mutex.new
    var = ConditionVariable.new

    queues.each do |q|
      q.shutdown do |queue|
        m.synchronize do
          count -= 1
          var.signal if count == 0
        end
      end
    end

    m.synchronize do
      var.wait(m, timeout)
    end
    #puts "girl_friday shutdown complete"
  end
  count
end
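
The countdown in the listing above (a shared counter guarded by a Mutex, with a ConditionVariable signalled when the counter reaches zero) can be tried outside girl_friday. The following is a minimal sketch using only the Ruby standard library. `wait_for_all` and its `jobs` argument are hypothetical names for illustration and are not part of girl_friday. Unlike the listing, the sketch re-checks the counter before each wait, so a signal sent before the caller starts waiting is not lost.

```ruby
# Hypothetical helper, not part of girl_friday.
# Runs each callable in `jobs` on its own thread and blocks until all have
# finished or `timeout` seconds have passed. Returns the number of jobs
# still pending, mirroring the return value of the listing above.
def wait_for_all(jobs, timeout = 30)
  count = jobs.size
  return count if count.zero?

  m = Mutex.new
  var = ConditionVariable.new

  jobs.each do |job|
    Thread.new do
      job.call
      m.synchronize do
        count -= 1
        var.signal if count.zero?
      end
    end
  end

  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  m.synchronize do
    # Loop so that early signals and spurious wakeups are both handled.
    while count > 0
      remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
      break if remaining <= 0
      var.wait(m, remaining)
    end
    count
  end
end
```

A return value of zero means every job finished in time; a non-zero value is the number of jobs that were still running when the timeout expired.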

.status ⇒ Object

# File 'lib/girl_friday.rb', line 24

def self.status
  queues.delete_if { |q| !q.weakref_alive? }.inject({}) { |memo, queue| queue.weakref_alive? ? memo.merge(queue.status) : memo }
end
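
The pruning and merging done by status can be illustrated with the standard library's WeakRef. This is a sketch under stated assumptions: `FakeQueue` and `combined_status` are hypothetical names, and each queue's `status` is assumed to return a one-entry hash keyed by the queue name. The rescue covers the case where a referent is collected between the liveness check and the call.

```ruby
require 'weakref'

# Hypothetical stand-in for a queue; only `status` matters here.
# Assumption: status returns a one-entry hash keyed by queue name.
FakeQueue = Struct.new(:name, :backlog) do
  def status
    { name => { :backlog => backlog } }
  end
end

# Hypothetical helper: drops dead references, then merges the status
# hashes of the remaining queues into a single hash.
def combined_status(refs)
  refs.delete_if { |ref| !ref.weakref_alive? }
  refs.inject({}) do |memo, ref|
    begin
      memo.merge(ref.status)
    rescue WeakRef::RefError
      memo # referent was collected after the liveness check
    end
  end
end

# Strong references keep both objects alive for the demonstration.
ham = FakeQueue.new('ham_cannon', 2)
img = FakeQueue.new('image_crawler', 0)
refs = [WeakRef.new(ham), WeakRef.new(img)]
p combined_status(refs)
```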