Class: Krakow::Connection

Inherits:
Object
  • Object
show all
Includes:
Celluloid, Utils::Lazy
Defined in:
lib/krakow/connection.rb

Overview

Provides TCP connection to NSQD

Constant Summary collapse

FEATURES =

Available connection features

[
  :max_rdy_count,
  :max_msg_timeout,
  :msg_timeout,
  :tls_v1,
  :deflate,
  :deflate_level,
  :max_deflate_level,
  :snappy,
  :sample_rate,
  :auth_required
]
EXCLUSIVE_FEATURES =

List of features that may not be enabled together

[[:snappy, :deflate]]
ENABLEABLE_FEATURES =

List of features that may be enabled by the client

[:tls_v1, :snappy, :deflate, :auth_required]

Instance Attribute Summary collapse

Attributes collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Utils::Lazy

included

Methods included from Utils::Logging

level=, #log

Constructor Details

#initialize(args = {}) ⇒ Connection

Create new instance

Parameters:

  • args (Hash) (defaults to: {})

Options Hash (args):

  • :host (String) — default: required

    server host

  • :port (String, Integer) — default: required

    server port

  • :version (String)
  • :queue (Queue)

    received message queue

  • :callbacks (Hash)
  • :responses (Queue)

    received responses queue

  • :notifier (Celluloid::Actor)

    actor to notify on new message

  • :features (Hash)

    features to enable

  • :response_wait (Numeric)

    time to wait for response

  • :response_interval (Numeric)

    sleep interval for wait loop

  • :error_wait (Numeric)

    time to wait for error response

  • :enforce_features (TrueClass, FalseClass)

    fail if features are unavailable

  • :features_args (Hash)

    options for connection features



95
96
97
98
99
# File 'lib/krakow/connection.rb', line 95

# Create a new connection instance.
#
# @param args [Hash] connection options (host, port, features, ...)
def initialize(args={})
  # Bare `super` forwards `args` unchanged to the inherited/mixed-in initializer.
  super
  # Replaced by the parsed IDENTIFY response in #identify_and_negotiate.
  @endpoint_settings = {}
  # Set to true while #process_to_queue! is looping.
  @running = false
end

Instance Attribute Details

#endpoint_settings ⇒ Hash (readonly)

Returns current configuration for endpoint.

Returns:

  • (Hash)

    current configuration for endpoint



48
49
50
# File 'lib/krakow/connection.rb', line 48

# Reader for the endpoint configuration stored in @endpoint_settings.
#
# @return [Hash] current configuration for endpoint
def endpoint_settings
  @endpoint_settings
end

#running ⇒ TrueClass, FalseClass (readonly)

Returns:

  • (TrueClass, FalseClass)


52
53
54
# File 'lib/krakow/connection.rb', line 52

# Reader for the @running flag (true while #process_to_queue! is looping).
#
# @return [TrueClass, FalseClass]
def running
  @running
end

#socket ⇒ Ksocket (readonly)

Returns underlying socket like instance.

Returns:

  • (Ksocket)

    underlying socket like instance



50
51
52
# File 'lib/krakow/connection.rb', line 50

# Reader for the underlying socket held in @socket. The feature methods
# (#deflate, #snappy, #tls_v1) replace it with a wrapping IO object.
#
# @return [Ksocket] underlying socket like instance
def socket
  @socket
end

Class Method Details

.identifier(host, port, topic, channel) ⇒ String

Generate identifier for connection

Parameters:

  • host (String)
  • port (String, Integer)
  • topic (String)
  • channel (String)

Returns:

  • (String)


15
16
17
# File 'lib/krakow/connection.rb', line 15

# Generate identifier for connection.
#
# The given parts are joined with a double underscore; nil parts are skipped.
#
# @param host [String]
# @param port [String, Integer]
# @param topic [String]
# @param channel [String]
# @return [String]
def self.identifier(host, port, topic, channel)
  present = [host, port, topic, channel].reject(&:nil?)
  present.join('__')
end

Instance Method Details

#auth_required ⇒ TrueClass

Send authentication request for connection

Returns:

  • (TrueClass)


339
340
341
342
343
344
345
346
347
348
349
# File 'lib/krakow/connection.rb', line 339

# Send authentication request for connection.
#
# The secret is read from the `features_args` attribute (key `:auth`). When
# no secret is configured the failure is logged and the call is aborted.
#
# @return [TrueClass]
def auth_required
  info 'Authentication required for this connection'
  # The attribute is named `features_args`; the previous `feature_args` call
  # referenced a method that this class does not define.
  secret = features_args[:auth]
  if secret
    transmit(Command::Auth.new(:secret => secret))
    # Consume the server's reply to the AUTH command; the value is not used.
    receive
    true
  else
    error 'No authentication information provided for connection!'
    abort 'Authentication failure. No authentication secret provided'
  end
end

#callback_for(type, *args) ⇒ Object

Execute callback for given type

Parameters:

  • type (Symbol)

    type of callback

  • args (Object)

    argument(s) for callback (can be multiple)

  • connection (Krakow::Connection)

    current connection (not supplied by the caller; the current actor is appended to the callback arguments)

Returns:

  • (Object)

    result of callback



261
262
263
264
265
266
267
268
269
270
271
272
273
274
# File 'lib/krakow/connection.rb', line 261

# Execute callback for given type.
#
# When a callback is registered and its actor is alive, the callback method
# is invoked with the given arguments plus the current actor. When the actor
# is not alive an error is logged (and the logger's result is returned).
# Without a registered callback the arguments are handed back unchanged: a
# lone argument as itself, anything else as an Array.
#
# @param type [Symbol] type of callback
# @param args [Object] argument(s) for callback
# @return [Object] result of callback
def callback_for(type, *args)
  handler = callbacks[type]
  unless handler
    debug "No connection callback defined for #{type.inspect}"
    return args.first if args.size == 1
    return args
  end
  debug "Processing connection callback for #{type.inspect} (#{handler.inspect})"
  target = handler[:actor]
  unless target.alive?
    return error("Expected actor for callback processing is not alive! (type: `#{type.inspect}`)")
  end
  target.send(handler[:method], *args, current_actor)
end

#callbacks ⇒ Hash

Returns the callbacks attribute.

Returns:

  • (Hash)

    the callbacks attribute



67
# File 'lib/krakow/connection.rb', line 67

# Declares the `callbacks` attribute (type Hash); the default is supplied by a lambda returning a new empty Hash.
attribute :callbacks, Hash, :default => ->{ Hash.new }

#callbacks? ⇒ TrueClass, FalseClass

Returns truthiness of the callbacks attribute.

Returns:

  • (TrueClass, FalseClass)

    truthiness of the callbacks attribute



67
# File 'lib/krakow/connection.rb', line 67

# Source listed for the `callbacks?` predicate: the `callbacks` attribute declaration (type Hash; lambda default returning a new empty Hash).
attribute :callbacks, Hash, :default => ->{ Hash.new }

#channel ⇒ String

Returns the channel attribute.

Returns:

  • (String)

    the channel attribute



64
# File 'lib/krakow/connection.rb', line 64

# Declares the `channel` attribute (type String); no default and not marked required.
attribute :channel, String

#channel? ⇒ TrueClass, FalseClass

Returns truthiness of the channel attribute.

Returns:

  • (TrueClass, FalseClass)

    truthiness of the channel attribute



64
# File 'lib/krakow/connection.rb', line 64

# Source listed for the `channel?` predicate: the `channel` attribute declaration (type String; no default).
attribute :channel, String

#connected? ⇒ TrueClass, FalseClass

Returns underlying socket is connected.

Returns:

  • (TrueClass, FalseClass)

    underlying socket is connected



385
386
387
388
389
390
391
# File 'lib/krakow/connection.rb', line 385

# Report whether the underlying socket is connected.
#
# A missing socket, a socket that reports itself as not alive, and a socket
# whose actor is already dead all count as "not connected".
#
# @return [TrueClass, FalseClass] underlying socket is connected
def connected?
  conn = socket
  (conn && conn.alive?) ? true : false
rescue Celluloid::DeadActorError
  false
end

#connection_cleanup ⇒ nil

Destructor method for cleanup

Returns:

  • (nil)


170
171
172
173
174
175
176
177
178
179
# File 'lib/krakow/connection.rb', line 170

# Destructor method for cleanup.
#
# Clears the running flag, terminates the socket when it is still connected
# and drops the reference to it.
#
# @return [nil]
def connection_cleanup
  debug 'Tearing down connection'
  @running = false
  socket.terminate if connected?
  @socket = nil
  info 'Connection torn down'
  nil
end

#deflate ⇒ TrueClass

Enable deflate feature on underlying socket

Returns:

  • (TrueClass)


365
366
367
368
369
370
371
# File 'lib/krakow/connection.rb', line 365

# Enable deflate feature on underlying socket.
#
# Wraps the current socket in the deflate IO and then reads one frame from
# the converted connection.
#
# @return [TrueClass]
def deflate
  debug 'Loading support for deflate compression and converting connection'
  wrapped = ConnectionFeatures::Deflate::Io.new(socket, features_args)
  @socket = wrapped
  reply = receive
  info "Deflate connection conversion complete. Response: #{reply.inspect}"
  true
end

#enforce_features ⇒ [TrueClass,FalseClass]

Returns the enforce_features attribute.

Returns:

  • ([TrueClass,FalseClass])

    the enforce_features attribute



74
# File 'lib/krakow/connection.rb', line 74

# Declares the `enforce_features` attribute (TrueClass or FalseClass); defaults to true.
attribute :enforce_features, [TrueClass,FalseClass], :default => true

#enforce_features? ⇒ TrueClass, FalseClass

Returns truthiness of the enforce_features attribute.

Returns:

  • (TrueClass, FalseClass)

    truthiness of the enforce_features attribute



74
# File 'lib/krakow/connection.rb', line 74

# Source listed for the `enforce_features?` predicate: the `enforce_features` attribute declaration (TrueClass or FalseClass; default true).
attribute :enforce_features, [TrueClass,FalseClass], :default => true

#error_wait ⇒ Numeric

Returns the error_wait attribute.

Returns:

  • (Numeric)

    the error_wait attribute



73
# File 'lib/krakow/connection.rb', line 73

# Declares the `error_wait` attribute (type Numeric); defaults to 0. #wait_time_for returns it for :error_only commands.
attribute :error_wait, Numeric, :default => 0

#error_wait? ⇒ TrueClass, FalseClass

Returns truthiness of the error_wait attribute.

Returns:

  • (TrueClass, FalseClass)

    truthiness of the error_wait attribute



73
# File 'lib/krakow/connection.rb', line 73

# Source listed for the `error_wait?` predicate: the `error_wait` attribute declaration (type Numeric; default 0).
attribute :error_wait, Numeric, :default => 0

#features ⇒ Hash

Returns the features attribute.

Returns:

  • (Hash)

    the features attribute



70
# File 'lib/krakow/connection.rb', line 70

# Declares the `features` attribute (type Hash); the default is supplied by a lambda returning a new empty Hash.
attribute :features, Hash, :default => ->{ Hash.new }

#features? ⇒ TrueClass, FalseClass

Returns truthiness of the features attribute.

Returns:

  • (TrueClass, FalseClass)

    truthiness of the features attribute



70
# File 'lib/krakow/connection.rb', line 70

# Source listed for the `features?` predicate: the `features` attribute declaration (type Hash; lambda default returning a new empty Hash).
attribute :features, Hash, :default => ->{ Hash.new }

#features_args ⇒ Hash

Returns the features_args attribute.

Returns:

  • (Hash)

    the features_args attribute



75
# File 'lib/krakow/connection.rb', line 75

# Declares the `features_args` attribute (type Hash); the default is supplied by a lambda returning a new empty Hash.
# The value is passed to the feature IO wrappers in #deflate, #snappy and #tls_v1.
attribute :features_args, Hash, :default => ->{ Hash.new }

#features_args? ⇒ TrueClass, FalseClass

Returns truthiness of the features_args attribute.

Returns:

  • (TrueClass, FalseClass)

    truthiness of the features_args attribute



75
# File 'lib/krakow/connection.rb', line 75

# Source listed for the `features_args?` predicate: the `features_args` attribute declaration (type Hash; lambda default returning a new empty Hash).
attribute :features_args, Hash, :default => ->{ Hash.new }

#handle(message) ⇒ Krakow::FrameType?

Handle non-message type Krakow::FrameType

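Parameters:

  • message (Krakow::FrameType)

    received frame

Returns:

  • (Krakow::FrameType, nil)

    the frame when it is a message, otherwise nil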



236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
# File 'lib/krakow/connection.rb', line 236

# Handle non-message type Krakow::FrameType.
#
# Heartbeat responses are answered with a NOP right away. Every other frame
# is passed through the :handle callback; anything that is not a message
# frame afterwards is stored in the responses queue.
#
# @param message [Krakow::FrameType] received frame
# @return [Krakow::FrameType, nil] the message frame, or nil when the frame
#   was consumed here
def handle(message)
  heartbeat = message.is_a?(FrameType::Response) && message.response == '_heartbeat_'
  if heartbeat
    debug 'Responding to heartbeat'
    transmit Command::Nop.new
    return nil
  end
  processed = callback_for(:handle, message)
  return processed if processed.is_a?(FrameType::Message)
  debug "Captured non-message type response: #{processed}"
  responses << processed
  nil
end

#host ⇒ String

Returns the host attribute.

Returns:

  • (String)

    the host attribute



61
# File 'lib/krakow/connection.rb', line 61

# Declares the required `host` attribute (type String).
attribute :host, String, :required => true

#host? ⇒ TrueClass, FalseClass

Returns truthiness of the host attribute.

Returns:

  • (TrueClass, FalseClass)

    truthiness of the host attribute



61
# File 'lib/krakow/connection.rb', line 61

# Source listed for the `host?` predicate: the required `host` attribute declaration (type String).
attribute :host, String, :required => true

#identifier ⇒ String

Returns identifier for this connection.

Returns:

  • (String)

    identifier for this connection



102
103
104
# File 'lib/krakow/connection.rb', line 102

# Identifier for this connection, built by the class-level `identifier`
# helper from this instance's host, port, topic and channel.
#
# @return [String] identifier for this connection
def identifier
  parts = [host, port, topic, channel]
  self.class.identifier(*parts)
end

#identify_and_negotiate ⇒ TrueClass

IDENTIFY with server and negotiate features

Returns:

  • (TrueClass)


307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
# File 'lib/krakow/connection.rb', line 307

# IDENTIFY with server and negotiate features.
#
# Sends an IDENTIFY command built from the default settings merged with the
# configured features and reads the reply. With feature negotiation on, the
# reply is parsed as JSON into the endpoint settings and every enableable
# feature the endpoint reports is switched on by calling the method of the
# same name. A requested feature the endpoint does not report aborts the
# call when `enforce_features` is set. Without feature negotiation the
# endpoint settings are reset to an empty Hash.
#
# @return [TrueClass]
def identify_and_negotiate
  requested = identify_defaults.merge(features)
  identify_cmd = Command::Identify.new(requested)
  socket.put(identify_cmd.to_line)
  reply = receive
  unless requested[:feature_negotiation]
    @endpoint_settings = {}
    return true
  end
  begin
    @endpoint_settings = MultiJson.load(reply.content, :symbolize_keys => true)
    info "Connection settings: #{endpoint_settings.inspect}"
    # Enable things we need to enable
    ENABLEABLE_FEATURES.each do |feature|
      if endpoint_settings[feature]
        send(feature)
      elsif enforce_features && requested[feature]
        abort Error::ConnectionFeatureFailure.new("Failed to enable #{feature} feature on connection!")
      end
    end
  rescue MultiJson::LoadError => e
    error "Failed to parse response from Identify request: #{e} - #{reply}"
    abort e
  end
  true
end

#identify_defaults ⇒ Hash

Returns default settings for IDENTIFY.

Returns:

  • (Hash)

    default settings for IDENTIFY



292
293
294
295
296
297
298
299
300
301
302
# File 'lib/krakow/connection.rb', line 292

# Default settings for IDENTIFY.
#
# Built on first use and memoized in @identify_defaults: the local host name
# as short id, the first entry of the host lookup as long id, the krakow
# user agent string, and feature negotiation switched on.
#
# @return [Hash] default settings for IDENTIFY
def identify_defaults
  @identify_defaults ||= {
    :short_id => Socket.gethostname,
    :long_id => Socket.gethostbyname(Socket.gethostname).flatten.compact.first,
    :user_agent => "krakow/#{Krakow::VERSION}",
    :feature_negotiation => true
  }
end

#init! ⇒ nil

Initialize the connection

Returns:

  • (nil)


114
115
116
117
118
# File 'lib/krakow/connection.rb', line 114

# Initialize the connection: connect first, then start the receive loop
# through the async proxy.
#
# @return [nil]
def init!
  connect!
  background = async
  background.process_to_queue!
  nil
end

#notifier ⇒ [Celluloid::Signals, Celluloid::Condition, Celluloid::Actor]

Returns the notifier attribute.

Returns:

  • ([Celluloid::Signals, Celluloid::Condition, Celluloid::Actor])

    the notifier attribute



69
# File 'lib/krakow/connection.rb', line 69

# Declares the `notifier` attribute (Celluloid::Signals, Celluloid::Condition or Celluloid::Actor); no default.
# When set, #process_to_queue! calls `broadcast` on it for every queued message.
attribute :notifier, [Celluloid::Signals, Celluloid::Condition, Celluloid::Actor]

#notifier? ⇒ TrueClass, FalseClass

Returns truthiness of the notifier attribute.

Returns:

  • (TrueClass, FalseClass)

    truthiness of the notifier attribute



69
# File 'lib/krakow/connection.rb', line 69

# Source listed for the `notifier?` predicate: the `notifier` attribute declaration (Celluloid::Signals, Celluloid::Condition or Celluloid::Actor; no default).
attribute :notifier, [Celluloid::Signals, Celluloid::Condition, Celluloid::Actor]

#port ⇒ [String,Integer]

Returns the port attribute.

Returns:

  • ([String,Integer])

    the port attribute



62
# File 'lib/krakow/connection.rb', line 62

# Declares the required `port` attribute (String or Integer).
attribute :port, [String,Integer], :required => true

#port? ⇒ TrueClass, FalseClass

Returns truthiness of the port attribute.

Returns:

  • (TrueClass, FalseClass)

    truthiness of the port attribute



62
# File 'lib/krakow/connection.rb', line 62

# Source listed for the `port?` predicate: the required `port` attribute declaration (String or Integer).
attribute :port, [String,Integer], :required => true

#process_to_queue! ⇒ nil

Receive messages and place into queue

Returns:

  • (nil)


212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
# File 'lib/krakow/connection.rb', line 212

# Receive messages and place into queue.
#
# Loops for as long as @running stays true: every received frame goes
# through #handle, and frames that come back as messages are pushed onto the
# queue and broadcast to the notifier (when one is configured). Calling this
# while the loop is already running is a no-op.
#
# @return [nil]
def process_to_queue!
  return nil if @running
  @running = true
  while @running
    message = handle(receive)
    if message
      debug "Adding message to queue #{message}"
      queue << message
      if notifier
        # Routine per-message trace: logged at debug like the rest of the
        # loop rather than at warn.
        debug "Sending new message notification: #{notifier} - #{message}"
        notifier.broadcast(message)
      end
    else
      debug 'Received `nil` message. Ignoring.'
    end
  end
  nil
end

#queue ⇒ [Queue, Consumer::Queue]

Returns the queue attribute.

Returns:



66
# File 'lib/krakow/connection.rb', line 66

# Declares the `queue` attribute (Queue or Consumer::Queue); the default is supplied by a lambda returning a new Queue.
# #process_to_queue! pushes received messages onto it.
attribute :queue, [Queue, Consumer::Queue], :default => ->{ Queue.new }

#queue? ⇒ TrueClass, FalseClass

Returns truthiness of the queue attribute.

Returns:

  • (TrueClass, FalseClass)

    truthiness of the queue attribute



66
# File 'lib/krakow/connection.rb', line 66

# Source listed for the `queue?` predicate: the `queue` attribute declaration (Queue or Consumer::Queue; lambda default returning a new Queue).
attribute :queue, [Queue, Consumer::Queue], :default => ->{ Queue.new }

#receive ⇒ Krakow::FrameType?

Receive from server

Returns:

Raises:



185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
# File 'lib/krakow/connection.rb', line 185

# Receive from server.
#
# Reads an 8 byte frame header from the socket, decodes it, reads the
# payload whose size the header announces and builds the frame from both.
# The @receiving flag is set while the frame body is being read.
#
# @return [Krakow::FrameType, nil] the built frame, or nil when the socket
#   yields no header
def receive
  debug 'Read wait for frame start'
  header = socket.get(8)
  return nil unless header
  @receiving = true
  debug "<<< #{header.inspect}"
  frame_info = FrameType.decode(header)
  debug "Decoded structure: #{frame_info.inspect}"
  frame_info[:data] = socket.get(frame_info[:size])
  debug "<<< #{frame_info[:data].inspect}"
  @receiving = false
  built = FrameType.build(frame_info)
  debug "Struct: #{frame_info.inspect} Frame: #{built.inspect}"
  built
end

#receiving? ⇒ TrueClass, FalseClass

Returns is connection currently receiving a message.

Returns:

  • (TrueClass, FalseClass)

    is connection currently receiving a message



205
206
207
# File 'lib/krakow/connection.rb', line 205

# Whether the connection is currently in the middle of reading a frame
# (the @receiving flag coerced to a strict boolean).
#
# @return [TrueClass, FalseClass] is connection currently receiving a message
def receiving?
  @receiving ? true : false
end

#response_interval ⇒ Numeric

Returns the response_interval attribute.

Returns:

  • (Numeric)

    the response_interval attribute



72
# File 'lib/krakow/connection.rb', line 72

# Declares the `response_interval` attribute (type Numeric); defaults to 0.03. Used as the sleep interval of the response wait loop.
attribute :response_interval, Numeric, :default => 0.03

#response_interval? ⇒ TrueClass, FalseClass

Returns truthiness of the response_interval attribute.

Returns:

  • (TrueClass, FalseClass)

    truthiness of the response_interval attribute



72
# File 'lib/krakow/connection.rb', line 72

# Source listed for the `response_interval?` predicate: the `response_interval` attribute declaration (type Numeric; default 0.03).
attribute :response_interval, Numeric, :default => 0.03

#response_wait ⇒ Numeric

Returns the response_wait attribute.

Returns:

  • (Numeric)

    the response_wait attribute



71
# File 'lib/krakow/connection.rb', line 71

# Declares the `response_wait` attribute (type Numeric); defaults to 1.0. #wait_time_for returns it for :required commands.
attribute :response_wait, Numeric, :default => 1.0

#response_wait? ⇒ TrueClass, FalseClass

Returns truthiness of the response_wait attribute.

Returns:

  • (TrueClass, FalseClass)

    truthiness of the response_wait attribute



71
# File 'lib/krakow/connection.rb', line 71

# Source listed for the `response_wait?` predicate: the `response_wait` attribute declaration (type Numeric; default 1.0).
attribute :response_wait, Numeric, :default => 1.0

#responses ⇒ Queue

Returns the responses attribute.

Returns:

  • (Queue)

    the responses attribute



68
# File 'lib/krakow/connection.rb', line 68

# Declares the `responses` attribute (type Queue); the default is supplied by a lambda returning a new Queue.
# #handle pushes non-message frames onto it and #transmit_with_response pops them.
attribute :responses, Queue, :default => ->{ Queue.new }

#responses? ⇒ TrueClass, FalseClass

Returns truthiness of the responses attribute.

Returns:

  • (TrueClass, FalseClass)

    truthiness of the responses attribute



68
# File 'lib/krakow/connection.rb', line 68

# Source listed for the `responses?` predicate: the `responses` attribute declaration (type Queue; lambda default returning a new Queue).
attribute :responses, Queue, :default => ->{ Queue.new }

#snappy ⇒ TrueClass

Enable snappy feature on underlying socket

Returns:

  • (TrueClass)


354
355
356
357
358
359
360
# File 'lib/krakow/connection.rb', line 354

# Enable snappy feature on underlying socket.
#
# Wraps the current socket in the snappy frames IO and then reads one frame
# from the converted connection.
#
# @return [TrueClass]
def snappy
  info 'Loading support for snappy compression and converting connection'
  wrapped = ConnectionFeatures::SnappyFrames::Io.new(socket, features_args)
  @socket = wrapped
  reply = receive
  info "Snappy connection conversion complete. Response: #{reply.inspect}"
  true
end

#tls_v1 ⇒ TrueClass

Enable TLS feature on underlying socket

Returns:

  • (TrueClass)


376
377
378
379
380
381
382
# File 'lib/krakow/connection.rb', line 376

# Enable TLS feature on underlying socket.
#
# Wraps the current socket in the SSL IO and then reads one frame from the
# converted connection.
#
# @return [TrueClass]
def tls_v1
  info 'Enabling TLS for connection'
  wrapped = ConnectionFeatures::Ssl::Io.new(socket, features_args)
  @socket = wrapped
  reply = receive
  info "TLS enable complete. Response: #{reply.inspect}"
  true
end

#to_s ⇒ String

Returns stringify object.

Returns:

  • (String)

    stringify object



107
108
109
# File 'lib/krakow/connection.rb', line 107

# Stringify object as `<ClassName:object_id {host:port}>`.
#
# @return [String] stringify object
def to_s
  format('<%s:%s {%s:%s}>', self.class.name, object_id, host, port)
end

#topic ⇒ String

Returns the topic attribute.

Returns:

  • (String)

    the topic attribute



63
# File 'lib/krakow/connection.rb', line 63

# Declares the `topic` attribute (type String); no default and not marked required.
attribute :topic, String

#topic? ⇒ TrueClass, FalseClass

Returns truthiness of the topic attribute.

Returns:

  • (TrueClass, FalseClass)

    truthiness of the topic attribute



63
# File 'lib/krakow/connection.rb', line 63

# Source listed for the `topic?` predicate: the `topic` attribute declaration (type String; no default).
attribute :topic, String

#transmit(message) ⇒ TrueClass, Krakow::FrameType

Send message to remote server

Parameters:

  • message (Krakow::Message)

    message to send

Returns:



124
125
126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/krakow/connection.rb', line 124

# Send message to remote server.
#
# Anything that does not respond to `to_line` is rejected with a TypeError.
# When the message has a non-zero wait time configured the send is handed
# to #transmit_with_response; otherwise the line is written to the socket
# directly.
#
# @param message [Krakow::Message] message to send
# @return [TrueClass, Krakow::FrameType] true for a plain send, otherwise
#   whatever #transmit_with_response returns
def transmit(message)
  unless message.respond_to?(:to_line)
    abort TypeError.new("Expecting type `Krakow::FrameType` but received `#{message.class}`")
  end
  line = message.to_line
  wait = wait_time_for(message)
  return transmit_with_response(message, wait) if wait > 0
  debug ">>> #{line}"
  socket.put(line)
  true
end

#transmit_with_response(message, wait_time) ⇒ Krakow::FrameType

Sends message and waits for response

Parameters:

  • message (Krakow::Message)

    message to send

  • wait_time (Numeric)

    seconds to wait for a response

Returns:



143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
# File 'lib/krakow/connection.rb', line 143

# Sends message and waits for response.
#
# The responses queue is cleared, the message is written to the socket and
# the queue is then polled every `response_interval` seconds for up to
# `wait_time` seconds. A final poll after the loop picks up a response that
# arrived during the last sleep, and covers a `wait_time` shorter than one
# interval (where the loop body never runs).
#
# A response flagged as an error by the message aborts with
# Error::BadResponse (the response is attached as `result`). A missing
# response aborts with Error::BadResponse::NoResponse unless the command
# only ever answers on error.
#
# @param message [Krakow::Message] message to send
# @param wait_time [Numeric] seconds to wait for a response
# @return [Krakow::FrameType, nil] the response; nil when none arrived for
#   an error-only command
def transmit_with_response(message, wait_time)
  responses.clear
  socket.put(message.to_line)
  response = nil
  (wait_time / response_interval).to_i.times do
    response = responses.pop unless responses.empty?
    break if response
    sleep(response_interval)
  end
  # Last chance: check once more after the final sleep / zero-iteration loop.
  response ||= responses.pop unless responses.empty?
  if response
    message.response = response
    if message.error?(response)
      res = Error::BadResponse.new "Message transmission failed #{message}"
      res.result = response
      abort res
    end
    response
  else
    unless Command.response_for(message) == :error_only
      abort Error::BadResponse::NoResponse.new "No response provided for message #{message}"
    end
  end
end

#version ⇒ String

Returns the version attribute.

Returns:

  • (String)

    the version attribute



65
# File 'lib/krakow/connection.rb', line 65

# Declares the `version` attribute (type String); defaults to 'v2'.
attribute :version, String, :default => 'v2'

#version? ⇒ TrueClass, FalseClass

Returns truthiness of the version attribute.

Returns:

  • (TrueClass, FalseClass)

    truthiness of the version attribute



65
# File 'lib/krakow/connection.rb', line 65

# Source listed for the `version?` predicate: the `version` attribute declaration (type String; default 'v2').
attribute :version, String, :default => 'v2'

#wait_time_for(message) ⇒ Numeric

Returns configured wait time for given message type

Parameters:

Returns:

  • (Numeric)

    seconds to wait



280
281
282
283
284
285
286
287
288
289
# File 'lib/krakow/connection.rb', line 280

# Returns configured wait time for given message type.
#
# Commands that require a response use `response_wait`, commands that only
# answer on error use `error_wait`, and everything else does not wait.
#
# @param message [Object] message handed to Command.response_for
# @return [Numeric] seconds to wait
def wait_time_for(message)
  expectation = Command.response_for(message)
  if expectation == :required
    response_wait
  elsif expectation == :error_only
    error_wait
  else
    0
  end
end