Class: Candygram::Dispatch
- Inherits: Object
  - Object
  - Candygram::Dispatch
- Includes: Utility
- Defined in: lib/candygram/dispatch.rb
Overview
Pays attention to the Candygram work queue and forks runners to do the work as needed.
Instance Attribute Summary
- #frequency ⇒ Object
  Returns the value of attribute frequency.
- #max_per_class ⇒ Object
  Returns the value of attribute max_per_class.
- #quiet ⇒ Object
  Returns the value of attribute quiet.
- #runners ⇒ Object (readonly)
  Returns the value of attribute runners.
Instance Method Summary
- #add_runner(klass, pid) ⇒ Object
  Pushes a new PID onto the ‘runners’ hash.
- #finish ⇒ Object
  Tells the #run method to stop running.
- #initialize(options = {}) ⇒ Dispatch (constructor)
  Returns a Dispatch object that will keep checking the Candygram work queue and forking runners.
- #remove_runner(pid) ⇒ Object
  Takes a PID off of the ‘runners’ hash.
- #run ⇒ Object
  Loops over the work queue.
Methods included from Utility
#local_ip, #set_status, #update_succeeded?
Constructor Details
#initialize(options = {}) ⇒ Dispatch
Returns a Dispatch object that will keep checking the Candygram work queue and forking runners.
    # File 'lib/candygram/dispatch.rb', line 15
    def initialize(options = {})
      @frequency = options.delete(:frequency) || 5
      @max_per_class = options.delete(:max_per_class) || 10
      @quiet = options.delete(:quiet)
      @runners = {}
      @index = {}
    end
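The option handling above can be tried without the gem or a database. The following is a minimal sketch, assuming a hypothetical stand-in class (`OptionsSketch` is not part of Candygram) that copies only the three option lines from the constructor. It shows the defaults that apply when a key is missing, and that `Hash#delete` removes the consumed keys from the hash the caller passed in.

```ruby
# Hypothetical stand-in that mirrors only the option handling of
# Candygram::Dispatch#initialize; it is not the library class.
class OptionsSketch
  attr_reader :frequency, :max_per_class, :quiet

  def initialize(options = {})
    # Hash#delete returns the value (or nil), so `||` supplies the default.
    @frequency = options.delete(:frequency) || 5
    @max_per_class = options.delete(:max_per_class) || 10
    # No default here: a missing :quiet key leaves @quiet as nil.
    @quiet = options.delete(:quiet)
  end
end

defaults = OptionsSketch.new

custom_options = { frequency: 1, quiet: true }
custom = OptionsSketch.new(custom_options)

puts "defaults: #{defaults.frequency}, #{defaults.max_per_class}, #{defaults.quiet.inspect}"
puts "custom:   #{custom.frequency}, #{custom.max_per_class}, #{custom.quiet.inspect}"
puts "caller's hash afterwards: #{custom_options.inspect}"
```

Because the keys are deleted rather than read, a caller who wants to reuse the same options hash would likely need to pass a copy (for example `options.dup`).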
Instance Attribute Details
#frequency ⇒ Object
Returns the value of attribute frequency.
    # File 'lib/candygram/dispatch.rb', line 10
    def frequency
      @frequency
    end
#max_per_class ⇒ Object
Returns the value of attribute max_per_class.
    # File 'lib/candygram/dispatch.rb', line 10
    def max_per_class
      @max_per_class
    end
#quiet ⇒ Object
Returns the value of attribute quiet.
    # File 'lib/candygram/dispatch.rb', line 10
    def quiet
      @quiet
    end
#runners ⇒ Object (readonly)
Returns the value of attribute runners.
    # File 'lib/candygram/dispatch.rb', line 11
    def runners
      @runners
    end
Instance Method Details
#add_runner(klass, pid) ⇒ Object
Pushes a new PID onto the ‘runners’ hash.
    # File 'lib/candygram/dispatch.rb', line 67
    def add_runner(klass, pid)
      @runners[klass] ||= []
      @runners[klass] << pid
      @index[pid] = klass
    end
#finish ⇒ Object
Tells the #run method to stop running. It’s a simple loop condition, not preemptive, so if the dispatcher is sleeping you may have to wait up to frequency seconds before it really ends.
    # File 'lib/candygram/dispatch.rb', line 62
    def finish
      @finish = true
    end
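The non-preemptive stop described above can be illustrated with a small loop that polls a flag between sleeps. This is a sketch under stated assumptions: `LoopSketch` and its `ticks` counter are hypothetical and stand in for the dispatcher, and the loop body does no queue work. It only shows why a call to `finish` takes effect after the current sleep ends rather than immediately.

```ruby
# Hypothetical stand-in for the dispatcher's polling loop; not the library class.
class LoopSketch
  attr_reader :ticks

  def initialize(frequency)
    @frequency = frequency
    @finish = false # the source leaves @finish unset (nil) until #finish is called
    @ticks = 0
  end

  # Sets the flag; the loop only notices it at its next condition check.
  def finish
    @finish = true
  end

  def run
    until @finish
      @ticks += 1       # stands in for one pass over the work queue
      sleep @frequency  # the flag is not checked while sleeping
    end
  end
end

sketch = LoopSketch.new(0.05)
worker = Thread.new { sketch.run }

sleep 0.12     # let the loop make a few passes
sketch.finish  # request a stop from the main thread

# Thread#join(limit) returns nil if the thread is still running after the limit.
stopped = !worker.join(1).nil?
puts "stopped: #{stopped}, passes: #{sketch.ticks}"
```

With a `frequency` of 5 seconds, the same pattern would mean a wait of up to 5 seconds between calling `finish` and the loop exiting.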
#remove_runner(pid) ⇒ Object
Takes a PID off of the ‘runners’ hash.
    # File 'lib/candygram/dispatch.rb', line 74
    def remove_runner(pid)
      klass = @index.delete(pid)
      @runners[klass].delete(pid)
    end
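Taken together, #add_runner and #remove_runner keep two structures in step: a hash of PID lists keyed by class, and a reverse index from PID to class. A minimal sketch of that bookkeeping follows, assuming a hypothetical stand-in class (`RunnerBookSketch`) and made-up class names and PIDs; the two method bodies are copied from the listings above.

```ruby
# Hypothetical stand-in that mirrors the runner bookkeeping; not the library class.
class RunnerBookSketch
  attr_reader :runners, :index

  def initialize
    @runners = {} # class name => array of PIDs
    @index = {}   # PID => class name (reverse lookup)
  end

  def add_runner(klass, pid)
    @runners[klass] ||= []
    @runners[klass] << pid
    @index[pid] = klass
  end

  def remove_runner(pid)
    klass = @index.delete(pid)
    @runners[klass].delete(pid)
  end
end

book = RunnerBookSketch.new
book.add_runner("Mailer", 101) # illustrative class names and PIDs
book.add_runner("Mailer", 102)
book.add_runner("Report", 201)

book.remove_runner(101)
book.remove_runner(201)

puts book.runners.inspect
puts book.index.inspect
```

The reverse index is what lets a bare PID, which is all a child-exit notification provides, be mapped back to its class. Note that a class whose last runner is removed keeps an empty array in `runners` rather than losing its key.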
#run ⇒ Object
Loops over the work queue. You can stop it any time with the #finish method if running in a separate thread.
    # File 'lib/candygram/dispatch.rb', line 25
    def run
      Kernel.trap("CLD") do
        pid = Process.wait
        remove_runner(pid)
      end

      until @finish
        deliveries = check_queue
        deliveries.each do |del|
          if slot_open?(del) && lock_delivery(del)
            puts "Delivering #{del["class"]}\##{del["method"]} at #{Time.now}" unless quiet
            # Close our connection so that we don't get too many weird copies
            Candygram.connection = nil
            child = fork do
              # We're the runner
              set_status(del, 'running')
              package = Wrapper.unwrap(del["package"])
              args = Wrapper.unwrap(del["arguments"])
              result = package.send(del["method"].to_sym, *args)
              finish_delivery(del, result)
              Candygram.connection = nil
              exit
            end
            # We're the parent
            add_runner del["class"], child
            sleep(0.2)  # Give connections time to wrap up
          end
        end
        sleep frequency
      end
      until @index.empty?
        sleep(0.1)  # We trust our trap
      end
    end
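The fork-and-reap pattern in #run can be tried in isolation. The sketch below assumes a Unix-like platform where `fork` is available, uses the POSIX signal name "CHLD" (the listing above traps "CLD", which refers to the same signal on Linux), and replaces the runner body with an immediate `exit`. The `reaped` array is a hypothetical stand-in for the `remove_runner` call.

```ruby
require "timeout"

reaped = [] # PIDs collected by the signal handler

# When a child exits, the parent receives SIGCHLD; Process.wait then
# returns the PID of the finished child so it can be recorded.
Signal.trap("CHLD") do
  reaped << Process.wait
end

child = fork do
  # Stands in for a runner that does its work and then exits.
  exit
end

# The parent keeps going and relies on the handler, much as the source
# waits for its index to empty. The timeout only guards this demo.
Timeout.timeout(5) do
  sleep 0.05 until reaped.include?(child)
end

Signal.trap("CHLD", "DEFAULT") # restore default handling
puts "forked #{child}, reaped #{reaped.inspect}"
```

In the source, the handler calls `remove_runner(pid)` instead of appending to an array, and the final `until @index.empty?` loop plays the role of the wait shown here.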