Class: Krakow::Consumer
- Inherits:
- Object
- Includes:
- Celluloid, Utils::Lazy
- Defined in:
- lib/krakow/consumer.rb
Overview
Consume messages from a server
Instance Attribute Summary
-
#connections ⇒ Object
readonly
Returns the value of attribute connections.
-
#discovery ⇒ Object
readonly
Returns the value of attribute discovery.
-
#distribution ⇒ Object
readonly
Returns the value of attribute distribution.
-
#queue ⇒ Object
readonly
Returns the value of attribute queue.
Attributes
-
#backoff_interval ⇒ Numeric
The backoff_interval attribute.
-
#backoff_interval? ⇒ TrueClass, FalseClass
Truthiness of the backoff_interval attribute.
-
#channel ⇒ String
The channel attribute.
-
#channel? ⇒ TrueClass, FalseClass
Truthiness of the channel attribute.
-
#connection_options ⇒ Hash
The connection_options attribute.
-
#connection_options? ⇒ TrueClass, FalseClass
Truthiness of the connection_options attribute.
-
#discovery_interval ⇒ Numeric
The discovery_interval attribute.
-
#discovery_interval? ⇒ TrueClass, FalseClass
Truthiness of the discovery_interval attribute.
-
#discovery_jitter ⇒ Numeric
The discovery_jitter attribute.
-
#discovery_jitter? ⇒ TrueClass, FalseClass
Truthiness of the discovery_jitter attribute.
-
#host ⇒ String
The host attribute.
-
#host? ⇒ TrueClass, FalseClass
Truthiness of the host attribute.
-
#max_in_flight ⇒ Integer
The max_in_flight attribute.
-
#max_in_flight? ⇒ TrueClass, FalseClass
Truthiness of the max_in_flight attribute.
-
#notifier ⇒ [Celluloid::Signals, Celluloid::Condition, Celluloid::Actor]
The notifier attribute.
-
#notifier? ⇒ TrueClass, FalseClass
Truthiness of the notifier attribute.
-
#nsqlookupd ⇒ [Array, String]
The nsqlookupd attribute.
-
#nsqlookupd? ⇒ TrueClass, FalseClass
Truthiness of the nsqlookupd attribute.
-
#port ⇒ [String, Integer]
The port attribute.
-
#port? ⇒ TrueClass, FalseClass
Truthiness of the port attribute.
-
#topic ⇒ String
The topic attribute.
-
#topic? ⇒ TrueClass, FalseClass
Truthiness of the topic attribute.
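Each attribute above is paired with a `?` predicate. This page does not show how Utils::Lazy implements that pairing, so the following is only a hypothetical sketch of how such an `attribute` macro could generate a reader and a predicate. `LazySketch` and `SketchConsumer` are invented names, and the type and `:required` arguments are accepted but ignored here.

```ruby
# Hypothetical stand-in for an attribute DSL; not the Utils::Lazy source.
module LazySketch
  def self.included(base)
    base.extend(ClassMethods)
  end

  module ClassMethods
    # Defines a reader and a `?` predicate for the named attribute.
    def attribute(name, _type, options = {})
      define_method(name) do
        value = arguments.fetch(name) { options[:default] }
        # Callable defaults (e.g. ->{ Hash.new }) are evaluated on read.
        value.respond_to?(:call) ? value.call : value
      end
      define_method("#{name}?") { !!public_send(name) }
    end
  end

  attr_reader :arguments

  def initialize(args = {})
    @arguments = args
  end
end

class SketchConsumer
  include LazySketch

  attribute :topic, String, :required => true
  attribute :max_in_flight, Integer, :default => 1
  attribute :host, String
end

consumer = SketchConsumer.new({:topic => 'events'})
puts consumer.topic
puts consumer.max_in_flight
puts consumer.host?
```

In this sketch an unset attribute without a default reads as nil, so its predicate is false.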
Instance Method Summary
-
#build_connection(host, port, queue) ⇒ Krakow::Connection?
Build a new [Krakow::Connection].
-
#confirm(message_id) ⇒ TrueClass
(also: #finish)
Confirm message has been processed.
-
#connection(key) ⇒ Krakow::Connection
Returns [Krakow::Connection] associated with key.
-
#connection_failure(actor, reason) ⇒ nil
Remove connection references when connection is terminated.
-
#connection_reconnect(connection) ⇒ nil
Action to take when a connection has reconnected.
-
#discover ⇒ nil
Start the discovery interval lookup.
-
#goodbye_my_love! ⇒ nil
Instance destructor.
-
#init! ⇒ nil
Initialize the consumer by starting lookup and adding connections.
-
#initialize(args = {}) ⇒ Consumer
constructor
A new instance of Consumer.
-
#process_message(message, connection) ⇒ Krakow::FrameType
Process a given message if required.
-
#register(connection) ⇒ TrueClass, FalseClass
Register connection with distribution.
-
#requeue(message_id, timeout = 0) ⇒ TrueClass
Requeue message (generally due to processing failure).
-
#to_s ⇒ String
Stringify object.
-
#touch(message_id) ⇒ TrueClass
Touch message (to extend timeout).
-
#update_ready!(connection) ⇒ nil
Send RDY for connection based on distribution rules.
Methods included from Utils::Lazy
Methods included from Utils::Logging
Constructor Details
#initialize(args = {}) ⇒ Consumer
Returns a new instance of Consumer.
    # File 'lib/krakow/consumer.rb', line 39

    def initialize(args={})
      super
      arguments[:connection_options] = {:features => {}, :config => {}}.merge(
        arguments[:connection_options] || {}
      )
      @connections = {}
      @distribution = Distribution::Default.new(
        :max_in_flight => max_in_flight,
        :backoff_interval => backoff_interval,
        :consumer => current_actor
      )
      @queue = Queue.new
      if(nsqlookupd)
        debug "Connections will be established via lookup #{nsqlookupd.inspect}"
        @discovery = Discovery.new(:nsqlookupd => nsqlookupd)
        discover
      elsif(host && port)
        debug "Connection will be established via direct connection #{host}:#{port}"
        connection = build_connection(host, port, queue)
        if(register(connection))
          info "Registered new connection #{connection}"
          distribution.redistribute!
        else
          abort Error::ConnectionFailure.new("Failed to establish subscription at provided end point (#{host}:#{port}")
        end
      else
        abort Error::ConfigurationError.new('No connection information provided!')
      end
    end
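The constructor fills in `:features` and `:config` defaults with `Hash#merge`, which is a shallow merge. The sketch below reproduces only that expression in isolation. The helper name `normalize_connection_options` and the `:tls_v1` key are illustrative assumptions, not part of the class.

```ruby
# Defaults applied by the constructor, copied from the listing above.
DEFAULTS = {:features => {}, :config => {}}.freeze

# Hypothetical helper mirroring the merge performed in #initialize.
def normalize_connection_options(options)
  DEFAULTS.merge(options || {})
end

none = normalize_connection_options(nil)
partial = normalize_connection_options({:features => {:tls_v1 => true}})

puts none.inspect
puts partial.inspect
```

Because the merge is shallow, a caller-supplied `:features` hash replaces the default value wholesale, while an omitted `:config` key still falls back to an empty hash.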
Instance Attribute Details
#connections ⇒ Object (readonly)
Returns the value of attribute connections.
    # File 'lib/krakow/consumer.rb', line 16

    def connections
      @connections
    end
#discovery ⇒ Object (readonly)
Returns the value of attribute discovery.
    # File 'lib/krakow/consumer.rb', line 16

    def discovery
      @discovery
    end
#distribution ⇒ Object (readonly)
Returns the value of attribute distribution.
    # File 'lib/krakow/consumer.rb', line 16

    def distribution
      @distribution
    end
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
    # File 'lib/krakow/consumer.rb', line 16

    def queue
      @queue
    end
Instance Method Details
#backoff_interval ⇒ Numeric
Returns the backoff_interval attribute.
    # File 'lib/krakow/consumer.rb', line 31

    attribute :backoff_interval, Numeric
#backoff_interval? ⇒ TrueClass, FalseClass
Returns truthiness of the backoff_interval attribute.
    # File 'lib/krakow/consumer.rb', line 31

    attribute :backoff_interval, Numeric
#build_connection(host, port, queue) ⇒ Krakow::Connection?
Build a new [Krakow::Connection]
    # File 'lib/krakow/consumer.rb', line 101

    def build_connection(host, port, queue)
      begin
        connection = Connection.new(
          :host => host,
          :port => port,
          :queue => queue,
          :topic => topic,
          :channel => channel,
          :notifier => notifier,
          :features => connection_options[:features],
          :features_args => connection_options[:config],
          :callbacks => {
            :handle => {
              :actor => current_actor,
              :method => :process_message
            },
            :reconnect => {
              :actor => current_actor,
              :method => :connection_reconnect
            }
          }
        )
      rescue => e
        error "Failed to build connection (host: #{host} port: #{port} queue: #{queue}) - #{e.class}: #{e}"
        debug "#{e.class}: #{e}\n#{e.backtrace.join("\n")}"
        nil
      end
    end
#channel ⇒ String
Returns the channel attribute.
    # File 'lib/krakow/consumer.rb', line 26

    attribute :channel, String, :required => true
#channel? ⇒ TrueClass, FalseClass
Returns truthiness of the channel attribute.
    # File 'lib/krakow/consumer.rb', line 26

    attribute :channel, String, :required => true
#confirm(message_id) ⇒ TrueClass Also known as: finish
Confirm message has been processed
    # File 'lib/krakow/consumer.rb', line 237

    def confirm(message_id)
      message_id = message_id.message_id if message_id.respond_to?(:message_id)
      begin
        distribution.in_flight_lookup(message_id) do |connection|
          distribution.unregister_message(message_id)
          connection.transmit(Command::Fin.new(:message_id => message_id))
          distribution.success(connection.identifier)
          update_ready!(connection)
        end
        true
      rescue KeyError => e
        error "Message confirmation failed: #{e}"
        abort e
      rescue Error::LookupFailed => e
        error "Lookup of message for confirmation failed! <Message ID: #{message_id} - Error: #{e}>"
        abort e
      rescue Error::ConnectionUnavailable => e
        retry
      end
    end
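The listing for #confirm rescues Error::ConnectionUnavailable with a bare `retry`, which re-runs the whole `begin` body with no attempt limit. A minimal sketch of that control flow follows. The `ConnectionUnavailable` class and the attempt counter are local stand-ins invented for the example.

```ruby
# Local stand-in for Krakow::Error::ConnectionUnavailable (hypothetical).
class ConnectionUnavailable < StandardError; end

attempts = 0
result = begin
  attempts += 1
  # Simulate a connection that only becomes usable on the third attempt.
  raise ConnectionUnavailable, 'not ready' if attempts < 3
  true
rescue ConnectionUnavailable
  retry # jump back to the top of the begin block
end

puts "result=#{result} attempts=#{attempts}"
```

With an unbounded `retry`, the call does not return until the connection becomes available or a different error is raised.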
#connection(key) ⇒ Krakow::Connection
Returns [Krakow::Connection] associated with key
    # File 'lib/krakow/consumer.rb', line 73

    def connection(key)
      @connections[key]
    end
#connection_failure(actor, reason) ⇒ nil
Remove connection references when connection is terminated
    # File 'lib/krakow/consumer.rb', line 216

    def connection_failure(actor, reason)
      connections.delete_if do |key, value|
        if(value == actor && reason.nil?)
          warn "Connection failure detected. Removing connection: #{key} - #{reason || 'no reason provided'}"
          begin
            distribution.remove_connection(key)
          rescue Error::ConnectionUnavailable, Error::ConnectionFailure
            warn 'Caught connection unavailability'
          end
          distribution.redistribute!
          true
        end
      end
      nil
    end
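#connection_failure relies on `Hash#delete_if` removing an entry only when the block returns a truthy value. An `if` without an `else` evaluates to nil when its condition is false, so non-matching entries are kept. The sketch below shows that behavior on a plain hash. The keys and symbols are made up, and the `reason` check from the listing is omitted.

```ruby
# Made-up registry: connection key => connection object (symbols stand in for actors).
connections = {'a:4150' => :conn_a, 'b:4150' => :conn_b}
failed_actor = :conn_a
removed = []

connections.delete_if do |key, value|
  if value == failed_actor
    removed << key
    true # truthy result deletes this entry
  end
  # no else branch: the block yields nil here and the entry is kept
end

puts connections.inspect
puts removed.inspect
```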
#connection_options ⇒ Hash
Returns the connection_options attribute.
    # File 'lib/krakow/consumer.rb', line 35

    attribute :connection_options, Hash, :default => ->{ Hash.new }
#connection_options? ⇒ TrueClass, FalseClass
Returns truthiness of the connection_options attribute.
    # File 'lib/krakow/consumer.rb', line 35

    attribute :connection_options, Hash, :default => ->{ Hash.new }
#connection_reconnect(connection) ⇒ nil
Action to take when a connection has reconnected
    # File 'lib/krakow/consumer.rb', line 147

    def connection_reconnect(connection)
      connection.transmit(Command::Sub.new(:topic_name => topic, :channel_name => channel))
      distribution.set_ready_for(connection)
      nil
    end
#discover ⇒ nil
Start the discovery interval lookup
    # File 'lib/krakow/consumer.rb', line 187

    def discover
      init!
      after(discovery_interval + (discovery_jitter * rand)){ discover }
    end
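#discover reschedules itself after `discovery_interval + (discovery_jitter * rand)` seconds. With the documented defaults of 30 and 10.0, each delay falls in the half-open range from 30 up to (but not including) 40 seconds. The sketch below evaluates the same expression outside the actor. The helper name `next_discovery_delay` is an assumption made for the example.

```ruby
# Hypothetical helper: seconds until the next discovery pass,
# using the same expression as #discover.
def next_discovery_delay(interval = 30, jitter = 10.0, random: Random.new)
  interval + (jitter * random.rand) # Random#rand returns a Float in [0, 1)
end

delays = Array.new(1_000) { next_discovery_delay }
puts "min=#{delays.min.round(2)} max=#{delays.max.round(2)}"
```

Randomizing the delay spreads lookups from many consumers over time instead of having them all query nsqlookupd at the same instant.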
#discovery_interval ⇒ Numeric
Returns the discovery_interval attribute.
    # File 'lib/krakow/consumer.rb', line 32

    attribute :discovery_interval, Numeric, :default => 30
#discovery_interval? ⇒ TrueClass, FalseClass
Returns truthiness of the discovery_interval attribute.
    # File 'lib/krakow/consumer.rb', line 32

    attribute :discovery_interval, Numeric, :default => 30
#discovery_jitter ⇒ Numeric
Returns the discovery_jitter attribute.
    # File 'lib/krakow/consumer.rb', line 33

    attribute :discovery_jitter, Numeric, :default => 10.0
#discovery_jitter? ⇒ TrueClass, FalseClass
Returns truthiness of the discovery_jitter attribute.
    # File 'lib/krakow/consumer.rb', line 33

    attribute :discovery_jitter, Numeric, :default => 10.0
#goodbye_my_love! ⇒ nil
Instance destructor
    # File 'lib/krakow/consumer.rb', line 85

    def goodbye_my_love!
      debug 'Tearing down consumer'
      connections.values.each do |con|
        con.terminate if con.alive?
      end
      distribution.terminate if distribution && distribution.alive?
      info 'Consumer torn down'
      nil
    end
#host ⇒ String
Returns the host attribute.
    # File 'lib/krakow/consumer.rb', line 27

    attribute :host, String
#host? ⇒ TrueClass, FalseClass
Returns truthiness of the host attribute.
    # File 'lib/krakow/consumer.rb', line 27

    attribute :host, String
#init! ⇒ nil
Initialize the consumer by starting lookup and adding connections
    # File 'lib/krakow/consumer.rb', line 165

    def init!
      debug 'Running consumer `init!` connection builds'
      found = discovery.lookup(topic)
      debug "Discovery results: #{found.inspect}"
      connection = nil
      found.each do |node|
        debug "Processing discovery result: #{node.inspect}"
        key = Connection.identifier(node[:broadcast_address], node[:tcp_port], topic, channel)
        unless(connections[key])
          connection = build_connection(node[:broadcast_address], node[:tcp_port], queue)
          info "Registered new connection #{connection}" if register(connection)
        else
          debug "Discovery result already registered: #{node.inspect}"
        end
      end
      distribution.redistribute! if connection
      nil
    end
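#init! skips discovery results whose identifier is already present in `connections`, so repeated discovery passes only build connections for new nodes. The sketch below imitates that de-duplication with plain hashes. The identifier format, addresses, and symbols are assumptions for illustration; the real `Connection.identifier` may produce a different key.

```ruby
# Hypothetical identifier format, assumed for illustration only.
def identifier(host, port, topic, channel)
  [host, port, topic, channel].join('__')
end

topic = 'events'
channel = 'workers'
connections = {identifier('10.0.0.1', 4150, topic, channel) => :existing}

# Shape of the lookup results as used by #init! (values are made up).
found = [
  {:broadcast_address => '10.0.0.1', :tcp_port => 4150},
  {:broadcast_address => '10.0.0.2', :tcp_port => 4150}
]

added = []
found.each do |node|
  key = identifier(node[:broadcast_address], node[:tcp_port], topic, channel)
  next if connections[key] # already registered, skip
  connections[key] = :new_connection
  added << key
end

puts added.inspect
```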
#max_in_flight ⇒ Integer
Returns the max_in_flight attribute.
    # File 'lib/krakow/consumer.rb', line 30

    attribute :max_in_flight, Integer, :default => 1
#max_in_flight? ⇒ TrueClass, FalseClass
Returns truthiness of the max_in_flight attribute.
    # File 'lib/krakow/consumer.rb', line 30

    attribute :max_in_flight, Integer, :default => 1
#notifier ⇒ [Celluloid::Signals, Celluloid::Condition, Celluloid::Actor]
Returns the notifier attribute.
    # File 'lib/krakow/consumer.rb', line 34

    attribute :notifier, [Celluloid::Signals, Celluloid::Condition, Celluloid::Actor]
#notifier? ⇒ TrueClass, FalseClass
Returns truthiness of the notifier attribute.
    # File 'lib/krakow/consumer.rb', line 34

    attribute :notifier, [Celluloid::Signals, Celluloid::Condition, Celluloid::Actor]
#nsqlookupd ⇒ [Array, String]
Returns the nsqlookupd attribute.
    # File 'lib/krakow/consumer.rb', line 29

    attribute :nsqlookupd, [Array, String]
#nsqlookupd? ⇒ TrueClass, FalseClass
Returns truthiness of the nsqlookupd attribute.
    # File 'lib/krakow/consumer.rb', line 29

    attribute :nsqlookupd, [Array, String]
#port ⇒ [String, Integer]
Returns the port attribute.
    # File 'lib/krakow/consumer.rb', line 28

    attribute :port, [String, Integer]
#port? ⇒ TrueClass, FalseClass
Returns truthiness of the port attribute.
    # File 'lib/krakow/consumer.rb', line 28

    attribute :port, [String, Integer]
#process_message(message, connection) ⇒ Krakow::FrameType
Process a given message if required
    # File 'lib/krakow/consumer.rb', line 135

    def process_message(message, connection)
      if(message.is_a?(FrameType::Message))
        distribution.register_message(message, connection.identifier)
        message.origin = current_actor
      end
      message
    end
#register(connection) ⇒ TrueClass, FalseClass
Register connection with distribution
    # File 'lib/krakow/consumer.rb', line 196

    def register(connection)
      begin
        connection.init!
        connection.transmit(Command::Sub.new(:topic_name => topic, :channel_name => channel))
        self.link connection
        connections[connection.identifier] = connection
        distribution.add_connection(connection)
        true
      rescue Error::BadResponse => e
        debug "Failed to establish connection: #{e.result ? e.result.error : '<No Response!>'}"
        connection.terminate
        false
      end
    end
#requeue(message_id, timeout = 0) ⇒ TrueClass
Requeue message (generally due to processing failure)
    # File 'lib/krakow/consumer.rb', line 264

    def requeue(message_id, timeout=0)
      message_id = message_id.message_id if message_id.respond_to?(:message_id)
      distribution.in_flight_lookup(message_id) do |connection|
        distribution.unregister_message(message_id)
        connection.transmit(
          Command::Req.new(
            :message_id => message_id,
            :timeout => timeout
          )
        )
        distribution.failure(connection.identifier)
        update_ready!(connection)
      end
      true
    end
#to_s ⇒ String
Returns the stringified object.
    # File 'lib/krakow/consumer.rb', line 78

    def to_s
      "<#{self.class.name}:#{object_id} T:#{topic} C:#{channel}>"
    end
#topic ⇒ String
Returns the topic attribute.
    # File 'lib/krakow/consumer.rb', line 25

    attribute :topic, String, :required => true
#topic? ⇒ TrueClass, FalseClass
Returns truthiness of the topic attribute.
    # File 'lib/krakow/consumer.rb', line 25

    attribute :topic, String, :required => true
#touch(message_id) ⇒ TrueClass
Touch message (to extend timeout)
    # File 'lib/krakow/consumer.rb', line 284

    def touch(message_id)
      message_id = message_id.message_id if message_id.respond_to?(:message_id)
      begin
        distribution.in_flight_lookup(message_id) do |connection|
          connection.transmit(
            Command::Touch.new(:message_id => message_id)
          )
        end
        true
      rescue Error::LookupFailed => e
        error "Lookup of message for touch failed! <Message ID: #{message_id} - Error: #{e}>"
        abort e
      end
    end
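#confirm, #requeue and #touch each start with a `respond_to?(:message_id)` check, which suggests callers may pass either a message object or a raw message ID. The sketch below isolates that duck-typed normalization. The `Message` struct and the helper name are stand-ins invented for the example, not Krakow classes.

```ruby
# Hypothetical stand-in for a message frame; only #message_id matters here.
Message = Struct.new(:message_id, :content)

# Accept either an object exposing #message_id or the ID itself.
def normalize_message_id(message_or_id)
  if message_or_id.respond_to?(:message_id)
    message_or_id.message_id
  else
    message_or_id
  end
end

from_object = normalize_message_id(Message.new('abc123', 'payload'))
from_string = normalize_message_id('abc123')

puts from_object
puts from_string
```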
#update_ready!(connection) ⇒ nil
Send RDY for connection based on distribution rules
    # File 'lib/krakow/consumer.rb', line 157

    def update_ready!(connection)
      distribution.set_ready_for(connection)
      nil
    end