Class: Krakow::Connection
- Inherits:
-
Object
- Object
- Krakow::Connection
- Includes:
- Celluloid, Utils::Lazy
- Defined in:
- lib/krakow/connection.rb
Overview
Provides TCP connection to NSQD
Constant Summary collapse
- FEATURES =
Available connection features
[ :max_rdy_count, :max_msg_timeout, :msg_timeout, :tls_v1, :deflate, :deflate_level, :max_deflate_level, :snappy, :sample_rate, :auth_required ]
- EXCLUSIVE_FEATURES =
List of features that may not be enabled together
[[:snappy, :deflate]]
- ENABLEABLE_FEATURES =
List of features that may be enabled by the client
[:tls_v1, :snappy, :deflate, :auth_required]
Instance Attribute Summary collapse
-
#endpoint_settings ⇒ Hash
readonly
Current configuration for endpoint.
- #running ⇒ TrueClass, FalseClass readonly
-
#socket ⇒ Ksocket
readonly
Underlying socket like instance.
Attributes collapse
-
#callbacks ⇒ Hash
The callbacks attribute.
-
#callbacks? ⇒ TrueClass, FalseClass
Truthiness of the callbacks attribute.
-
#channel ⇒ String
The channel attribute.
-
#channel? ⇒ TrueClass, FalseClass
Truthiness of the channel attribute.
-
#enforce_features ⇒ [TrueClass,FalseClass]
The enforce_features attribute.
-
#enforce_features? ⇒ TrueClass, FalseClass
Truthiness of the enforce_features attribute.
-
#error_wait ⇒ Numeric
The error_wait attribute.
-
#error_wait? ⇒ TrueClass, FalseClass
Truthiness of the error_wait attribute.
-
#features ⇒ Hash
The features attribute.
-
#features? ⇒ TrueClass, FalseClass
Truthiness of the features attribute.
-
#features_args ⇒ Hash
The features_args attribute.
-
#features_args? ⇒ TrueClass, FalseClass
Truthiness of the features_args attribute.
-
#host ⇒ String
The host attribute.
-
#host? ⇒ TrueClass, FalseClass
Truthiness of the host attribute.
-
#notifier ⇒ [Celluloid::Signals, Celluloid::Condition, Celluloid::Actor]
The notifier attribute.
-
#notifier? ⇒ TrueClass, FalseClass
Truthiness of the notifier attribute.
-
#port ⇒ [String,Integer]
The port attribute.
-
#port? ⇒ TrueClass, FalseClass
Truthiness of the port attribute.
-
#queue ⇒ [Queue, Consumer::Queue]
The queue attribute.
-
#queue? ⇒ TrueClass, FalseClass
Truthiness of the queue attribute.
-
#response_interval ⇒ Numeric
The response_interval attribute.
-
#response_interval? ⇒ TrueClass, FalseClass
Truthiness of the response_interval attribute.
-
#response_wait ⇒ Numeric
The response_wait attribute.
-
#response_wait? ⇒ TrueClass, FalseClass
Truthiness of the response_wait attribute.
-
#responses ⇒ Queue
The responses attribute.
-
#responses? ⇒ TrueClass, FalseClass
Truthiness of the responses attribute.
-
#topic ⇒ String
The topic attribute.
-
#topic? ⇒ TrueClass, FalseClass
Truthiness of the topic attribute.
-
#version ⇒ String
The version attribute.
-
#version? ⇒ TrueClass, FalseClass
Truthiness of the version attribute.
Class Method Summary collapse
-
.identifier(host, port, topic, channel) ⇒ String
Generate identifier for connection.
Instance Method Summary collapse
-
#auth_required ⇒ TrueClass
Send authentication request for connection.
-
#callback_for(type, arg, connection) ⇒ Object
Execute callback for given type.
-
#connected? ⇒ TrueClass, FalseClass
Underlying socket is connected.
-
#connection_cleanup ⇒ nil
Destructor method for cleanup.
-
#deflate ⇒ TrueClass
Enable deflate feature on underlying socket.
-
#handle(message) ⇒ Krakow::FrameType?
Handle non-message type Krakow::FrameType.
-
#identifier ⇒ String
Identifier for this connection.
-
#identify_and_negotiate ⇒ TrueClass
IDENTIFY with server and negotiate features.
-
#identify_defaults ⇒ Hash
Default settings for IDENTIFY.
-
#init! ⇒ nil
Initialize the connection.
-
#initialize(args = {}) ⇒ Connection
constructor
Create new instance.
-
#process_to_queue! ⇒ nil
Receive messages and place into queue.
-
#receive ⇒ Krakow::FrameType?
Receive from server.
-
#receiving? ⇒ TrueClass, FalseClass
Is connection currently receiving a message.
-
#snappy ⇒ TrueClass
Enable snappy feature on underlying socket.
-
#tls_v1 ⇒ TrueClass
Enable TLS feature on underlying socket.
-
#to_s ⇒ String
Stringify object.
-
#transmit(message) ⇒ TrueClass, Krakow::FrameType
Send message to remote server.
-
#transmit_with_response(message, wait_time) ⇒ Krakow::FrameType
Sends message and waits for response.
-
#wait_time_for(message) ⇒ Numeric
Returns configured wait time for given message type.
Methods included from Utils::Lazy
Methods included from Utils::Logging
Constructor Details
#initialize(args = {}) ⇒ Connection
Create new instance
95 96 97 98 99 |
# File 'lib/krakow/connection.rb', line 95 def initialize(args={}) super @endpoint_settings = {} @running = false end |
Instance Attribute Details
#endpoint_settings ⇒ Hash (readonly)
Returns current configuration for endpoint.
48 49 50 |
# File 'lib/krakow/connection.rb', line 48 def endpoint_settings @endpoint_settings end |
#running ⇒ TrueClass, FalseClass (readonly)
52 53 54 |
# File 'lib/krakow/connection.rb', line 52 def running @running end |
#socket ⇒ Ksocket (readonly)
Returns underlying socket like instance.
50 51 52 |
# File 'lib/krakow/connection.rb', line 50 def socket @socket end |
Class Method Details
.identifier(host, port, topic, channel) ⇒ String
Generate identifier for connection
15 16 17 |
# File 'lib/krakow/connection.rb', line 15 def self.identifier(host, port, topic, channel) [host, port, topic, channel].compact.join('__') end |
Instance Method Details
#auth_required ⇒ TrueClass
Send authentication request for connection
339 340 341 342 343 344 345 346 347 348 349 |
# File 'lib/krakow/connection.rb', line 339 def auth_required info 'Authentication required for this connection' if(feature_args[:auth]) transmit(Command::Auth.new(:secret => feature_args[:auth])) response = receive true else error 'No authentication information provided for connection!' abort 'Authentication failure. No authentication secret provided' end end |
#callback_for(type, arg, connection) ⇒ Object
Execute callback for given type
261 262 263 264 265 266 267 268 269 270 271 272 273 274 |
# File 'lib/krakow/connection.rb', line 261 def callback_for(type, *args) callback = callbacks[type] if(callback) debug "Processing connection callback for #{type.inspect} (#{callback.inspect})" if(callback[:actor].alive?) callback[:actor].send(callback[:method], *(args + [current_actor])) else error "Expected actor for callback processing is not alive! (type: `#{type.inspect}`)" end else debug "No connection callback defined for #{type.inspect}" args.size == 1 ? args.first : args end end |
#callbacks ⇒ Hash
Returns the callbacks attribute.
67 |
# File 'lib/krakow/connection.rb', line 67 attribute :callbacks, Hash, :default => ->{ Hash.new } |
#callbacks? ⇒ TrueClass, FalseClass
Returns truthiness of the callbacks attribute.
67 |
# File 'lib/krakow/connection.rb', line 67 attribute :callbacks, Hash, :default => ->{ Hash.new } |
#channel ⇒ String
Returns the channel attribute.
64 |
# File 'lib/krakow/connection.rb', line 64 attribute :channel, String |
#channel? ⇒ TrueClass, FalseClass
Returns truthiness of the channel attribute.
64 |
# File 'lib/krakow/connection.rb', line 64 attribute :channel, String |
#connected? ⇒ TrueClass, FalseClass
Returns underlying socket is connected.
385 386 387 388 389 390 391 |
# File 'lib/krakow/connection.rb', line 385 def connected? begin !!(socket && socket.alive?) rescue Celluloid::DeadActorError false end end |
#connection_cleanup ⇒ nil
Destructor method for cleanup
170 171 172 173 174 175 176 177 178 179 |
# File 'lib/krakow/connection.rb', line 170 def connection_cleanup debug 'Tearing down connection' @running = false if(connected?) socket.terminate end @socket = nil info 'Connection torn down' nil end |
#deflate ⇒ TrueClass
Enable deflate feature on underlying socket
365 366 367 368 369 370 371 |
# File 'lib/krakow/connection.rb', line 365 def deflate debug 'Loading support for deflate compression and converting connection' @socket = ConnectionFeatures::Deflate::Io.new(socket, features_args) response = receive info "Deflate connection conversion complete. Response: #{response.inspect}" true end |
#enforce_features ⇒ [TrueClass,FalseClass]
Returns the enforce_features attribute.
74 |
# File 'lib/krakow/connection.rb', line 74 attribute :enforce_features, [TrueClass,FalseClass], :default => true |
#enforce_features? ⇒ TrueClass, FalseClass
Returns truthiness of the enforce_features attribute.
74 |
# File 'lib/krakow/connection.rb', line 74 attribute :enforce_features, [TrueClass,FalseClass], :default => true |
#error_wait ⇒ Numeric
Returns the error_wait attribute.
73 |
# File 'lib/krakow/connection.rb', line 73 attribute :error_wait, Numeric, :default => 0 |
#error_wait? ⇒ TrueClass, FalseClass
Returns truthiness of the error_wait attribute.
73 |
# File 'lib/krakow/connection.rb', line 73 attribute :error_wait, Numeric, :default => 0 |
#features ⇒ Hash
Returns the features attribute.
70 |
# File 'lib/krakow/connection.rb', line 70 attribute :features, Hash, :default => ->{ Hash.new } |
#features? ⇒ TrueClass, FalseClass
Returns truthiness of the features attribute.
70 |
# File 'lib/krakow/connection.rb', line 70 attribute :features, Hash, :default => ->{ Hash.new } |
#features_args ⇒ Hash
Returns the features_args attribute.
75 |
# File 'lib/krakow/connection.rb', line 75 attribute :features_args, Hash, :default => ->{ Hash.new } |
#features_args? ⇒ TrueClass, FalseClass
Returns truthiness of the features_args attribute.
75 |
# File 'lib/krakow/connection.rb', line 75 attribute :features_args, Hash, :default => ->{ Hash.new } |
#handle(message) ⇒ Krakow::FrameType?
Handle non-message type Krakow::FrameType
236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 |
# File 'lib/krakow/connection.rb', line 236 def handle() # Grab heartbeats upfront if(.is_a?(FrameType::Response) && .response == '_heartbeat_') debug 'Responding to heartbeat' transmit Command::Nop.new nil else = callback_for(:handle, ) if(!.is_a?(FrameType::Message)) debug "Captured non-message type response: #{}" responses << nil else end end end |
#host ⇒ String
Returns the host attribute.
61 |
# File 'lib/krakow/connection.rb', line 61 attribute :host, String, :required => true |
#host? ⇒ TrueClass, FalseClass
Returns truthiness of the host attribute.
61 |
# File 'lib/krakow/connection.rb', line 61 attribute :host, String, :required => true |
#identifier ⇒ String
Returns identifier for this connection.
102 103 104 |
# File 'lib/krakow/connection.rb', line 102 def identifier self.class.identifier(host, port, topic, channel) end |
#identify_and_negotiate ⇒ TrueClass
IDENTIFY with server and negotiate features
307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 |
# File 'lib/krakow/connection.rb', line 307 def identify_and_negotiate expected_features = identify_defaults.merge(features) ident = Command::Identify.new( expected_features ) socket.put(ident.to_line) response = receive if(expected_features[:feature_negotiation]) begin @endpoint_settings = MultiJson.load(response.content, :symbolize_keys => true) info "Connection settings: #{endpoint_settings.inspect}" # Enable things we need to enable ENABLEABLE_FEATURES.each do |key| if(endpoint_settings[key]) send(key) elsif(enforce_features && expected_features[key]) abort Error::ConnectionFeatureFailure.new("Failed to enable #{key} feature on connection!") end end rescue MultiJson::LoadError => e error "Failed to parse response from Identify request: #{e} - #{response}" abort e end else @endpoint_settings = {} end true end |
#identify_defaults ⇒ Hash
Returns default settings for IDENTIFY.
292 293 294 295 296 297 298 299 300 301 302 |
# File 'lib/krakow/connection.rb', line 292 def identify_defaults unless(@identify_defaults) @identify_defaults = { :short_id => Socket.gethostname, :long_id => Socket.gethostbyname(Socket.gethostname).flatten.compact.first, :user_agent => "krakow/#{Krakow::VERSION}", :feature_negotiation => true } end @identify_defaults end |
#init! ⇒ nil
Initialize the connection
114 115 116 117 118 |
# File 'lib/krakow/connection.rb', line 114 def init! connect! async.process_to_queue! nil end |
#notifier ⇒ [Celluloid::Signals, Celluloid::Condition, Celluloid::Actor]
Returns the notifier attribute.
69 |
# File 'lib/krakow/connection.rb', line 69 attribute :notifier, [Celluloid::Signals, Celluloid::Condition, Celluloid::Actor] |
#notifier? ⇒ TrueClass, FalseClass
Returns truthiness of the notifier attribute.
69 |
# File 'lib/krakow/connection.rb', line 69 attribute :notifier, [Celluloid::Signals, Celluloid::Condition, Celluloid::Actor] |
#port ⇒ [String,Integer]
Returns the port attribute.
62 |
# File 'lib/krakow/connection.rb', line 62 attribute :port, [String,Integer], :required => true |
#port? ⇒ TrueClass, FalseClass
Returns truthiness of the port attribute.
62 |
# File 'lib/krakow/connection.rb', line 62 attribute :port, [String,Integer], :required => true |
#process_to_queue! ⇒ nil
Receive messages and place into queue
212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 |
# File 'lib/krakow/connection.rb', line 212 def process_to_queue! unless(@running) @running = true while(@running) = handle(receive) if() debug "Adding message to queue #{}" queue << if(notifier) warn "Sending new message notification: #{notifier} - #{}" notifier.broadcast() end else debug 'Received `nil` message. Ignoring.' end end end nil end |
#queue ⇒ [Queue, Consumer::Queue]
Returns the queue attribute.
66 |
# File 'lib/krakow/connection.rb', line 66 attribute :queue, [Queue, Consumer::Queue], :default => ->{ Queue.new } |
#queue? ⇒ TrueClass, FalseClass
Returns truthiness of the queue attribute.
66 |
# File 'lib/krakow/connection.rb', line 66 attribute :queue, [Queue, Consumer::Queue], :default => ->{ Queue.new } |
#receive ⇒ Krakow::FrameType?
Receive from server
185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 |
# File 'lib/krakow/connection.rb', line 185 def receive debug 'Read wait for frame start' buf = socket.get(8) if(buf) @receiving = true debug "<<< #{buf.inspect}" struct = FrameType.decode(buf) debug "Decoded structure: #{struct.inspect}" struct[:data] = socket.get(struct[:size]) debug "<<< #{struct[:data].inspect}" @receiving = false frame = FrameType.build(struct) debug "Struct: #{struct.inspect} Frame: #{frame.inspect}" frame else nil end end |
#receiving? ⇒ TrueClass, FalseClass
Returns is connection currently receiving a message.
205 206 207 |
# File 'lib/krakow/connection.rb', line 205 def receiving? !!@receiving end |
#response_interval ⇒ Numeric
Returns the response_interval attribute.
72 |
# File 'lib/krakow/connection.rb', line 72 attribute :response_interval, Numeric, :default => 0.03 |
#response_interval? ⇒ TrueClass, FalseClass
Returns truthiness of the response_interval attribute.
72 |
# File 'lib/krakow/connection.rb', line 72 attribute :response_interval, Numeric, :default => 0.03 |
#response_wait ⇒ Numeric
Returns the response_wait attribute.
71 |
# File 'lib/krakow/connection.rb', line 71 attribute :response_wait, Numeric, :default => 1.0 |
#response_wait? ⇒ TrueClass, FalseClass
Returns truthiness of the response_wait attribute.
71 |
# File 'lib/krakow/connection.rb', line 71 attribute :response_wait, Numeric, :default => 1.0 |
#responses ⇒ Queue
Returns the responses attribute.
68 |
# File 'lib/krakow/connection.rb', line 68 attribute :responses, Queue, :default => ->{ Queue.new } |
#responses? ⇒ TrueClass, FalseClass
Returns truthiness of the responses attribute.
68 |
# File 'lib/krakow/connection.rb', line 68 attribute :responses, Queue, :default => ->{ Queue.new } |
#snappy ⇒ TrueClass
Enable snappy feature on underlying socket
354 355 356 357 358 359 360 |
# File 'lib/krakow/connection.rb', line 354 def snappy info 'Loading support for snappy compression and converting connection' @socket = ConnectionFeatures::SnappyFrames::Io.new(socket, features_args) response = receive info "Snappy connection conversion complete. Response: #{response.inspect}" true end |
#tls_v1 ⇒ TrueClass
Enable TLS feature on underlying socket
376 377 378 379 380 381 382 |
# File 'lib/krakow/connection.rb', line 376 def tls_v1 info 'Enabling TLS for connection' @socket = ConnectionFeatures::Ssl::Io.new(socket, features_args) response = receive info "TLS enable complete. Response: #{response.inspect}" true end |
#to_s ⇒ String
Returns stringify object.
107 108 109 |
# File 'lib/krakow/connection.rb', line 107 def to_s "<#{self.class.name}:#{object_id} {#{host}:#{port}}>" end |
#topic ⇒ String
Returns the topic attribute.
63 |
# File 'lib/krakow/connection.rb', line 63 attribute :topic, String |
#topic? ⇒ TrueClass, FalseClass
Returns truthiness of the topic attribute.
63 |
# File 'lib/krakow/connection.rb', line 63 attribute :topic, String |
#transmit(message) ⇒ TrueClass, Krakow::FrameType
Send message to remote server
124 125 126 127 128 129 130 131 132 133 134 135 136 137 |
# File 'lib/krakow/connection.rb', line 124 def transmit() unless(.respond_to?(:to_line)) abort TypeError.new("Expecting type `Krakow::FrameType` but received `#{.class}`") end output = .to_line response_wait = wait_time_for() if(response_wait > 0) transmit_with_response(, response_wait) else debug ">>> #{output}" socket.put(output) true end end |
#transmit_with_response(message, wait_time) ⇒ Krakow::FrameType
Sends message and waits for response
143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 |
# File 'lib/krakow/connection.rb', line 143 def transmit_with_response(, wait_time) responses.clear socket.put(.to_line) response = nil (wait_time / response_interval).to_i.times do |i| response = responses.pop unless responses.empty? break if response sleep(response_interval) end if(response) .response = response if(.error?(response)) res = Error::BadResponse.new "Message transmission failed #{}" res.result = response abort res end response else unless(Command.response_for() == :error_only) abort Error::BadResponse::NoResponse.new "No response provided for message #{}" end end end |
#version ⇒ String
Returns the version attribute.
65 |
# File 'lib/krakow/connection.rb', line 65 attribute :version, String, :default => 'v2' |
#version? ⇒ TrueClass, FalseClass
Returns truthiness of the version attribute.
65 |
# File 'lib/krakow/connection.rb', line 65 attribute :version, String, :default => 'v2' |
#wait_time_for(message) ⇒ Numeric
Returns configured wait time for given message type
280 281 282 283 284 285 286 287 288 289 |
# File 'lib/krakow/connection.rb', line 280 def wait_time_for() case Command.response_for() when :required response_wait when :error_only error_wait else 0 end end |