Class: Krakow::Distribution::Default
- Inherits:
-
Krakow::Distribution
- Object
- Krakow::Distribution
- Krakow::Distribution::Default
- Defined in:
- lib/krakow/distribution/default.rb
Overview
Default distribution implementation. This uses a round-robin approach for less than ideal states.
Instance Attribute Summary collapse
-
#less_than_ideal_stack ⇒ Object
readonly
Returns the value of attribute less_than_ideal_stack.
-
#watch_dog ⇒ Object
readonly
Returns the value of attribute watch_dog.
Attributes inherited from Krakow::Distribution
#flight_record, #ideal, #registry
Attributes included from Utils::Lazy::InstanceMethods
Instance Method Summary collapse
-
#calculate_ready!(connection_identifier) ⇒ Integer?
Update connection ready count.
-
#force_unready ⇒ nil
Force a connection to give up RDY state so next in stack can receive.
-
#less_than_ideal? ⇒ TrueClass, FalseClass
Is ideal less than 1.
-
#less_than_ideal_ready! ⇒ Krakow::Connection?
Find next connection to receive RDY count.
-
#rdy_connections ⇒ Array<Krakow::Connection>
All connections with RDY state.
-
#redistribute! ⇒ Object
recalculate ‘ideal` and update RDY on connections.
-
#set_ready_for(connection, *args) ⇒ Krakow::FrameType::Error?
Adds extra functionality to provide round robin RDY setting when in less than ideal state.
-
#waiting_connections ⇒ Array<Krakow::Connection>
All connections without RDY state.
Methods inherited from Krakow::Distribution
#add_connection, #backoff_interval, #backoff_interval?, #connection_lookup, #connections, #consumer, #consumer?, #failure, #in_flight_lookup, #initial_ready, #initialize, #max_in_flight, #max_in_flight?, #ready_for, #register_message, #registry_lookup, #remove_connection, #success, #unregister_message, #watch_dog_interval, #watch_dog_interval?
Methods included from Utils::Lazy::ClassMethods
#attribute, #attributes, #set_attributes
Methods included from Utils::Lazy::InstanceMethods
Methods included from Utils::Lazy
Methods included from Utils::Logging
Constructor Details
This class inherits a constructor from Krakow::Distribution
Instance Attribute Details
#less_than_ideal_stack ⇒ Object (readonly)
Returns the value of attribute less_than_ideal_stack.
9 10 11 |
# File 'lib/krakow/distribution/default.rb', line 9 def less_than_ideal_stack @less_than_ideal_stack end |
#watch_dog ⇒ Object (readonly)
Returns the value of attribute watch_dog.
9 10 11 |
# File 'lib/krakow/distribution/default.rb', line 9 def watch_dog @watch_dog end |
Instance Method Details
#calculate_ready!(connection_identifier) ⇒ Integer?
Update connection ready count
99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 |
# File 'lib/krakow/distribution/default.rb', line 99 def calculate_ready!(connection_identifier) begin registry_info = registry_lookup(connection_identifier) unless(less_than_ideal?) registry_info[:ready] = ideal - registry_info[:in_flight] if(registry_info[:ready] < 0 || registry_info[:backoff_until] > Time.now.to_i) registry_info[:ready] = 0 registry_info[:backoff_timer].cancel if registry[:backoff_timer] registry_info[:backoff_timer] = after(registry_info[:backoff_until] - Time.now.to_i) do calculate_ready!(connection_identifier) set_ready_for(connection_lookup(connection_identifier)) unless less_than_ideal? end end registry_info[:ready] else registry_info[:ready] = 0 end rescue Error::ConnectionFailure warn 'Failed connection encountered!' rescue Error::ConnectionUnavailable warn 'Unavailable connection encountered!' end end |
#force_unready ⇒ nil
Force a connection to give up RDY state so next in stack can receive
144 145 146 147 148 149 150 151 152 153 154 155 |
# File 'lib/krakow/distribution/default.rb', line 144 def force_unready debug 'Forcing a connection into an unready state due to less than ideal state' connection = rdy_connections.shuffle.first if(connection) debug "Stripping RDY state from connection: #{connection}" calculate_ready!(connection.identifier) set_ready_for(connection) else warn "Failed to locate available connection for RDY aquisition!" end nil end |
#less_than_ideal? ⇒ TrueClass, FalseClass
Is ideal less than 1
48 49 50 |
# File 'lib/krakow/distribution/default.rb', line 48 def less_than_ideal? ideal < 1 end |
#less_than_ideal_ready! ⇒ Krakow::Connection?
Find next connection to receive RDY count
55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 |
# File 'lib/krakow/distribution/default.rb', line 55 def less_than_ideal_ready! admit_defeat = false connection = nil until(connection || (admit_defeat && less_than_ideal_stack.empty?)) if(less_than_ideal_stack.nil? || less_than_ideal_stack.empty?) @less_than_ideal_stack = waiting_connections admit_defeat = true end con = less_than_ideal_stack.pop if(con) unless(registry_lookup(con.identifier)[:backoff_until] > Time.now.to_i) connection = con end end end if(connection) registry_lookup(connection.identifier)[:ready] = 1 connection end end |
#rdy_connections ⇒ Array<Krakow::Connection>
All connections with RDY state
135 136 137 138 139 |
# File 'lib/krakow/distribution/default.rb', line 135 def rdy_connections registry.find_all do |conn_id, info| info[:ready] > 0 end.map{|conn_id, info| connection_lookup(conn_id) }.compact end |
#redistribute! ⇒ Object
recalculate ‘ideal` and update RDY on connections
12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
# File 'lib/krakow/distribution/default.rb', line 12 def redistribute! @ideal = registry.size < 1 ? 0 : max_in_flight / registry.size debug "Distribution calculated ideal: #{ideal}" if(less_than_ideal?) registry.each do |connection_id, reg_info| reg_info[:ready] = 0 end max_in_flight.times do less_than_ideal_ready! end connections.each do |connection| set_ready_for(connection, :force) end watch_dog.cancel if watch_dog @watch_dog = every(watch_dog_interval) do force_unready end else if(watch_dog) watch_dog.cancel @watch_dog = nil end connections.each do |connection| current_ready = ready_for(connection.identifier) calculate_ready!(connection.identifier) unless(current_ready == ready_for(connection.identifier)) debug "Redistribution ready setting update for connection #{connection}" set_ready_for(connection) end end end end |
#set_ready_for(connection, *args) ⇒ Krakow::FrameType::Error?
Adds extra functionality to provide round robin RDY setting when in less than ideal state
82 83 84 85 86 87 88 89 90 91 92 93 94 |
# File 'lib/krakow/distribution/default.rb', line 82 def set_ready_for(connection, *args) super connection if(less_than_ideal? && !args.include?(:force)) debug "RDY set ignored due to less than ideal state (con: #{connection})" con = less_than_ideal_ready! if(con) watch_dog.reset if watch_dog super con else warn 'Failed to set RDY state while less than ideal. Connection stack is empty!' end end end |
#waiting_connections ⇒ Array<Krakow::Connection>
All connections without RDY state
126 127 128 129 130 |
# File 'lib/krakow/distribution/default.rb', line 126 def waiting_connections registry.find_all do |conn_id, info| info[:ready] < 1 && info[:in_flight] < 1 && info[:backoff_until] < Time.now.to_i end.map{|conn_id, info| connection_lookup(conn_id) }.compact end |