Class: Bunny::Transport
- Inherits:
-
Object
- Object
- Bunny::Transport
- Defined in:
- lib/bunny/transport.rb
Constant Summary collapse
- DEFAULT_CONNECTION_TIMEOUT =
Default TCP connection timeout
30.0
- DEFAULT_READ_TIMEOUT =
30.0
- DEFAULT_WRITE_TIMEOUT =
30.0
- TLS_VERSION_ALIASES =
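Mimics METHODS_MAP in ssl.rb, but also accepts string-style version keys ("1.0", "1.1", "1.2") and the OpenSSL version constants themselves. Note that the table shown below has no TLS 1.3 entry.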
{ TLSv1: OpenSSL::SSL::TLS1_VERSION, TLSv1_1: OpenSSL::SSL::TLS1_1_VERSION, TLSv1_2: OpenSSL::SSL::TLS1_2_VERSION, "1.0": OpenSSL::SSL::TLS1_VERSION, "1.1": OpenSSL::SSL::TLS1_1_VERSION, "1.2": OpenSSL::SSL::TLS1_2_VERSION, OpenSSL::SSL::TLS1_VERSION => OpenSSL::SSL::TLS1_VERSION, OpenSSL::SSL::TLS1_1_VERSION => OpenSSL::SSL::TLS1_1_VERSION, OpenSSL::SSL::TLS1_2_VERSION => OpenSSL::SSL::TLS1_2_VERSION }
Instance Attribute Summary collapse
-
#connect_timeout ⇒ Object
readonly
Returns the value of attribute connect_timeout.
-
#disconnect_timeout ⇒ Object
readonly
Returns the value of attribute disconnect_timeout.
-
#host ⇒ Object
readonly
Returns the value of attribute host.
-
#port ⇒ Object
readonly
Returns the value of attribute port.
-
#read_timeout ⇒ Object
Returns the value of attribute read_timeout.
-
#session ⇒ Object
readonly
Returns the value of attribute session.
-
#socket ⇒ Object
readonly
Returns the value of attribute socket.
-
#tls_ca_certificates ⇒ Object
readonly
Returns the value of attribute tls_ca_certificates.
-
#tls_certificate_path ⇒ Object
readonly
Returns the value of attribute tls_certificate_path.
-
#tls_context ⇒ Object
readonly
Returns the value of attribute tls_context.
-
#tls_key_path ⇒ Object
readonly
Returns the value of attribute tls_key_path.
-
#verify_peer ⇒ Object
readonly
Returns the value of attribute verify_peer.
-
#write_timeout ⇒ Object
readonly
Returns the value of attribute write_timeout.
Class Method Summary collapse
Instance Method Summary collapse
- #close(reason = nil) ⇒ Object
- #closed? ⇒ Boolean
- #configure_socket(&block) ⇒ Object
- #configure_tls_context(&block) ⇒ Object
- #connect ⇒ Object
- #connected? ⇒ Boolean
- #flush ⇒ Object
- #hostname ⇒ Object
-
#initialize(session, host, port, opts) ⇒ Transport
constructor
A new instance of Transport.
- #initialize_socket ⇒ Object
- #local_address ⇒ Object
- #maybe_initialize_socket ⇒ Object
- #open? ⇒ Boolean
- #post_initialize_socket ⇒ Object
- #read_fully(count) ⇒ Object
-
#read_next_frame(opts = {}) ⇒ Object
Exposed primarily for Bunny::Channel.
- #read_ready?(timeout = nil) ⇒ Boolean
-
#send_frame(frame) ⇒ Object
Sends frame to the peer.
-
#send_frame_without_timeout(frame) ⇒ Object
Sends frame to the peer without timeout control.
- #uses_ssl? ⇒ Boolean (also: #ssl?)
- #uses_tls? ⇒ Boolean (also: #tls?)
-
#write(data) ⇒ Object
Writes data to the socket.
-
#write_without_timeout(data, raise_exceptions = false) ⇒ Object
Writes data to the socket without timeout checks.
Constructor Details
#initialize(session, host, port, opts) ⇒ Transport
Returns a new instance of Transport.
60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 |
# File 'lib/bunny/transport.rb', line 60
#
# Sets up a transport for the given session and endpoint. No socket is
# opened here: @socket starts out as nil.
#
# Each timeout is resolved from opts; a value equal to 0 is stored as nil.
#
# @param session [Object] owning session; #logger and #mutex_impl are called on it
# @param host [Object] stored in @host
# @param port [Object] stored in @port
# @param opts [Hash] options; the keys read directly here are
#   :session_error_handler, :read_timeout, :socket_timeout and :write_timeout.
#   opts is also passed to tls_enabled?, timeout_from and prepare_tls_context.
def initialize(session, host, port, opts)
  @session = session
  @host    = host
  @port    = port
  @opts    = opts

  @session_error_handler = opts[:session_error_handler]
  @logger                = session.logger
  @tls_enabled           = tls_enabled?(opts)

  read_to       = opts[:read_timeout] || DEFAULT_READ_TIMEOUT
  @read_timeout = read_to == 0 ? nil : read_to

  # :socket_timeout is honoured for backwards compatibility and takes
  # precedence over :write_timeout when both are given.
  write_to       = opts[:socket_timeout] || opts[:write_timeout] || DEFAULT_WRITE_TIMEOUT
  @write_timeout = write_to == 0 ? nil : write_to

  connect_to       = self.timeout_from(opts)
  @connect_timeout = connect_to == 0 ? nil : connect_to

  # First timeout that is still set, in write / read / connect order.
  @disconnect_timeout = @write_timeout || @read_timeout || @connect_timeout

  @writes_mutex = @session.mutex_impl.new
  @socket       = nil

  prepare_tls_context(opts) if @tls_enabled
end
Instance Attribute Details
#connect_timeout ⇒ Object (readonly)
Returns the value of attribute connect_timeout.
52 53 54 |
# File 'lib/bunny/transport.rb', line 52
#
# Reader for the @connect_timeout instance variable.
#
# @return [Object] the value currently stored in @connect_timeout
def connect_timeout
  @connect_timeout
end
#disconnect_timeout ⇒ Object (readonly)
Returns the value of attribute disconnect_timeout.
52 53 54 |
# File 'lib/bunny/transport.rb', line 52
#
# Reader for the @disconnect_timeout instance variable.
#
# @return [Object] the value currently stored in @disconnect_timeout
def disconnect_timeout
  @disconnect_timeout
end
#host ⇒ Object (readonly)
Returns the value of attribute host.
52 53 54 |
# File 'lib/bunny/transport.rb', line 52
#
# Reader for the @host instance variable.
#
# @return [Object] the value currently stored in @host
def host
  @host
end
#port ⇒ Object (readonly)
Returns the value of attribute port.
52 53 54 |
# File 'lib/bunny/transport.rb', line 52
#
# Reader for the @port instance variable.
#
# @return [Object] the value currently stored in @port
def port
  @port
end
#read_timeout ⇒ Object
Returns the value of attribute read_timeout.
52 53 54 |
# File 'lib/bunny/transport.rb', line 52
#
# Reader for the @read_timeout instance variable.
#
# @return [Object] the value currently stored in @read_timeout
def read_timeout
  @read_timeout
end
#session ⇒ Object (readonly)
Returns the value of attribute session.
52 53 54 |
# File 'lib/bunny/transport.rb', line 52
#
# Reader for the @session instance variable.
#
# @return [Object] the value currently stored in @session
def session
  @session
end
#socket ⇒ Object (readonly)
Returns the value of attribute socket.
52 53 54 |
# File 'lib/bunny/transport.rb', line 52
#
# Reader for the @socket instance variable.
#
# @return [Object] the value currently stored in @socket
def socket
  @socket
end
#tls_ca_certificates ⇒ Object (readonly)
Returns the value of attribute tls_ca_certificates.
53 54 55 |
# File 'lib/bunny/transport.rb', line 53
#
# Reader for the @tls_ca_certificates instance variable.
#
# @return [Object] the value currently stored in @tls_ca_certificates
def tls_ca_certificates
  @tls_ca_certificates
end
#tls_certificate_path ⇒ Object (readonly)
Returns the value of attribute tls_certificate_path.
53 54 55 |
# File 'lib/bunny/transport.rb', line 53
#
# Reader for the @tls_certificate_path instance variable.
#
# @return [Object] the value currently stored in @tls_certificate_path
def tls_certificate_path
  @tls_certificate_path
end
#tls_context ⇒ Object (readonly)
Returns the value of attribute tls_context.
53 54 55 |
# File 'lib/bunny/transport.rb', line 53
#
# Reader for the @tls_context instance variable.
#
# @return [Object] the value currently stored in @tls_context
def tls_context
  @tls_context
end
#tls_key_path ⇒ Object (readonly)
Returns the value of attribute tls_key_path.
53 54 55 |
# File 'lib/bunny/transport.rb', line 53
#
# Reader for the @tls_key_path instance variable.
#
# @return [Object] the value currently stored in @tls_key_path
def tls_key_path
  @tls_key_path
end
#verify_peer ⇒ Object (readonly)
Returns the value of attribute verify_peer.
53 54 55 |
# File 'lib/bunny/transport.rb', line 53
#
# Reader for the @verify_peer instance variable.
#
# @return [Object] the value currently stored in @verify_peer
def verify_peer
  @verify_peer
end
#write_timeout ⇒ Object (readonly)
Returns the value of attribute write_timeout.
52 53 54 |
# File 'lib/bunny/transport.rb', line 52
#
# Reader for the @write_timeout instance variable.
#
# @return [Object] the value currently stored in @write_timeout
def write_timeout
  @write_timeout
end
Class Method Details
.ping!(host, port, timeout) ⇒ Object
318 319 320 |
# File 'lib/bunny/transport.rb', line 318
#
# Raises unless the endpoint passes the reacheable? check.
#
# @param host [Object] forwarded to reacheable? and interpolated into the error message
# @param port [Object] forwarded to reacheable? and interpolated into the error message
# @param timeout [Object] forwarded to reacheable?
# @return [nil] when reacheable? returns a truthy value
# @raise [ConnectionTimeout] when reacheable? returns a falsy value
def self.ping!(host, port, timeout)
  return if reacheable?(host, port, timeout)

  raise ConnectionTimeout.new("#{host}:#{port} is unreachable")
end
.reacheable?(host, port, timeout) ⇒ Boolean
305 306 307 308 309 310 311 312 313 314 315 316 |
# File 'lib/bunny/transport.rb', line 305
#
# Probes an endpoint by opening a Bunny::SocketImpl to it and closing it
# again straight away.
#
# @param host [Object] passed to Bunny::SocketImpl.open
# @param port [Object] passed to Bunny::SocketImpl.open
# @param timeout [Object] passed as the :connect_timeout option
# @return [Boolean] true when the socket opened; false when opening raised
#   SocketError or Timeout::Error. Any other exception propagates to the caller.
def self.reacheable?(host, port, timeout)
  probe = Bunny::SocketImpl.open(host, port, :connect_timeout => timeout)
  true
rescue SocketError, Timeout::Error
  false
ensure
  # probe is nil when open raised, so there is nothing to close in that case.
  probe.close if probe
end
Instance Method Details
#close(reason = nil) ⇒ Object
245 246 247 |
# File 'lib/bunny/transport.rb', line 245
#
# Closes the underlying socket, but only when open? reports it as open.
#
# @param reason [Object] accepted but not used by this method
# @return [Object, nil] whatever @socket.close returns, or nil when nothing was closed
def close(reason = nil)
  return unless open?

  @socket.close
end
#closed? ⇒ Boolean
253 254 255 |
# File 'lib/bunny/transport.rb', line 253
#
# Logical negation of open?.
#
# @return [Boolean] true when open? is falsy, false otherwise
def closed?
  !open?
end
#configure_socket(&block) ⇒ Object
146 147 148 |
# File 'lib/bunny/transport.rb', line 146
#
# Hands the current socket to the given block so the caller can adjust it.
# Does nothing when @socket is not set.
#
# @param block [Proc] called with @socket
# @return [Object, nil] the block's result, or nil when there is no socket
def configure_socket(&block)
  return unless @socket

  block.call(@socket)
end
#configure_tls_context(&block) ⇒ Object
150 151 152 |
# File 'lib/bunny/transport.rb', line 150
#
# Hands the current TLS context to the given block so the caller can adjust
# it. Does nothing when @tls_context is not set.
#
# @param block [Proc] called with @tls_context
# @return [Object, nil] the block's result, or nil when there is no TLS context
def configure_tls_context(&block)
  return unless @tls_context

  block.call(@tls_context)
end
#connect ⇒ Object
108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 |
# File 'lib/bunny/transport.rb', line 108
#
# Performs the TLS handshake on @socket when TLS is in use; otherwise does
# nothing.
#
# On the TLS path it calls @socket.connect, logs the peer certificate and
# chain at DEBUG level and, when @verify_peer is set, runs
# @socket.post_connection_check against host. On success @status becomes
# :connected.
#
# @return [Object, nil] @socket on the TLS path, nil when TLS is not in use
# @raise [OpenSSL::SSL::SSLError] re-raised after logging when the handshake
#   or the peer verification fails
def connect
  # Non-TLS transports: no-op.
  return unless uses_tls?

  begin
    @socket.connect
  rescue OpenSSL::SSL::SSLError => e
    @logger.error { "TLS connection failed: #{e.message}" }
    raise e
  end

  log_peer_certificate_info(Logger::DEBUG, @socket.peer_cert)
  log_peer_certificate_chain_info(Logger::DEBUG, @socket.peer_cert_chain)

  begin
    @socket.post_connection_check(host) if @verify_peer
  rescue OpenSSL::SSL::SSLError => e
    @logger.error do
      msg = "Peer verification of target server failed: #{e.message}. "
      msg += "Target hostname: #{hostname}, see peer certificate chain details below."
      msg
    end
    # Repeat the certificate details at ERROR level so they are visible
    # next to the failure message.
    log_peer_certificate_info(Logger::ERROR, @socket.peer_cert)
    log_peer_certificate_chain_info(Logger::ERROR, @socket.peer_cert_chain)

    raise e
  end

  @status = :connected

  @socket
end
#connected? ⇒ Boolean
142 143 144 |
# File 'lib/bunny/transport.rb', line 142
#
# Reports whether @status is :connected and the socket is open.
#
# @return [Boolean, nil] false when @status is not :connected; otherwise the
#   result of open?
def connected?
  @status == :connected && open?
end
#flush ⇒ Object
257 258 259 |
# File 'lib/bunny/transport.rb', line 257
#
# Flushes the socket when one is set; otherwise does nothing.
#
# @return [Object, nil] whatever @socket.flush returns, or nil without a socket
def flush
  return unless @socket

  @socket.flush
end
#hostname ⇒ Object
89 90 91 |
# File 'lib/bunny/transport.rb', line 89
#
# Returns the same value as the host reader.
#
# @return [Object] the value currently stored in @host
def hostname
  @host
end
#initialize_socket ⇒ Object
322 323 324 325 326 327 328 329 330 331 332 333 |
# File 'lib/bunny/transport.rb', line 322
#
# Opens a new Bunny::SocketImpl to @host:@port and stores it in @socket.
# The :keepalive option comes from @opts and the connect timeout from
# @connect_timeout.
#
# @return [Object] the newly opened socket
# @raise [Bunny::TCPConnectionFailed] wrapping any StandardError or
#   ClientTimeout raised while opening; @status is set to :not_connected first
def initialize_socket
  @socket = Bunny::SocketImpl.open(@host, @port,
                                   :keepalive       => @opts[:keepalive],
                                   :connect_timeout => @connect_timeout)
rescue StandardError, ClientTimeout => e
  @status = :not_connected
  raise Bunny::TCPConnectionFailed.new(e, self.hostname, self.port)
end
#local_address ⇒ Object
93 94 95 |
# File 'lib/bunny/transport.rb', line 93
#
# Delegates to the socket's own local_address. There is no nil check, so
# this raises NoMethodError when @socket is nil.
#
# @return [Object] whatever @socket.local_address returns
def local_address
  @socket.local_address
end
#maybe_initialize_socket ⇒ Object
335 336 337 |
# File 'lib/bunny/transport.rb', line 335
#
# Opens a socket via initialize_socket unless a socket is already set and
# not closed.
#
# @return [Object, nil] the result of initialize_socket, or nil when the
#   existing socket was kept
def maybe_initialize_socket
  return if @socket && !closed?

  initialize_socket
end
#open? ⇒ Boolean
249 250 251 |
# File 'lib/bunny/transport.rb', line 249
#
# Reports whether a socket is set and not closed.
#
# @return [Boolean, nil] nil when @socket is nil; otherwise the negation of
#   @socket.closed?
def open?
  sock = @socket
  sock && !sock.closed?
end
#post_initialize_socket ⇒ Object
339 340 341 342 343 344 345 |
# File 'lib/bunny/transport.rb', line 339
#
# When TLS is in use and @socket is not already a Bunny::SSLSocketImpl,
# replaces @socket with the result of wrap_in_tls_socket. Otherwise @socket
# is left as it is.
#
# @return [Object] the (possibly wrapped) socket now stored in @socket
def post_initialize_socket
  needs_wrapping = uses_tls? && !@socket.is_a?(Bunny::SSLSocketImpl)
  @socket = wrap_in_tls_socket(@socket) if needs_wrapping
  @socket
end
#read_fully(count) ⇒ Object
261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 |
# File 'lib/bunny/transport.rb', line 261
#
# Reads count bytes by delegating to @socket.read_fully with @read_timeout.
#
# When the read raises SystemCallError, Timeout::Error,
# Bunny::ConnectionError or IOError, the error is logged, the transport is
# closed and @status becomes :not_connected. The error is then re-raised if
# the session recovers automatically; otherwise a Bunny::NetworkFailure
# wrapping it is passed to @session_error_handler.raise.
#
# @param count [Object] forwarded to @socket.read_fully as the amount to read
# @return [Object] whatever @socket.read_fully returns
def read_fully(count)
  begin
    @socket.read_fully(count, @read_timeout)
  rescue SystemCallError, Timeout::Error, Bunny::ConnectionError, IOError => e
    @logger.error "Got an exception when receiving data: #{e.message} (#{e.class.name})"
    close
    @status = :not_connected

    if @session.automatically_recover?
      # Bare raise re-raises the exception being handled.
      raise
    else
      @session_error_handler.raise(Bunny::NetworkFailure.new("detected a network failure: #{e.message}", e))
    end
  end
end
#read_next_frame(opts = {}) ⇒ Object
Exposed primarily for Bunny::Channel
284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 |
# File 'lib/bunny/transport.rb', line 284
#
# Reads one frame: a 7-byte header, then the payload whose size the header
# declares, then a single terminating byte.
#
# @param opts [Hash] accepted but not used by this method
# @return [AMQ::Protocol::Frame] frame built from the decoded type, payload and channel
# @raise [BadLengthError] when the payload read does not have the declared size
# @raise [NoFinalOctetError] when the terminating byte is not
#   AMQ::Protocol::Frame::FINAL_OCTET
def read_next_frame(opts = {})
  raw_header = read_fully(7)
  type, channel, size = AMQ::Protocol::Frame.decode_header(raw_header)

  # Nothing is read for an empty payload.
  payload    = size > 0 ? read_fully(size) : ''
  terminator = read_fully(1)

  # 1) the size is miscalculated
  raise BadLengthError.new(size, payload.bytesize) if payload.bytesize != size

  # 2) the size is OK, but the string doesn't end with FINAL_OCTET
  raise NoFinalOctetError.new if terminator != AMQ::Protocol::Frame::FINAL_OCTET

  AMQ::Protocol::Frame.new(type, payload, channel)
end
#read_ready?(timeout = nil) ⇒ Boolean
277 278 279 280 |
# File 'lib/bunny/transport.rb', line 277
#
# Waits, via IO.select, for @socket to become readable.
#
# @param timeout [Numeric, nil] passed to IO.select as its timeout
# @return [Boolean, nil] nil when IO.select returns nil; otherwise whether
#   @socket is among the readable IOs it reported
def read_ready?(timeout = nil)
  # compact drops a nil @socket so IO.select receives an empty read set.
  ready = IO.select([@socket].compact, nil, nil, timeout)
  ready && ready.first.include?(@socket)
end
#send_frame(frame) ⇒ Object
Sends frame to the peer.
224 225 226 227 228 229 230 |
# File 'lib/bunny/transport.rb', line 224
#
# Encodes the frame and sends it through write. When the transport is
# closed, a ConnectionClosedError built from the frame is reported to the
# session instead.
#
# @param frame [Object] must respond to #encode
# @return [Object] the result of write, or of @session.handle_network_failure
#   when closed
def send_frame(frame)
  return write(frame.encode) unless closed?

  @session.handle_network_failure(ConnectionClosedError.new(frame))
end
#send_frame_without_timeout(frame) ⇒ Object
Sends frame to the peer without timeout control.
236 237 238 239 240 241 242 |
# File 'lib/bunny/transport.rb', line 236
#
# Encodes the frame and sends it through write_without_timeout. When the
# transport is closed, a ConnectionClosedError built from the frame is
# reported to the session instead.
#
# @param frame [Object] must respond to #encode
# @return [Object] the result of write_without_timeout, or of
#   @session.handle_network_failure when closed
def send_frame_without_timeout(frame)
  return write_without_timeout(frame.encode) unless closed?

  @session.handle_network_failure(ConnectionClosedError.new(frame))
end
#uses_ssl? ⇒ Boolean Also known as: ssl?
102 103 104 |
# File 'lib/bunny/transport.rb', line 102
#
# Reports the TLS flag held in @tls_enabled.
#
# @return [Object] the value currently stored in @tls_enabled
def uses_ssl?
  @tls_enabled
end
#uses_tls? ⇒ Boolean Also known as: tls?
97 98 99 |
# File 'lib/bunny/transport.rb', line 97
#
# Reports the TLS flag held in @tls_enabled.
#
# @return [Object] the value currently stored in @tls_enabled
def uses_tls?
  @tls_enabled
end
#write(data) ⇒ Object
Writes data to the socket. If a read/write timeout was specified, the operation will return after that amount of time has elapsed while waiting for the socket.
156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 |
# File 'lib/bunny/transport.rb', line 156
#
# Writes data to the socket.
#
# Without a write timeout (@write_timeout is nil) this delegates to
# write_without_timeout. Otherwise the data is written under @writes_mutex,
# and only while the transport is open.
# NOTE(review): no timeout value is passed to @socket.write here, so any
# timeout enforcement presumably lives in the socket implementation — confirm.
#
# When the write raises SystemCallError, Timeout::Error,
# Bunny::ConnectionError or IOError, the error is logged, the transport is
# closed and @status becomes :not_connected. The error then goes to
# @session.handle_network_failure if the session recovers automatically;
# otherwise a Bunny::NetworkFailure wrapping it is passed to
# @session_error_handler.raise.
#
# @param data [Object] payload handed to @socket.write
# @return [Object, nil] the result of the write, nil when the transport is
#   not open, or the result of the failure handler
def write(data)
  return write_without_timeout(data) unless @write_timeout

  begin
    if open?
      @writes_mutex.synchronize do
        @socket.write(data)
      end
    end
  rescue SystemCallError, Timeout::Error, Bunny::ConnectionError, IOError => e
    @logger.error "Got an exception when sending data: #{e.message} (#{e.class.name})"
    close
    @status = :not_connected

    if @session.automatically_recover?
      @session.handle_network_failure(e)
    else
      @session_error_handler.raise(Bunny::NetworkFailure.new("detected a network failure: #{e.message}", e))
    end
  end
end
#write_without_timeout(data, raise_exceptions = false) ⇒ Object
Writes data to the socket without timeout checks
204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 |
# File 'lib/bunny/transport.rb', line 204
#
# Writes data to the socket under @writes_mutex and then flushes it, with no
# check of open? or @write_timeout.
#
# When the write or flush raises SystemCallError, Bunny::ConnectionError or
# IOError, the transport is closed. The error is then re-raised if
# raise_exceptions is truthy. Otherwise it goes to
# @session.handle_network_failure when the session recovers automatically,
# or a Bunny::NetworkFailure wrapping it is passed to
# @session_error_handler.raise.
#
# @param data [Object] payload handed to @socket.write
# @param raise_exceptions [Boolean] when truthy, re-raise the error after closing
# @return [Object] the result of @socket.flush, or of the failure handler
def write_without_timeout(data, raise_exceptions = false)
  begin
    @writes_mutex.synchronize { @socket.write(data) }
    @socket.flush
  rescue SystemCallError, Bunny::ConnectionError, IOError => e
    close
    raise e if raise_exceptions

    if @session.automatically_recover?
      @session.handle_network_failure(e)
    else
      @session_error_handler.raise(Bunny::NetworkFailure.new("detected a network failure: #{e.message}", e))
    end
  end
end