Class: Krakow::Distribution Abstract
- Inherits: Object
  - Object
    - Krakow::Distribution
- Extended by: Utils::Lazy::ClassMethods
- Includes: Celluloid, Utils::Lazy, Utils::Lazy::InstanceMethods
- Defined in: lib/krakow/distribution.rb, lib/krakow/distribution/default.rb
Overview
Message distribution
Direct Known Subclasses
Defined Under Namespace
Classes: Default
Instance Attribute Summary
- #flight_record ⇒ Object
  Returns the value of attribute flight_record.
- #ideal ⇒ Object
  Returns the value of attribute ideal.
- #registry ⇒ Object
  Returns the value of attribute registry.
Attributes included from Utils::Lazy::InstanceMethods
Attributes
- #backoff_interval ⇒ Numeric
  The backoff_interval attribute.
- #backoff_interval? ⇒ TrueClass, FalseClass
  Truthiness of the backoff_interval attribute.
- #consumer ⇒ Krakow::Consumer
  The consumer attribute.
- #consumer? ⇒ TrueClass, FalseClass
  Truthiness of the consumer attribute.
- #max_in_flight ⇒ Integer
  The max_in_flight attribute.
- #max_in_flight? ⇒ TrueClass, FalseClass
  Truthiness of the max_in_flight attribute.
- #watch_dog_interval ⇒ Numeric
  The watch_dog_interval attribute.
- #watch_dog_interval? ⇒ TrueClass, FalseClass
  Truthiness of the watch_dog_interval attribute.
Instance Method Summary
- #add_connection(connection) ⇒ TrueClass
  Add connection to make available for RDY distribution.
- #calculate_ready!(connection_identifier) ⇒ Integer (abstract)
  Determine RDY value for given connection.
- #connection_lookup(identifier) ⇒ Krakow::Connection?
  Return connection associated with given registry key.
- #connections ⇒ Array<Krakow::Connection>
  Connections in registry.
- #failure(connection_identifier) ⇒ TrueClass
  Log failure of processed message.
- #in_flight_lookup(msg_id) {|connection| ... } ⇒ Krakow::Connection, Object
  Return source connection for given message ID.
- #initial_ready ⇒ Integer
  Initial ready value used for new connections.
- #initialize(args = {}) ⇒ Distribution (constructor)
  A new instance of Distribution.
- #ready_for(connection_identifier) ⇒ Integer
  Return the currently configured RDY value for given connection.
- #redistribute! ⇒ Object (abstract)
  Reset flight distributions.
- #register_message(message, connection_identifier) ⇒ Integer
  Registers message into registry and configures for distribution.
- #registry_lookup(connection_identifier) ⇒ Hash
  Return registry information for given connection.
- #remove_connection(connection_identifier, *args) ⇒ TrueClass
  Remove connection from RDY distribution.
- #set_ready_for(connection, *_) ⇒ Krakow::FrameType::Error?
  Send RDY for given connection.
- #success(connection_identifier) ⇒ TrueClass
  Log success of processed message.
- #unregister_message(message) ⇒ Krakow::Connection, NilClass
  Remove message metadata from registry.
Methods included from Utils::Lazy::ClassMethods
attribute, attributes, set_attributes
Methods included from Utils::Lazy::InstanceMethods
Methods included from Utils::Lazy
Methods included from Utils::Logging
Constructor Details
#initialize(args = {}) ⇒ Distribution
Returns a new instance of Distribution.
    # File 'lib/krakow/distribution.rb', line 33
    def initialize(args={})
      super
      @ideal = 0
      @flight_record = {}
      @registry = {}
    end
Instance Attribute Details
#flight_record ⇒ Object
Returns the value of attribute flight_record.
    # File 'lib/krakow/distribution.rb', line 17
    def flight_record
      @flight_record
    end
#ideal ⇒ Object
Returns the value of attribute ideal.
    # File 'lib/krakow/distribution.rb', line 17
    def ideal
      @ideal
    end
#registry ⇒ Object
Returns the value of attribute registry.
    # File 'lib/krakow/distribution.rb', line 17
    def registry
      @registry
    end
Instance Method Details
#add_connection(connection) ⇒ TrueClass
Add connection to make available for RDY distribution
    # File 'lib/krakow/distribution.rb', line 123
    def add_connection(connection)
      unless(registry[connection.identifier])
        registry[connection.identifier] = {
          :ready => initial_ready,
          :in_flight => 0,
          :failures => 0,
          :backoff_until => 0
        }
      end
      true
    end
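The guard in `add_connection` makes registration idempotent: a second call for an identifier that is already present leaves its counters untouched. The following is a minimal sketch in plain Ruby, using a bare Hash in place of the actor's registry. The `add_entry` helper and the identifier string are hypothetical and exist only for illustration.

```ruby
# Stand-in for Distribution#registry; the real hash lives inside a Celluloid actor.
registry = {}

# Hypothetical helper mirroring the guard in #add_connection.
def add_entry(registry, identifier, initial_ready)
  unless registry[identifier]
    registry[identifier] = {
      :ready => initial_ready,
      :in_flight => 0,
      :failures => 0,
      :backoff_until => 0
    }
  end
  true
end

add_entry(registry, 'nsqd-1:4150', 1)
registry['nsqd-1:4150'][:in_flight] = 3   # simulate three messages in flight
add_entry(registry, 'nsqd-1:4150', 1)     # second call does not reset the entry
puts registry['nsqd-1:4150'][:in_flight]  # prints 3
```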
#backoff_interval ⇒ Numeric
Returns the backoff_interval attribute.
    # File 'lib/krakow/distribution.rb', line 28
    attribute :backoff_interval, Numeric
#backoff_interval? ⇒ TrueClass, FalseClass
Returns truthiness of the backoff_interval attribute.
    # File 'lib/krakow/distribution.rb', line 28
    attribute :backoff_interval, Numeric
#calculate_ready!(connection_identifier) ⇒ Integer
This method is abstract.
Determine RDY value for given connection
    # File 'lib/krakow/distribution.rb', line 48
    def calculate_ready!(connection_identifier)
      raise NotImplementedError.new 'Custom `#calculate_ready!` method must be provided!'
    end
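Because `calculate_ready!` is abstract, a subclass has to decide how the RDY count is derived from the registry entry. The sketch below shows one possible policy in plain Ruby. It is not the algorithm used by `Krakow::Distribution::Default`; the `SketchDistribution` class, its `add` method, the even split of `max_in_flight`, and the injected `now` argument are all assumptions made for illustration.

```ruby
# Hypothetical stand-in class; it does not inherit from Krakow::Distribution.
class SketchDistribution
  attr_reader :registry

  def initialize(max_in_flight)
    @max_in_flight = max_in_flight
    @registry = {}
  end

  # Register an identifier with the same keys #add_connection uses.
  def add(identifier)
    @registry[identifier] ||= {
      :ready => 0, :in_flight => 0, :failures => 0, :backoff_until => 0
    }
  end

  # Assumed policy: split max_in_flight evenly across connections,
  # subtract what is already in flight, and send 0 while backing off.
  def calculate_ready!(identifier, now = Time.now.to_i)
    info = @registry.fetch(identifier)
    share = @max_in_flight / @registry.size
    info[:ready] =
      if info[:backoff_until] > now
        0
      else
        [share - info[:in_flight], 0].max
      end
  end
end

dist = SketchDistribution.new(10)
dist.add('a')
dist.add('b')
dist.registry['a'][:in_flight] = 2
puts dist.calculate_ready!('a', 100)  # prints 3 (10 / 2 - 2)
dist.registry['b'][:backoff_until] = 200
puts dist.calculate_ready!('b', 100)  # prints 0 (still backing off)
```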
#connection_lookup(identifier) ⇒ Krakow::Connection?
Return connection associated with given registry key
    # File 'lib/krakow/distribution.rb', line 156
    def connection_lookup(identifier)
      consumer.connection(identifier)
    end
#connections ⇒ Array<Krakow::Connection>
Returns connections in registry.
    # File 'lib/krakow/distribution.rb', line 192
    def connections
      registry.keys.map do |identifier|
        connection_lookup(identifier)
      end.compact
    end
#consumer ⇒ Krakow::Consumer
Returns the consumer attribute.
    # File 'lib/krakow/distribution.rb', line 26
    attribute :consumer, Krakow::Consumer, :required => true
#consumer? ⇒ TrueClass, FalseClass
Returns truthiness of the consumer attribute.
    # File 'lib/krakow/distribution.rb', line 26
    attribute :consumer, Krakow::Consumer, :required => true
#failure(connection_identifier) ⇒ TrueClass
Log failure of processed message
    # File 'lib/krakow/distribution.rb', line 202
    def failure(connection_identifier)
      if(backoff_interval)
        registry_info = registry_lookup(connection_identifier)
        registry_info[:failures] += 1
        registry_info[:backoff_until] = Time.now.to_i + (registry_info[:failures] * backoff_interval)
      end
      true
    end
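The backoff deadline computed by `failure` grows linearly with the failure count: each call sets `backoff_until` to the current time plus `failures * backoff_interval`. A short worked example in plain Ruby follows; the interval of 5 seconds and the fixed clock value are assumed so that the arithmetic is reproducible.

```ruby
backoff_interval = 5   # assumed value, in seconds
now = 1_000            # fixed stand-in for Time.now.to_i
info = {:failures => 0, :backoff_until => 0}

# Apply the same two assignments #failure performs, three times in a row.
deadlines = 3.times.map do
  info[:failures] += 1
  info[:backoff_until] = now + (info[:failures] * backoff_interval)
end

p deadlines  # prints [1005, 1010, 1015]
```

With a real clock the deadlines would also move forward with `Time.now`, so consecutive failures push the deadline out by more than the interval alone.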
#in_flight_lookup(msg_id) {|connection| ... } ⇒ Krakow::Connection, Object
Return source connection for given message ID
    # File 'lib/krakow/distribution.rb', line 166
    def in_flight_lookup(msg_id)
      connection = connection_lookup(flight_record[msg_id])
      unless(connection)
        abort Krakow::Error::LookupFailed.new("Failed to locate in flight message (ID: #{msg_id})")
      end
      if(block_given?)
        begin
          yield connection
        rescue => e
          abort e
        end
      else
        connection
      end
    end
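The two return types of `in_flight_lookup` follow from the `block_given?` branch: without a block the connection itself is returned, and with a block the block's result is returned. A minimal plain-Ruby sketch of that control flow is shown here. The `in_flight_lookup_sketch` method, the local `LookupFailed` class, and the symbol used as a connection are hypothetical stand-ins, and a plain `raise` takes the place of Celluloid's `abort`.

```ruby
LookupFailed = Class.new(StandardError)  # stand-in for Krakow::Error::LookupFailed

flight_record = {'msg-1' => 'conn-a'}       # message ID => connection identifier
connections = {'conn-a' => :connection_a}   # a symbol stands in for a connection object

# Hypothetical free-standing version of the lookup logic.
def in_flight_lookup_sketch(flight_record, connections, msg_id)
  connection = connections[flight_record[msg_id]]
  unless connection
    raise LookupFailed, "Failed to locate in flight message (ID: #{msg_id})"
  end
  block_given? ? yield(connection) : connection
end

# Without a block: the connection is returned.
p in_flight_lookup_sketch(flight_record, connections, 'msg-1')
# With a block: the block's value is returned instead.
p in_flight_lookup_sketch(flight_record, connections, 'msg-1') { |c| c.to_s.upcase }
```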
#initial_ready ⇒ Integer
Initial ready value used for new connections
    # File 'lib/krakow/distribution.rb', line 99
    def initial_ready
      ideal > 0 ? 1 : 0
    end
#max_in_flight ⇒ Integer
Returns the max_in_flight attribute.
    # File 'lib/krakow/distribution.rb', line 29
    attribute :max_in_flight, Integer, :default => 1
#max_in_flight? ⇒ TrueClass, FalseClass
Returns truthiness of the max_in_flight attribute.
    # File 'lib/krakow/distribution.rb', line 29
    attribute :max_in_flight, Integer, :default => 1
#ready_for(connection_identifier) ⇒ Integer
Return the currently configured RDY value for given connection
    # File 'lib/krakow/distribution.rb', line 79
    def ready_for(connection_identifier)
      registry_lookup(connection_identifier)[:ready]
    end
#redistribute! ⇒ Object
This method is abstract.
Reset flight distributions
    # File 'lib/krakow/distribution.rb', line 41
    def redistribute!
      raise NotImplementedError.new 'Custom `#redistrubute!` method must be provided!'
    end
#register_message(message, connection_identifier) ⇒ Integer
Registers message into registry and configures for distribution
    # File 'lib/krakow/distribution.rb', line 108
    def register_message(message, connection_identifier)
      if(flight_record[message.message_id])
        abort KeyError.new "Message is already registered in flight record! (#{message.message_id})"
      else
        registry_info = registry_lookup(connection_identifier)
        registry_info[:in_flight] += 1
        flight_record[message.message_id] = connection_identifier
        calculate_ready!(connection_identifier)
      end
    end
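Registration touches two structures: the flight record maps a message ID to the connection it arrived on, and the connection's registry entry counts messages in flight. The sketch below reproduces that bookkeeping with plain Hashes. The `register` lambda and the identifiers are hypothetical, a plain `raise` replaces Celluloid's `abort`, and the final `calculate_ready!` call is left out because it is abstract here.

```ruby
flight_record = {}                           # message ID => connection identifier
registry = {'conn-a' => {:in_flight => 0}}   # trimmed registry entry

# Hypothetical stand-in for #register_message.
register = lambda do |message_id, connection_identifier|
  if flight_record[message_id]
    raise KeyError, "Message is already registered in flight record! (#{message_id})"
  end
  registry.fetch(connection_identifier)[:in_flight] += 1
  flight_record[message_id] = connection_identifier
end

register.call('msg-1', 'conn-a')
register.call('msg-2', 'conn-a')
puts registry['conn-a'][:in_flight]  # prints 2

# Registering the same message ID twice is rejected.
begin
  register.call('msg-1', 'conn-a')
rescue KeyError => e
  puts e.message
end
```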
#registry_lookup(connection_identifier) ⇒ Hash
Return registry information for given connection
    # File 'lib/krakow/distribution.rb', line 186
    def registry_lookup(connection_identifier)
      registry[connection_identifier] ||
        abort(Krakow::Error::LookupFailed.new("Failed to locate connection information in registry (#{connection_identifier})"))
    end
#remove_connection(connection_identifier, *args) ⇒ TrueClass
Remove connection from RDY distribution
    # File 'lib/krakow/distribution.rb', line 139
    def remove_connection(connection_identifier, *args)
      # remove connection from registry
      registry.delete(connection_identifier)
      # remove any in flight messages
      flight_record.delete_if do |k,v|
        if(v == connection_identifier)
          warn "Removing in flight reference due to failed connection: #{v}"
          true
        end
      end
      true
    end
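Removing a connection also discards every flight record entry that points at it, so messages that arrived on a dead connection can no longer be looked up. A small plain-Ruby illustration with made-up identifiers:

```ruby
registry = {'conn-a' => {}, 'conn-b' => {}}
flight_record = {
  'msg-1' => 'conn-a',
  'msg-2' => 'conn-b',
  'msg-3' => 'conn-a'
}

# Same two steps as #remove_connection, without the warning output.
registry.delete('conn-a')
flight_record.delete_if { |_msg_id, identifier| identifier == 'conn-a' }

p registry.keys       # only conn-b remains
p flight_record.keys  # only msg-2 remains
```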
#set_ready_for(connection, *_) ⇒ Krakow::FrameType::Error?
Send RDY for given connection
    # File 'lib/krakow/distribution.rb', line 88
    def set_ready_for(connection, *_)
      connection.transmit(
        Command::Rdy.new(
          :count => ready_for(connection.identifier)
        )
      )
    end
#success(connection_identifier) ⇒ TrueClass
Log success of processed message
    # File 'lib/krakow/distribution.rb', line 215
    def success(connection_identifier)
      if(backoff_interval)
        registry_info = registry_lookup(connection_identifier)
        if(registry_info[:failures] > 1)
          registry_info[:failures] -= 1
          registry_info[:backoff_until] = Time.now.to_i + (registry_info[:failures] * backoff_interval)
        else
          registry_info[:failures] = 0
        end
      end
      true
    end
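`success` unwinds the backoff one step at a time rather than clearing it outright. While more than one failure is recorded, each success decrements the count and recomputes the deadline; the last step only zeroes the count and leaves `backoff_until` at its previous value. The worked example below uses an assumed interval of 5 seconds and a fixed clock, and the `apply_success` lambda is a hypothetical stand-in for the method body.

```ruby
backoff_interval = 5   # assumed value, in seconds
now = 1_000            # fixed stand-in for Time.now.to_i
info = {:failures => 3, :backoff_until => 1_015}

# Mirrors the branch inside #success and reports the resulting state.
apply_success = lambda do
  if info[:failures] > 1
    info[:failures] -= 1
    info[:backoff_until] = now + (info[:failures] * backoff_interval)
  else
    info[:failures] = 0
  end
  info.values_at(:failures, :backoff_until)
end

p apply_success.call  # prints [2, 1010]
p apply_success.call  # prints [1, 1005]
p apply_success.call  # prints [0, 1005]
```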
#unregister_message(message) ⇒ Krakow::Connection, NilClass
Remove message metadata from registry
    # File 'lib/krakow/distribution.rb', line 56
    def unregister_message(message)
      msg_id = message.respond_to?(:message_id) ? message.message_id : message.to_s
      connection = connection_lookup(flight_record[msg_id])
      flight_record.delete(msg_id)
      if(connection)
        begin
          ident = connection.identifier
          registry_info = registry_lookup(ident)
          registry_info[:in_flight] -= 1
          calculate_ready!(ident)
          connection
        rescue Celluloid::DeadActorError
          warn 'Connection is dead. No recalculation applied on ready.'
        end
      else
        warn 'No connection associated to message via lookup. No recalculation applied on ready.'
      end
    end
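The `respond_to?(:message_id)` check at the top of `unregister_message` suggests the method accepts either a message object or a bare message ID. The ID resolution on its own can be sketched in plain Ruby as follows; `FakeMessage` and the `resolve_id` lambda are hypothetical and are not part of Krakow.

```ruby
# Hypothetical stand-in for a message object exposing #message_id.
FakeMessage = Struct.new(:message_id)

# Same ternary as the first line of #unregister_message.
resolve_id = lambda do |message|
  message.respond_to?(:message_id) ? message.message_id : message.to_s
end

puts resolve_id.call(FakeMessage.new('abc123'))  # message object
puts resolve_id.call('abc123')                   # bare String ID
puts resolve_id.call(:abc123)                    # anything else goes through #to_s
```

All three calls resolve to the same ID, `abc123`, so the flight record can be keyed consistently regardless of which form the caller passes.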
#watch_dog_interval ⇒ Numeric
Returns the watch_dog_interval attribute.
    # File 'lib/krakow/distribution.rb', line 27
    attribute :watch_dog_interval, Numeric, :default => 1.0
#watch_dog_interval? ⇒ TrueClass, FalseClass
Returns truthiness of the watch_dog_interval attribute.
    # File 'lib/krakow/distribution.rb', line 27
    attribute :watch_dog_interval, Numeric, :default => 1.0