Class: Candygram::Dispatch

Inherits:
Object
  • Object
show all
Includes:
Utility
Defined in:
lib/candygram/dispatch.rb

Overview

Pays attention to the Candygram work queue and forks runners to do the work as needed.

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Utility

#local_ip, #set_status, #update_succeeded?

Constructor Details

#initialize(options = {}) ⇒ Dispatch

Returns a Dispatch object that will keep checking the Candygram work queue and forking runners.

Parameters:

  • options (Hash) (defaults to: {})

    a customizable set of options

Options Hash (options):

  • :frequency (Integer)

    How often to check the queue (in seconds). Defaults to 5.



15
16
17
18
19
20
21
# File 'lib/candygram/dispatch.rb', line 15

def initialize(options={})
  @frequency = options.delete(:frequency) || 5
  @max_per_class = options.delete(:max_per_class) || 10
  @quiet = options.delete(:quiet)
  @runners = {}
  @index = {}
end

Instance Attribute Details

#frequencyObject

Returns the value of attribute frequency.



10
11
12
# File 'lib/candygram/dispatch.rb', line 10

def frequency
  @frequency
end

#max_per_classObject

Returns the value of attribute max_per_class.



10
11
12
# File 'lib/candygram/dispatch.rb', line 10

def max_per_class
  @max_per_class
end

#quietObject

Returns the value of attribute quiet.



10
11
12
# File 'lib/candygram/dispatch.rb', line 10

def quiet
  @quiet
end

#runnersObject (readonly)

Returns the value of attribute runners.



11
12
13
# File 'lib/candygram/dispatch.rb', line 11

def runners
  @runners
end

Instance Method Details

#add_runner(klass, pid) ⇒ Object

Pushes a new PID onto the ‘runners’ hash.



67
68
69
70
71
# File 'lib/candygram/dispatch.rb', line 67

def add_runner(klass, pid)
  @runners[klass] ||= []
  @runners[klass] << pid
  @index[pid] = klass
end

#finishObject

Tells the #run method to stop running. It’s a simple loop condition, not preemptive, so if the dispatcher is sleeping you may have to wait up to frequency seconds before it really ends.



62
63
64
# File 'lib/candygram/dispatch.rb', line 62

def finish
  @finish = true
end

#remove_runner(pid) ⇒ Object

Takes a PID off of the ‘runners’ hash.



74
75
76
77
# File 'lib/candygram/dispatch.rb', line 74

def remove_runner(pid)
  klass = @index.delete(pid)
  @runners[klass].delete(pid)
end

#runObject

Loops over the work queue. You can stop it any time with the #finish method if running in a separate thread.



25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/candygram/dispatch.rb', line 25

def run
  Kernel.trap("CLD") do 
    pid = Process.wait
    remove_runner(pid)
  end
  
  until @finish
    deliveries = check_queue
    deliveries.each do |del|
      if slot_open?(del) && lock_delivery(del)
        puts "Delivering #{del["class"]}\##{del["method"]} at #{Time.now}" unless quiet
        # Close our connection so that we don't get too many weird copies
        Candygram.connection = nil
        child = fork do
          # We're the runner
          set_status(del, 'running')
          package = Wrapper.unwrap(del["package"])
          args = Wrapper.unwrap(del["arguments"])
          result = package.send(del["method"].to_sym, *args)
          finish_delivery(del, result)
          Candygram.connection = nil
          exit
        end
        # We're the parent
        add_runner del["class"], child
        sleep(0.2)  # Give connections time to wrap up
      end
    end
    sleep frequency
  end
  until @index.empty?
    sleep(0.1) # We trust our trap
  end
end