Class: GorgonBunny::Transport

Inherits:
Object
  • Object
show all
Defined in:
lib/gorgon_bunny/lib/gorgon_bunny/transport.rb

Constant Summary collapse

DEFAULT_CONNECTION_TIMEOUT =

Default TCP connection timeout

5.0
DEFAULT_TLS_PROTOCOL =

Default TLS protocol version to use. Currently SSLv3, same as in RabbitMQ Java client

"SSLv3"

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(session, host, port, opts) ⇒ Transport

Returns a new instance of Transport.



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/gorgon_bunny/lib/gorgon_bunny/transport.rb', line 32

def initialize(session, host, port, opts)
  @session        = session
  @session_thread = opts[:session_thread]
  @host    = host
  @port    = port
  @opts    = opts

  @logger                = session.logger
  @tls_enabled           = tls_enabled?(opts)

  @read_write_timeout = opts[:socket_timeout] || 3
  @read_write_timeout = nil if @read_write_timeout == 0
  @connect_timeout    = self.timeout_from(opts)
  @connect_timeout    = nil if @connect_timeout == 0
  @disconnect_timeout = @read_write_timeout || @connect_timeout

  @writes_mutex       = @session.mutex_impl.new

  maybe_initialize_socket
  prepare_tls_context(opts) if @tls_enabled
end

Instance Attribute Details

#connect_timeoutObject (readonly)

Returns the value of attribute connect_timeout.



29
30
31
# File 'lib/gorgon_bunny/lib/gorgon_bunny/transport.rb', line 29

def connect_timeout
  @connect_timeout
end

#disconnect_timeoutObject (readonly)

Returns the value of attribute disconnect_timeout.



29
30
31
# File 'lib/gorgon_bunny/lib/gorgon_bunny/transport.rb', line 29

def disconnect_timeout
  @disconnect_timeout
end

#hostObject (readonly)

Returns the value of attribute host.



29
30
31
# File 'lib/gorgon_bunny/lib/gorgon_bunny/transport.rb', line 29

def host
  @host
end

#portObject (readonly)

Returns the value of attribute port.



29
30
31
# File 'lib/gorgon_bunny/lib/gorgon_bunny/transport.rb', line 29

def port
  @port
end

#read_write_timeoutObject (readonly)

Returns the value of attribute read_write_timeout.



29
30
31
# File 'lib/gorgon_bunny/lib/gorgon_bunny/transport.rb', line 29

def read_write_timeout
  @read_write_timeout
end

#sessionObject (readonly)

Returns the value of attribute session.



29
30
31
# File 'lib/gorgon_bunny/lib/gorgon_bunny/transport.rb', line 29

def session
  @session
end

#socketObject (readonly)

Returns the value of attribute socket.



29
30
31
# File 'lib/gorgon_bunny/lib/gorgon_bunny/transport.rb', line 29

def socket
  @socket
end

#tls_contextObject (readonly)

Returns the value of attribute tls_context.



30
31
32
# File 'lib/gorgon_bunny/lib/gorgon_bunny/transport.rb', line 30

def tls_context
  @tls_context
end

Class Method Details

.ping!(host, port, timeout) ⇒ Object

Raises:



233
234
235
# File 'lib/gorgon_bunny/lib/gorgon_bunny/transport.rb', line 233

def self.ping!(host, port, timeout)
  raise ConnectionTimeout.new("#{host}:#{port} is unreachable") if !reacheable?(host, port, timeout)
end

.reacheable?(host, port, timeout) ⇒ Boolean

Returns:

  • (Boolean)


220
221
222
223
224
225
226
227
228
229
230
231
# File 'lib/gorgon_bunny/lib/gorgon_bunny/transport.rb', line 220

def self.reacheable?(host, port, timeout)
  begin
    s = GorgonBunny::Socket.open(host, port,
      :socket_timeout => timeout)

    true
  rescue SocketError, Timeout::Error => e
    false
  ensure
    s.close if s
  end
end

Instance Method Details

#close(reason = nil) ⇒ Object



167
168
169
# File 'lib/gorgon_bunny/lib/gorgon_bunny/transport.rb', line 167

def close(reason = nil)
  @socket.close if open?
end

#closed?Boolean

Returns:

  • (Boolean)


175
176
177
# File 'lib/gorgon_bunny/lib/gorgon_bunny/transport.rb', line 175

def closed?
  !open?
end

#configure_socket(&block) ⇒ Object



86
87
88
# File 'lib/gorgon_bunny/lib/gorgon_bunny/transport.rb', line 86

def configure_socket(&block)
  block.call(@socket) if @socket
end

#configure_tls_context(&block) ⇒ Object



90
91
92
# File 'lib/gorgon_bunny/lib/gorgon_bunny/transport.rb', line 90

def configure_tls_context(&block)
  block.call(@tls_context) if @tls_context
end

#connectObject



69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/gorgon_bunny/lib/gorgon_bunny/transport.rb', line 69

def connect
  if uses_ssl?
    @socket.connect
    @socket.post_connection_check(host) if uses_tls? && @verify_peer

    @status = :connected

    @socket
  else
    # no-op
  end
end

#connected?Boolean

Returns:

  • (Boolean)


82
83
84
# File 'lib/gorgon_bunny/lib/gorgon_bunny/transport.rb', line 82

def connected?
  :not_connected == @status && open?
end

#flushObject



179
180
181
# File 'lib/gorgon_bunny/lib/gorgon_bunny/transport.rb', line 179

def flush
  @socket.flush if @socket
end

#hostnameObject



54
55
56
# File 'lib/gorgon_bunny/lib/gorgon_bunny/transport.rb', line 54

def hostname
  @host
end

#initialize_socketObject



237
238
239
240
241
242
243
244
245
246
247
248
249
250
# File 'lib/gorgon_bunny/lib/gorgon_bunny/transport.rb', line 237

def initialize_socket
  begin
    @socket = GorgonBunny::Timeout.timeout(@connect_timeout, ClientTimeout) do
      GorgonBunny::Socket.open(@host, @port,
        :keepalive      => @opts[:keepalive],
        :socket_timeout => @connect_timeout)
    end
  rescue StandardError, ClientTimeout => e
    @status = :not_connected
    raise GorgonBunny::TCPConnectionFailed.new(e, self.hostname, self.port)
  end

  @socket
end

#maybe_initialize_socketObject



252
253
254
# File 'lib/gorgon_bunny/lib/gorgon_bunny/transport.rb', line 252

def maybe_initialize_socket
  initialize_socket if !@socket || closed?
end

#open?Boolean

Returns:

  • (Boolean)


171
172
173
# File 'lib/gorgon_bunny/lib/gorgon_bunny/transport.rb', line 171

def open?
  @socket && !@socket.closed?
end

#post_initialize_socketObject



256
257
258
259
260
261
262
# File 'lib/gorgon_bunny/lib/gorgon_bunny/transport.rb', line 256

def post_initialize_socket
  @socket = if uses_tls?
              wrap_in_tls_socket(@socket)
            else
              @socket
            end
end

#read_fully(*args) ⇒ Object



183
184
185
# File 'lib/gorgon_bunny/lib/gorgon_bunny/transport.rb', line 183

def read_fully(*args)
  @socket.read_fully(*args)
end

#read_next_frame(opts = {}) ⇒ Object

Exposed primarily for GorgonBunny::Channel

Raises:



195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
# File 'lib/gorgon_bunny/lib/gorgon_bunny/transport.rb', line 195

def read_next_frame(opts = {})
  header    = @socket.read_fully(7)
  # TODO: network issues here will sometimes cause
  #       the socket method return an empty string. We need to log
  #       and handle this better.
  # type, channel, size = begin
  #                         GorgonAMQ::Protocol::Frame.decode_header(header)
  #                       rescue GorgonAMQ::Protocol::EmptyResponseError => e
  #                         puts "Got GorgonAMQ::Protocol::EmptyResponseError, header is #{header.inspect}"
  #                       end
  type, channel, size = GorgonAMQ::Protocol::Frame.decode_header(header)
  payload   = @socket.read_fully(size)
  frame_end = @socket.read_fully(1)

  # 1) the size is miscalculated
  if payload.bytesize != size
    raise BadLengthError.new(size, payload.bytesize)
  end

  # 2) the size is OK, but the string doesn't end with FINAL_OCTET
  raise NoFinalOctetError.new if frame_end != GorgonAMQ::Protocol::Frame::FINAL_OCTET
  GorgonAMQ::Protocol::Frame.new(type, payload, channel)
end

#read_ready?(timeout = nil) ⇒ Boolean

Returns:

  • (Boolean)


187
188
189
190
# File 'lib/gorgon_bunny/lib/gorgon_bunny/transport.rb', line 187

def read_ready?(timeout = nil)
  io = IO.select([@socket].compact, nil, nil, timeout)
  io && io[0].include?(@socket)
end

#send_frame(frame) ⇒ Object

Sends frame to the peer.



146
147
148
149
150
151
152
# File 'lib/gorgon_bunny/lib/gorgon_bunny/transport.rb', line 146

def send_frame(frame)
  if closed?
    @session.handle_network_failure(ConnectionClosedError.new(frame))
  else
    write(frame.encode)
  end
end

#send_frame_without_timeout(frame) ⇒ Object

Sends frame to the peer without timeout control.



158
159
160
161
162
163
164
# File 'lib/gorgon_bunny/lib/gorgon_bunny/transport.rb', line 158

def send_frame_without_timeout(frame)
  if closed?
    @session.handle_network_failure(ConnectionClosedError.new(frame))
  else
    write_without_timeout(frame.encode)
  end
end

#uses_ssl?Boolean Also known as: ssl?

Returns:

  • (Boolean)


63
64
65
# File 'lib/gorgon_bunny/lib/gorgon_bunny/transport.rb', line 63

def uses_ssl?
  @tls_enabled
end

#uses_tls?Boolean Also known as: tls?

Returns:

  • (Boolean)


58
59
60
# File 'lib/gorgon_bunny/lib/gorgon_bunny/transport.rb', line 58

def uses_tls?
  @tls_enabled
end

#write(data) ⇒ Object

Writes data to the socket. If read/write timeout was specified, GorgonBunny::ClientTimeout will be raised if the operation times out.

Raises:



98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# File 'lib/gorgon_bunny/lib/gorgon_bunny/transport.rb', line 98

def write(data)
  begin
    if @read_write_timeout
      GorgonBunny::Timeout.timeout(@read_write_timeout, GorgonBunny::ClientTimeout) do
        if open?
          @writes_mutex.synchronize { @socket.write(data) }
          @socket.flush
        end
      end
    else
      if open?
        @writes_mutex.synchronize { @socket.write(data) }
        @socket.flush
      end
    end
  rescue SystemCallError, GorgonBunny::ClientTimeout, GorgonBunny::ConnectionError, IOError => e
    @logger.error "Got an exception when sending data: #{e.message} (#{e.class.name})"
    close
    @status = :not_connected

    if @session.automatically_recover?
      @session.handle_network_failure(e)
    else
      @session_thread.raise(GorgonBunny::NetworkFailure.new("detected a network failure: #{e.message}", e))
    end
  end
end

#write_without_timeout(data) ⇒ Object

Writes data to the socket without timeout checks



127
128
129
130
131
132
133
134
135
136
137
138
139
140
# File 'lib/gorgon_bunny/lib/gorgon_bunny/transport.rb', line 127

def write_without_timeout(data)
  begin
    @writes_mutex.synchronize { @socket.write(data) }
    @socket.flush
  rescue SystemCallError, GorgonBunny::ConnectionError, IOError => e
    close

    if @session.automatically_recover?
      @session.handle_network_failure(e)
    else
      @session_thread.raise(GorgonBunny::NetworkFailure.new("detected a network failure: #{e.message}", e))
    end
  end
end