Class: Bunny::Transport

Inherits:
Object
  • Object
show all
Defined in:
lib/bunny/transport.rb

Constant Summary collapse

DEFAULT_CONNECTION_TIMEOUT =

Default TCP connection timeout

30.0
DEFAULT_READ_TIMEOUT =
30.0
DEFAULT_WRITE_TIMEOUT =
30.0
TLS_VERSION_ALIASES =

mimics METHODS_MAP in ssl.rb but also lists TLS 1.3 and string constants

{
  TLSv1: OpenSSL::SSL::TLS1_VERSION,
  TLSv1_1: OpenSSL::SSL::TLS1_1_VERSION,
  TLSv1_2: OpenSSL::SSL::TLS1_2_VERSION,
  "1.0": OpenSSL::SSL::TLS1_VERSION,
  "1.1": OpenSSL::SSL::TLS1_1_VERSION,
  "1.2": OpenSSL::SSL::TLS1_2_VERSION,
  OpenSSL::SSL::TLS1_VERSION => OpenSSL::SSL::TLS1_VERSION,
  OpenSSL::SSL::TLS1_1_VERSION => OpenSSL::SSL::TLS1_1_VERSION,
  OpenSSL::SSL::TLS1_2_VERSION => OpenSSL::SSL::TLS1_2_VERSION
}

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(session, host, port, opts) ⇒ Transport

Returns a new instance of Transport.



60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/bunny/transport.rb', line 60

def initialize(session, host, port, opts)
  @session        = session
  @session_error_handler = opts[:session_error_handler]
  @host    = host
  @port    = port
  @opts    = opts

  @logger                = session.logger
  @tls_enabled           = tls_enabled?(opts)

  @read_timeout = opts[:read_timeout] || DEFAULT_READ_TIMEOUT
  @read_timeout = nil if @read_timeout == 0

  @write_timeout = opts[:socket_timeout] # Backwards compatability

  @write_timeout ||= opts[:write_timeout] || DEFAULT_WRITE_TIMEOUT
  @write_timeout = nil if @write_timeout == 0

  @connect_timeout    = self.timeout_from(opts)
  @connect_timeout    = nil if @connect_timeout == 0
  @disconnect_timeout = @write_timeout || @read_timeout || @connect_timeout

  @writes_mutex       = @session.mutex_impl.new

  @socket = nil

  prepare_tls_context(opts) if @tls_enabled
end

Instance Attribute Details

#connect_timeoutObject (readonly)

Returns the value of attribute connect_timeout.



52
53
54
# File 'lib/bunny/transport.rb', line 52

def connect_timeout
  @connect_timeout
end

#disconnect_timeoutObject (readonly)

Returns the value of attribute disconnect_timeout.



52
53
54
# File 'lib/bunny/transport.rb', line 52

def disconnect_timeout
  @disconnect_timeout
end

#hostObject (readonly)

Returns the value of attribute host.



52
53
54
# File 'lib/bunny/transport.rb', line 52

def host
  @host
end

#portObject (readonly)

Returns the value of attribute port.



52
53
54
# File 'lib/bunny/transport.rb', line 52

def port
  @port
end

#read_timeoutObject

Returns the value of attribute read_timeout.



52
53
54
# File 'lib/bunny/transport.rb', line 52

def read_timeout
  @read_timeout
end

#sessionObject (readonly)

Returns the value of attribute session.



52
53
54
# File 'lib/bunny/transport.rb', line 52

def session
  @session
end

#socketObject (readonly)

Returns the value of attribute socket.



52
53
54
# File 'lib/bunny/transport.rb', line 52

def socket
  @socket
end

#tls_ca_certificatesObject (readonly)

Returns the value of attribute tls_ca_certificates.



53
54
55
# File 'lib/bunny/transport.rb', line 53

def tls_ca_certificates
  @tls_ca_certificates
end

#tls_certificate_pathObject (readonly)

Returns the value of attribute tls_certificate_path.



53
54
55
# File 'lib/bunny/transport.rb', line 53

def tls_certificate_path
  @tls_certificate_path
end

#tls_contextObject (readonly)

Returns the value of attribute tls_context.



53
54
55
# File 'lib/bunny/transport.rb', line 53

def tls_context
  @tls_context
end

#tls_key_pathObject (readonly)

Returns the value of attribute tls_key_path.



53
54
55
# File 'lib/bunny/transport.rb', line 53

def tls_key_path
  @tls_key_path
end

#verify_peerObject (readonly)

Returns the value of attribute verify_peer.



53
54
55
# File 'lib/bunny/transport.rb', line 53

def verify_peer
  @verify_peer
end

#write_timeoutObject (readonly)

Returns the value of attribute write_timeout.



52
53
54
# File 'lib/bunny/transport.rb', line 52

def write_timeout
  @write_timeout
end

Class Method Details

.ping!(host, port, timeout) ⇒ Object

Raises:



318
319
320
# File 'lib/bunny/transport.rb', line 318

def self.ping!(host, port, timeout)
  raise ConnectionTimeout.new("#{host}:#{port} is unreachable") if !reacheable?(host, port, timeout)
end

.reacheable?(host, port, timeout) ⇒ Boolean

Returns:

  • (Boolean)


305
306
307
308
309
310
311
312
313
314
315
316
# File 'lib/bunny/transport.rb', line 305

def self.reacheable?(host, port, timeout)
  begin
    s = Bunny::SocketImpl.open(host, port,
      :connect_timeout => timeout)

    true
  rescue SocketError, Timeout::Error => _e
    false
  ensure
    s.close if s
  end
end

Instance Method Details

#close(reason = nil) ⇒ Object



245
246
247
# File 'lib/bunny/transport.rb', line 245

def close(reason = nil)
  @socket.close if open?
end

#closed?Boolean

Returns:

  • (Boolean)


253
254
255
# File 'lib/bunny/transport.rb', line 253

def closed?
  !open?
end

#configure_socket(&block) ⇒ Object



146
147
148
# File 'lib/bunny/transport.rb', line 146

def configure_socket(&block)
  block.call(@socket) if @socket
end

#configure_tls_context(&block) ⇒ Object



150
151
152
# File 'lib/bunny/transport.rb', line 150

def configure_tls_context(&block)
  block.call(@tls_context) if @tls_context
end

#connectObject



108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# File 'lib/bunny/transport.rb', line 108

def connect
  if uses_tls?
    begin
      @socket.connect
    rescue OpenSSL::SSL::SSLError => e
      @logger.error { "TLS connection failed: #{e.message}" }
      raise e
    end

    log_peer_certificate_info(Logger::DEBUG, @socket.peer_cert)
    log_peer_certificate_chain_info(Logger::DEBUG, @socket.peer_cert_chain)

    begin
      @socket.post_connection_check(host) if @verify_peer
    rescue OpenSSL::SSL::SSLError => e
      @logger.error do
        msg = "Peer verification of target server failed: #{e.message}. "
        msg += "Target hostname: #{hostname}, see peer certificate chain details below."
        msg
      end
      log_peer_certificate_info(Logger::ERROR, @socket.peer_cert)
      log_peer_certificate_chain_info(Logger::ERROR, @socket.peer_cert_chain)

      raise e
    end

    @status = :connected

    @socket
  else
    # no-op
  end
end

#connected?Boolean

Returns:

  • (Boolean)


142
143
144
# File 'lib/bunny/transport.rb', line 142

def connected?
  :connected == @status && open?
end

#flushObject



257
258
259
# File 'lib/bunny/transport.rb', line 257

def flush
  @socket.flush if @socket
end

#hostnameObject



89
90
91
# File 'lib/bunny/transport.rb', line 89

def hostname
  @host
end

#initialize_socketObject



322
323
324
325
326
327
328
329
330
331
332
333
# File 'lib/bunny/transport.rb', line 322

def initialize_socket
  begin
    @socket = Bunny::SocketImpl.open(@host, @port,
      :keepalive      => @opts[:keepalive],
      :connect_timeout => @connect_timeout)
  rescue StandardError, ClientTimeout => e
    @status = :not_connected
    raise Bunny::TCPConnectionFailed.new(e, self.hostname, self.port)
  end

  @socket
end

#local_addressObject



93
94
95
# File 'lib/bunny/transport.rb', line 93

def local_address
  @socket.local_address
end

#maybe_initialize_socketObject



335
336
337
# File 'lib/bunny/transport.rb', line 335

def maybe_initialize_socket
  initialize_socket if !@socket || closed?
end

#open?Boolean

Returns:

  • (Boolean)


249
250
251
# File 'lib/bunny/transport.rb', line 249

def open?
  @socket && !@socket.closed?
end

#post_initialize_socketObject



339
340
341
342
343
344
345
# File 'lib/bunny/transport.rb', line 339

def post_initialize_socket
  @socket = if uses_tls? and !@socket.is_a?(Bunny::SSLSocketImpl)
              wrap_in_tls_socket(@socket)
            else
              @socket
            end
end

#read_fully(count) ⇒ Object



261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
# File 'lib/bunny/transport.rb', line 261

def read_fully(count)
  begin
    @socket.read_fully(count, @read_timeout)
  rescue SystemCallError, Timeout::Error, Bunny::ConnectionError, IOError => e
    @logger.error "Got an exception when receiving data: #{e.message} (#{e.class.name})"
    close
    @status = :not_connected

    if @session.automatically_recover?
      raise
    else
      @session_error_handler.raise(Bunny::NetworkFailure.new("detected a network failure: #{e.message}", e))
    end
  end
end

#read_next_frame(opts = {}) ⇒ Object

Exposed primarily for Bunny::Channel

Raises:



284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
# File 'lib/bunny/transport.rb', line 284

def read_next_frame(opts = {})
  header              = read_fully(7)
  type, channel, size = AMQ::Protocol::Frame.decode_header(header)
  payload             = if size > 0
                          read_fully(size)
                        else
                          ''
                        end
  frame_end = read_fully(1)

  # 1) the size is miscalculated
  if payload.bytesize != size
    raise BadLengthError.new(size, payload.bytesize)
  end

  # 2) the size is OK, but the string doesn't end with FINAL_OCTET
  raise NoFinalOctetError.new if frame_end != AMQ::Protocol::Frame::FINAL_OCTET
  AMQ::Protocol::Frame.new(type, payload, channel)
end

#read_ready?(timeout = nil) ⇒ Boolean

Returns:

  • (Boolean)


277
278
279
280
# File 'lib/bunny/transport.rb', line 277

def read_ready?(timeout = nil)
  io = IO.select([@socket].compact, nil, nil, timeout)
  io && io[0].include?(@socket)
end

#send_frame(frame) ⇒ Object

Sends frame to the peer.



224
225
226
227
228
229
230
# File 'lib/bunny/transport.rb', line 224

def send_frame(frame)
  if closed?
    @session.handle_network_failure(ConnectionClosedError.new(frame))
  else
    write(frame.encode)
  end
end

#send_frame_without_timeout(frame) ⇒ Object

Sends frame to the peer without timeout control.



236
237
238
239
240
241
242
# File 'lib/bunny/transport.rb', line 236

def send_frame_without_timeout(frame)
  if closed?
    @session.handle_network_failure(ConnectionClosedError.new(frame))
  else
    write_without_timeout(frame.encode)
  end
end

#uses_ssl?Boolean Also known as: ssl?

Returns:

  • (Boolean)


102
103
104
# File 'lib/bunny/transport.rb', line 102

def uses_ssl?
  @tls_enabled
end

#uses_tls?Boolean Also known as: tls?

Returns:

  • (Boolean)


97
98
99
# File 'lib/bunny/transport.rb', line 97

def uses_tls?
  @tls_enabled
end

#write(data) ⇒ Object

Writes data to the socket. If read/write timeout was specified the operation will return after that amount of time has elapsed waiting for the socket.



156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
# File 'lib/bunny/transport.rb', line 156

def write(data)
  return write_without_timeout(data) unless @write_timeout

  begin
    if open?
      @writes_mutex.synchronize do
        @socket.write(data)
      end
    end
  rescue SystemCallError, Timeout::Error, Bunny::ConnectionError, IOError => e
    @logger.error "Got an exception when sending data: #{e.message} (#{e.class.name})"
    close
    @status = :not_connected

    if @session.automatically_recover?
      @session.handle_network_failure(e)
    else
      @session_error_handler.raise(Bunny::NetworkFailure.new("detected a network failure: #{e.message}", e))
    end
  end
end

#write_without_timeout(data, raise_exceptions = false) ⇒ Object

Writes data to the socket without timeout checks



204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
# File 'lib/bunny/transport.rb', line 204

def write_without_timeout(data, raise_exceptions = false)
  begin
    @writes_mutex.synchronize { @socket.write(data) }
    @socket.flush
  rescue SystemCallError, Bunny::ConnectionError, IOError => e
    close
    raise e if raise_exceptions

    if @session.automatically_recover?
      @session.handle_network_failure(e)
    else
      @session_error_handler.raise(Bunny::NetworkFailure.new("detected a network failure: #{e.message}", e))
    end
  end
end