Module: GirlFriday

Defined in:
lib/girl_friday/server.rb,
lib/girl_friday.rb,
lib/girl_friday/batch.rb,
lib/girl_friday/version.rb,
lib/girl_friday/work_queue.rb,
lib/girl_friday/persistence.rb,
lib/girl_friday/error_handler.rb

Overview

QUEUE1 = GirlFriday::Queue.new(‘ham_cannon’, :size => 15) do |msg|

puts msg

end QUEUE2 = GirlFriday::Queue.new(‘image_crawler’, :size => 5) do |msg|

puts msg

end

Defined Under Namespace

Modules: ErrorHandler, Store Classes: Batch, Server, WorkQueue

Constant Summary collapse

VERSION =
"0.9.7"
Queue =
WorkQueue

Class Method Summary collapse

Class Method Details

.add_queue(ref) ⇒ Object



22
23
24
25
26
27
28
# File 'lib/girl_friday.rb', line 22

def self.add_queue(ref)
  @lock.synchronize do
    @queues ||= []
    @queues.reject! { |q| !q.weakref_alive? }
    @queues << ref
  end
end

.queuesObject



36
37
38
# File 'lib/girl_friday.rb', line 36

def self.queues
  @queues || []
end

.remove_queue(ref) ⇒ Object



30
31
32
33
34
# File 'lib/girl_friday.rb', line 30

def self.remove_queue(ref)
  @lock.synchronize do
    @queues.delete ref
  end
end

.shutdown!(timeout = 30) ⇒ Object

Notify girl_friday to shutdown ASAP. Workers will not pick up any new work; any new work pushed onto the queues will be pushed onto the backlog (and persisted). This method will block until all queues are quiet or the timeout has passed.

Note that shutdown! just works with existing queues. If you create a new queue, it will act as normal.



58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/girl_friday.rb', line 58

def self.shutdown!(timeout=30)
  qs = queues.select { |q| q.weakref_alive? }
  count = qs.size

  if count > 0
    m = Mutex.new
    var = ConditionVariable.new

    qs.each do |q|
      next if !q.weakref_alive?
      begin
        q.__getobj__.shutdown do |queue|
          m.synchronize do
            count -= 1
            var.signal if count == 0
          end
        end
      rescue WeakRef::RefError
        m.synchronize do
          count -= 1
          var.signal if count == 0
        end
      end
    end

    m.synchronize do
      var.wait(m, timeout) if count != 0
    end
  end
  count
end

.statusObject



40
41
42
43
44
45
46
47
48
# File 'lib/girl_friday.rb', line 40

def self.status
  queues.inject({}) do |memo, queue|
    begin
      memo = memo.merge(queue.__getobj__.status)
    rescue WeakRef::RefError
    end
    memo
  end
end