Class: GorgonBunny::Transport
- Inherits:
-
Object
- Object
- GorgonBunny::Transport
- Defined in:
- lib/gorgon_bunny/lib/gorgon_bunny/transport.rb
Constant Summary collapse
- DEFAULT_CONNECTION_TIMEOUT =
Default TCP connection timeout
5.0
- DEFAULT_TLS_PROTOCOL =
Default TLS protocol version to use. Currently SSLv3, same as in RabbitMQ Java client
"SSLv3"
Instance Attribute Summary collapse
-
#connect_timeout ⇒ Object
readonly
Returns the value of attribute connect_timeout.
-
#disconnect_timeout ⇒ Object
readonly
Returns the value of attribute disconnect_timeout.
-
#host ⇒ Object
readonly
Returns the value of attribute host.
-
#port ⇒ Object
readonly
Returns the value of attribute port.
-
#read_write_timeout ⇒ Object
readonly
Returns the value of attribute read_write_timeout.
-
#session ⇒ Object
readonly
Returns the value of attribute session.
-
#socket ⇒ Object
readonly
Returns the value of attribute socket.
-
#tls_context ⇒ Object
readonly
Returns the value of attribute tls_context.
Class Method Summary collapse
Instance Method Summary collapse
- #close(reason = nil) ⇒ Object
- #closed? ⇒ Boolean
- #configure_socket(&block) ⇒ Object
- #configure_tls_context(&block) ⇒ Object
- #connect ⇒ Object
- #connected? ⇒ Boolean
- #flush ⇒ Object
- #hostname ⇒ Object
-
#initialize(session, host, port, opts) ⇒ Transport
constructor
A new instance of Transport.
- #initialize_socket ⇒ Object
- #maybe_initialize_socket ⇒ Object
- #open? ⇒ Boolean
- #post_initialize_socket ⇒ Object
- #read_fully(*args) ⇒ Object
-
#read_next_frame(opts = {}) ⇒ Object
Exposed primarily for GorgonBunny::Channel.
- #read_ready?(timeout = nil) ⇒ Boolean
-
#send_frame(frame) ⇒ Object
Sends frame to the peer.
-
#send_frame_without_timeout(frame) ⇒ Object
Sends frame to the peer without timeout control.
- #uses_ssl? ⇒ Boolean (also: #ssl?)
- #uses_tls? ⇒ Boolean (also: #tls?)
-
#write(data) ⇒ Object
Writes data to the socket.
-
#write_without_timeout(data) ⇒ Object
Writes data to the socket without timeout checks.
Constructor Details
#initialize(session, host, port, opts) ⇒ Transport
Returns a new instance of Transport.
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/transport.rb', line 32
#
# Stores the connection settings, derives the timeouts from +opts+ and
# opens the socket (via #maybe_initialize_socket). When TLS is enabled the
# TLS context is prepared afterwards.
#
# @param session owning session; must respond to +logger+ and +mutex_impl+
# @param host peer host, stored as given
# @param port peer port, stored as given
# @param opts [Hash] options; +:session_thread+ and +:socket_timeout+ are
#   read here, the rest is interpreted by helpers defined elsewhere
# @return [Transport] a new instance of Transport
def initialize(session, host, port, opts)
  @session = session
  @host    = host
  @port    = port
  @opts    = opts

  @session_thread = opts[:session_thread]
  @logger         = session.logger
  @tls_enabled    = tls_enabled?(opts)

  # Read/write timeout defaults to 3; a configured 0 is normalised to nil.
  rw_timeout          = opts[:socket_timeout] || 3
  @read_write_timeout = rw_timeout == 0 ? nil : rw_timeout

  # Same normalisation for the connect timeout.
  conn_timeout     = self.timeout_from(opts)
  @connect_timeout = conn_timeout == 0 ? nil : conn_timeout

  @disconnect_timeout = @read_write_timeout || @connect_timeout
  @writes_mutex       = @session.mutex_impl.new

  maybe_initialize_socket
  prepare_tls_context(opts) if @tls_enabled
end
Instance Attribute Details
#connect_timeout ⇒ Object (readonly)
Returns the value of attribute connect_timeout.
29 30 31 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/transport.rb', line 29
#
# Reader for the connect timeout.
#
# @return [Object] the current value of @connect_timeout
def connect_timeout
  @connect_timeout
end
#disconnect_timeout ⇒ Object (readonly)
Returns the value of attribute disconnect_timeout.
29 30 31 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/transport.rb', line 29
#
# Reader for the disconnect timeout.
#
# @return [Object] the current value of @disconnect_timeout
def disconnect_timeout
  @disconnect_timeout
end
#host ⇒ Object (readonly)
Returns the value of attribute host.
29 30 31 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/transport.rb', line 29
#
# Reader for the peer host.
#
# @return [Object] the current value of @host
def host
  @host
end
#port ⇒ Object (readonly)
Returns the value of attribute port.
29 30 31 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/transport.rb', line 29
#
# Reader for the peer port.
#
# @return [Object] the current value of @port
def port
  @port
end
#read_write_timeout ⇒ Object (readonly)
Returns the value of attribute read_write_timeout.
29 30 31 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/transport.rb', line 29
#
# Reader for the read/write timeout.
#
# @return [Object] the current value of @read_write_timeout
def read_write_timeout
  @read_write_timeout
end
#session ⇒ Object (readonly)
Returns the value of attribute session.
29 30 31 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/transport.rb', line 29
#
# Reader for the session this transport belongs to.
#
# @return [Object] the current value of @session
def session
  @session
end
#socket ⇒ Object (readonly)
Returns the value of attribute socket.
29 30 31 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/transport.rb', line 29
#
# Reader for the underlying socket.
#
# @return [Object] the current value of @socket
def socket
  @socket
end
#tls_context ⇒ Object (readonly)
Returns the value of attribute tls_context.
30 31 32 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/transport.rb', line 30
#
# Reader for the TLS context.
#
# @return [Object] the current value of @tls_context
def tls_context
  @tls_context
end
Class Method Details
.ping!(host, port, timeout) ⇒ Object
233 234 235 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/transport.rb', line 233
#
# Checks that +host+:+port+ can be reached and raises when it cannot.
#
# @param host host to probe
# @param port port to probe
# @param timeout timeout forwarded to .reacheable?
# @return [nil] when the peer is reachable
# @raise [ConnectionTimeout] when .reacheable? reports the peer as unreachable
def self.ping!(host, port, timeout)
  unless reacheable?(host, port, timeout)
    raise ConnectionTimeout.new("#{host}:#{port} is unreachable")
  end
end
.reacheable?(host, port, timeout) ⇒ Boolean
220 221 222 223 224 225 226 227 228 229 230 231 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/transport.rb', line 220
#
# Probes +host+:+port+ by opening (and immediately closing) a socket.
#
# Only SocketError and Timeout::Error are turned into +false+; any other
# exception raised while opening the socket propagates to the caller.
#
# @param host host to probe
# @param port port to probe
# @param timeout passed to the socket as +:socket_timeout+
# @return [Boolean] true when the socket could be opened
def self.reacheable?(host, port, timeout)
  probe = GorgonBunny::Socket.open(host, port,
    :socket_timeout => timeout)

  true
rescue SocketError, Timeout::Error
  false
ensure
  # Always release the probe socket if one was opened.
  probe.close if probe
end
Instance Method Details
#close(reason = nil) ⇒ Object
167 168 169 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/transport.rb', line 167
#
# Closes the socket if the transport is currently open; otherwise does nothing.
#
# @param reason accepted for interface compatibility; not used by this method
# @return [Object, nil] result of closing the socket, or nil when not open
def close(reason = nil)
  return unless open?

  @socket.close
end
#closed? ⇒ Boolean
175 176 177 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/transport.rb', line 175
#
# Logical negation of #open?.
#
# @return [Boolean] true when #open? is falsy
def closed?
  !open?
end
#configure_socket(&block) ⇒ Object
86 87 88 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/transport.rb', line 86
#
# Calls the given block with the socket so the caller can tune it.
# Nothing happens when no socket is set.
#
# @yieldparam socket the current value of @socket
# @return [Object, nil] the block's result, or nil when there is no socket
def configure_socket(&block)
  return unless @socket

  block.call(@socket)
end
#configure_tls_context(&block) ⇒ Object
90 91 92 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/transport.rb', line 90
#
# Calls the given block with the TLS context so the caller can tune it.
# Nothing happens when no TLS context is set.
#
# @yieldparam tls_context the current value of @tls_context
# @return [Object, nil] the block's result, or nil when there is no context
def configure_tls_context(&block)
  return unless @tls_context

  block.call(@tls_context)
end
#connect ⇒ Object
69 70 71 72 73 74 75 76 77 78 79 80 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/transport.rb', line 69
#
# Connects the socket when TLS is in use. For a non-TLS transport this
# method is a no-op.
#
# On the TLS path the socket's +connect+ is called, the peer is checked
# against +host+ when @verify_peer is set, and the status becomes :connected.
#
# @return [Object, nil] the socket on the TLS path, nil otherwise
def connect
  # Non-TLS transports have nothing to do here.
  return unless uses_ssl?

  @socket.connect
  @socket.post_connection_check(host) if uses_tls? && @verify_peer

  @status = :connected
  @socket
end
#connected? ⇒ Boolean
82 83 84 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/transport.rb', line 82
#
# Tells whether the transport has been marked connected and its socket is
# still open.
#
# The status check used to compare against :not_connected, which made the
# predicate false for every transport that #connect had marked :connected.
#
# NOTE(review): in the code visible here only the TLS branch of #connect
# sets the status to :connected -- confirm how plain TCP transports are
# expected to report.
#
# @return [Boolean] truthy only when the status is :connected and #open? holds
def connected?
  :connected == @status && open?
end
#flush ⇒ Object
179 180 181 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/transport.rb', line 179
#
# Flushes the socket if one is set; otherwise does nothing.
#
# @return [Object, nil] result of the socket's +flush+, or nil without a socket
def flush
  return unless @socket

  @socket.flush
end
#hostname ⇒ Object
54 55 56 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/transport.rb', line 54
#
# Returns the same value as the +host+ reader.
#
# @return [Object] the current value of @host
def hostname
  @host
end
#initialize_socket ⇒ Object
237 238 239 240 241 242 243 244 245 246 247 248 249 250 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/transport.rb', line 237
#
# Opens the socket to @host:@port, bounded by the connect timeout, and
# stores it in @socket.
#
# Any StandardError or ClientTimeout raised while opening marks the
# transport as :not_connected and is re-raised wrapped in
# GorgonBunny::TCPConnectionFailed.
#
# @return [Object] the newly opened socket
# @raise [GorgonBunny::TCPConnectionFailed] when the socket cannot be opened
def initialize_socket
  @socket = GorgonBunny::Timeout.timeout(@connect_timeout, ClientTimeout) do
    GorgonBunny::Socket.open(@host, @port,
      :keepalive      => @opts[:keepalive],
      :socket_timeout => @connect_timeout)
  end
rescue StandardError, ClientTimeout => e
  @status = :not_connected
  raise GorgonBunny::TCPConnectionFailed.new(e, self.hostname, self.port)
end
#maybe_initialize_socket ⇒ Object
252 253 254 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/transport.rb', line 252
#
# Opens a socket unless there already is one that is not closed.
#
# @return [Object, nil] result of #initialize_socket, or nil when the
#   existing socket is kept
def maybe_initialize_socket
  initialize_socket unless @socket && !closed?
end
#open? ⇒ Boolean
171 172 173 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/transport.rb', line 171
#
# Tells whether a socket is set and has not been closed.
#
# @return [Boolean, nil] nil when no socket is set, otherwise whether the
#   socket reports itself as not closed
def open?
  sock = @socket
  sock && !sock.closed?
end
#post_initialize_socket ⇒ Object
256 257 258 259 260 261 262 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/transport.rb', line 256
#
# Replaces the socket with its TLS-wrapped counterpart when TLS is in use;
# leaves it untouched otherwise.
#
# @return [Object] the socket now stored in @socket
def post_initialize_socket
  @socket = wrap_in_tls_socket(@socket) if uses_tls?
  @socket
end
#read_fully(*args) ⇒ Object
183 184 185 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/transport.rb', line 183
#
# Delegates to the socket's +read_fully+, passing all arguments through.
#
# @param args arguments forwarded unchanged to the socket
# @return [Object] whatever the socket's +read_fully+ returns
def read_fully(*args)
  @socket.read_fully(*args)
end
#read_next_frame(opts = {}) ⇒ Object
Exposed primarily for GorgonBunny::Channel
195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/transport.rb', line 195
#
# Reads one frame from the socket: a 7-byte header, then the payload whose
# size the header announces, then a single frame-end byte.
# Exposed primarily for GorgonBunny::Channel.
#
# @param opts [Hash] accepted but not used by this method
# @return [Object] the result of GorgonAMQ::Protocol::Frame.new(type, payload, channel)
# @raise [BadLengthError] when the payload size differs from the announced size
# @raise [NoFinalOctetError] when the frame-end byte is not FINAL_OCTET
def read_next_frame(opts = {})
  raw_header = @socket.read_fully(7)
  # TODO: network issues here will sometimes cause
  #       the socket method return an empty string. We need to log
  #       and handle this better.
  # type, channel, size = begin
  #                         GorgonAMQ::Protocol::Frame.decode_header(header)
  #                       rescue GorgonAMQ::Protocol::EmptyResponseError => e
  #                         puts "Got GorgonAMQ::Protocol::EmptyResponseError, header is #{header.inspect}"
  #                       end
  type, channel, size = GorgonAMQ::Protocol::Frame.decode_header(raw_header)

  body    = @socket.read_fully(size)
  trailer = @socket.read_fully(1)

  # 1) the size is miscalculated
  actual_size = body.bytesize
  raise BadLengthError.new(size, actual_size) if actual_size != size

  # 2) the size is OK, but the string doesn't end with FINAL_OCTET
  unless trailer == GorgonAMQ::Protocol::Frame::FINAL_OCTET
    raise NoFinalOctetError.new
  end

  GorgonAMQ::Protocol::Frame.new(type, body, channel)
end
#read_ready?(timeout = nil) ⇒ Boolean
187 188 189 190 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/transport.rb', line 187
#
# Waits (via IO.select) until the socket has data to read or +timeout+ elapses.
#
# @param timeout passed to IO.select as its timeout argument
# @return [Boolean, nil] nil when IO.select returns nil, otherwise
#   whether the socket is among the readable IOs
def read_ready?(timeout = nil)
  ready = IO.select([@socket].compact, nil, nil, timeout)
  return ready unless ready

  readable = ready[0]
  readable.include?(@socket)
end
#send_frame(frame) ⇒ Object
Sends frame to the peer.
146 147 148 149 150 151 152 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/transport.rb', line 146
#
# Sends frame to the peer using #write.
#
# When the transport is closed the frame is not written; the session is
# notified with a ConnectionClosedError built from the frame instead.
#
# @param frame object responding to +encode+
# @return [Object] result of #write, or of the session's failure handler
def send_frame(frame)
  if closed?
    return @session.handle_network_failure(ConnectionClosedError.new(frame))
  end

  write(frame.encode)
end
#send_frame_without_timeout(frame) ⇒ Object
Sends frame to the peer without timeout control.
158 159 160 161 162 163 164 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/transport.rb', line 158
#
# Sends frame to the peer using #write_without_timeout, i.e. without
# timeout control.
#
# When the transport is closed the frame is not written; the session is
# notified with a ConnectionClosedError built from the frame instead.
#
# @param frame object responding to +encode+
# @return [Object] result of #write_without_timeout, or of the session's
#   failure handler
def send_frame_without_timeout(frame)
  if closed?
    return @session.handle_network_failure(ConnectionClosedError.new(frame))
  end

  write_without_timeout(frame.encode)
end
#uses_ssl? ⇒ Boolean Also known as: ssl?
63 64 65 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/transport.rb', line 63
#
# Tells whether TLS was enabled for this transport. Returns the same flag
# as #uses_tls?.
#
# @return [Object] the current value of @tls_enabled
def uses_ssl?
  @tls_enabled
end
#uses_tls? ⇒ Boolean Also known as: tls?
58 59 60 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/transport.rb', line 58
#
# Tells whether TLS was enabled for this transport.
#
# @return [Object] the current value of @tls_enabled
def uses_tls?
  @tls_enabled
end
#write(data) ⇒ Object
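Writes data to the socket. If a read/write timeout was specified, the write is bounded by it and interrupted with GorgonBunny::ClientTimeout when it takes too long. That error, like other I/O failures, is not propagated directly: it is logged, the transport is closed, and the failure is handed to the session (or raised on the session thread as GorgonBunny::NetworkFailure when automatic recovery is off).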
98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/transport.rb', line 98
#
# Writes data to the socket and flushes it. Nothing is written when the
# transport is not open.
#
# When a read/write timeout is set, the write runs inside
# GorgonBunny::Timeout.timeout, which signals GorgonBunny::ClientTimeout.
#
# SystemCallError, GorgonBunny::ClientTimeout, GorgonBunny::ConnectionError
# and IOError are handled here rather than propagated: the error is logged,
# the transport is closed and marked :not_connected, and the failure is
# either handed to the session (automatic recovery on) or raised on the
# session thread as GorgonBunny::NetworkFailure.
#
# @param data data to write to the socket
# @return [Object, nil] result of the flush on success, nil when not open
def write(data)
  if @read_write_timeout
    GorgonBunny::Timeout.timeout(@read_write_timeout, GorgonBunny::ClientTimeout) do
      if open?
        # Only the write itself is serialised across threads.
        @writes_mutex.synchronize { @socket.write(data) }
        @socket.flush
      end
    end
  else
    if open?
      @writes_mutex.synchronize { @socket.write(data) }
      @socket.flush
    end
  end
rescue SystemCallError, GorgonBunny::ClientTimeout, GorgonBunny::ConnectionError, IOError => e
  @logger.error "Got an exception when sending data: #{e.message} (#{e.class.name})"
  close
  @status = :not_connected

  if @session.automatically_recover?
    @session.handle_network_failure(e)
  else
    @session_thread.raise(GorgonBunny::NetworkFailure.new("detected a network failure: #{e.message}", e))
  end
end
#write_without_timeout(data) ⇒ Object
Writes data to the socket without timeout checks
127 128 129 130 131 132 133 134 135 136 137 138 139 140 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/transport.rb', line 127
#
# Writes data to the socket without timeout checks, then flushes it.
#
# SystemCallError, GorgonBunny::ConnectionError and IOError are handled
# here rather than propagated: the transport is closed and the failure is
# either handed to the session (automatic recovery on) or raised on the
# session thread as GorgonBunny::NetworkFailure.
#
# @param data data to write to the socket
# @return [Object] result of the flush on success
def write_without_timeout(data)
  # Only the write itself is serialised across threads.
  @writes_mutex.synchronize { @socket.write(data) }
  @socket.flush
rescue SystemCallError, GorgonBunny::ConnectionError, IOError => e
  close

  if @session.automatically_recover?
    @session.handle_network_failure(e)
  else
    @session_thread.raise(GorgonBunny::NetworkFailure.new("detected a network failure: #{e.message}", e))
  end
end