Class: Rex::Proto::Amqp::Version091::Client

Inherits:
Object
  • Object
show all
Includes:
Rex::Proto::Amqp, Rex::Proto::Amqp::Version091
Defined in:
lib/rex/proto/amqp/version_0_9_1/client.rb,
lib/rex/proto/amqp/version_0_9_1/client/channel.rb

Defined Under Namespace

Classes: Channel

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(host, port: nil, context: {}, ssl: true, ssl_version: nil, comm: nil, timeout: 10) ⇒ Client

Returns a new instance of Client.

Parameters:

  • host (String)

    The AMQP server host.

  • port (Integer, NilClass) (defaults to: nil)

    The AMQP server port or nil for automatic based on ssl.

  • ssl (Boolean) (defaults to: true)

    Whether or not SSL is used for the connection.

  • ssl_version (String) (defaults to: nil)

    The SSL version to use.

  • comm (Rex::Socket::Comm) (defaults to: nil)

    An optional, explicit object to use for creating the connection.

  • timeout (Integer) (defaults to: 10)

    The communication timeout in seconds.



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/rex/proto/amqp/version_0_9_1/client.rb', line 35

def initialize(host, port: nil, context: {}, ssl: true, ssl_version: nil, comm: nil, timeout: 10)
  if port.nil?
    port = ssl ? 5671 : 5672
  end

  @host = host
  @port = port
  @context = context
  @ssl = ssl
  @ssl_version = ssl_version
  @comm = comm
  @server_info = {}
  @channels = {}
  @frame_queue = []
  @next_channel_id = 1
  @timeout = timeout
end

Instance Attribute Details

#commRex::Socket::Comm (readonly)

Returns An optional, explicit object to use for creating the connection.

Returns:

  • (Rex::Socket::Comm)

    An optional, explicit object to use for creating the connection.



20
21
22
# File 'lib/rex/proto/amqp/version_0_9_1/client.rb', line 20

def comm
  @comm
end

#hostString (readonly)

Returns The AMQP server host.

Returns:

  • (String)

    The AMQP server host.



11
12
13
# File 'lib/rex/proto/amqp/version_0_9_1/client.rb', line 11

def host
  @host
end

#portInteger (readonly)

Returns The AMQP server port.

Returns:

  • (Integer)

    The AMQP server port.



14
15
16
# File 'lib/rex/proto/amqp/version_0_9_1/client.rb', line 14

def port
  @port
end

#server_infoHash (readonly)

Returns A hash containing server information.

Returns:

  • (Hash)

    A hash containing server information.



23
24
25
# File 'lib/rex/proto/amqp/version_0_9_1/client.rb', line 23

def server_info
  @server_info
end

#sslBoolean (readonly)

Returns Whether or not SSL is used for the connection.

Returns:

  • (Boolean)

    Whether or not SSL is used for the connection.



17
18
19
# File 'lib/rex/proto/amqp/version_0_9_1/client.rb', line 17

def ssl
  @ssl
end

#timeoutInteger

Returns The communication timeout in seconds.

Returns:

  • (Integer)

    The communication timeout in seconds.



27
28
29
# File 'lib/rex/proto/amqp/version_0_9_1/client.rb', line 27

def timeout
  @timeout
end

Instance Method Details

#channel_close(channel) ⇒ NilClass

Close an established channel.

Parameters:

  • channel (Channel)

    The channel object to close.

Returns:

  • (NilClass)


188
189
190
191
192
193
194
195
196
197
198
199
200
201
# File 'lib/rex/proto/amqp/version_0_9_1/client.rb', line 188

def channel_close(channel)
  ch_close = Version091::Frames::AmqpVersion091MethodFrame.new
  ch_close.header.frame_channel = channel.id
  ch_close.arguments = Version091::Frames::MethodArguments::AmqpVersion091ChannelClose.new
  send_frame(ch_close)
  resp = recv_frame

  unless is_method_frame?(resp, Version091::Frames::MethodArguments::AmqpVersion091ChannelCloseOk)
    raise Error::UnexpectedReplyError.new(resp)
  end

  @channels.delete(channel.id)
  nil
end

#channel_openChannel

Open a new channel.

Returns:

  • (Channel)

    The newly opened channel.



169
170
171
172
173
174
175
176
177
178
179
180
181
182
# File 'lib/rex/proto/amqp/version_0_9_1/client.rb', line 169

def channel_open
  ch_open = Version091::Frames::AmqpVersion091MethodFrame.new
  ch_open.header.frame_channel = cid = @next_channel_id
  ch_open.arguments = Version091::Frames::MethodArguments::AmqpVersion091ChannelOpen.new
  send_frame(ch_open)
  resp = recv_frame

  unless is_method_frame?(resp, Version091::Frames::MethodArguments::AmqpVersion091ChannelOpenOk)
    raise Error::UnexpectedReplyError.new(resp)
  end

  @next_channel_id += 1
  @channels[cid] = Channel.new(self, cid)
end

#closeNilClass

Close the connection to the remote server.

Returns:

  • (NilClass)


76
77
78
79
80
81
82
83
# File 'lib/rex/proto/amqp/version_0_9_1/client.rb', line 76

def close
  if @conn && !@conn.closed?
    @conn.shutdown
    @conn.close
  end

  @conn = nil
end

#connect(t = -1)) ⇒ NilClass

Establish the connection to the remote server.

Parameters:

  • t (Integer) (defaults to: -1))

    An explicit timeout to use for the connection otherwise the default will be used.

Returns:

  • (NilClass)


57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/rex/proto/amqp/version_0_9_1/client.rb', line 57

def connect(t = -1)
  timeout = (t.nil? or t == -1) ? @timeout : t

  @conn = Rex::Socket::Tcp.create(
    'PeerHost'   => @host,
    'PeerPort'   => @port.to_i,
    'Context'    => @context,
    'SSL'        => @ssl,
    'SSLVersion' => @ssl_version,
    'Timeout'    => timeout,
    'Comm'       => @comm
  )

  nil
end

#connection_closeNilClass

Close the established connection by performing the necessary handshake.

Returns:

  • (NilClass)


206
207
208
209
210
211
# File 'lib/rex/proto/amqp/version_0_9_1/client.rb', line 206

def connection_close
  send_connection_close
  recv_connection_close_ok

  nil
end

#connection_open(vhost) ⇒ NilClass

Open a connection by performing the necessary handshake.

Parameters:

  • vhost (String)

    The virtual host to connect to.

Returns:

  • (NilClass)


217
218
219
220
221
222
# File 'lib/rex/proto/amqp/version_0_9_1/client.rb', line 217

def connection_open(vhost)
  send_connection_open(virtual_host: vhost)
  recv_connection_open_ok

  nil
end

#connection_start(username, password) ⇒ NilClass

Start a connection by performing the necessary handshake. The caller needs to validate the response to ensure authentication succeeded.

Parameters:

  • username (String)

    The username to authenticate with.

  • password (String)

    The password to authenticate with.

Returns:

  • (NilClass)


230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
# File 'lib/rex/proto/amqp/version_0_9_1/client.rb', line 230

def connection_start(username, password)
  recv_connection_start

  unless @server_info[:security_mechanisms].include?('PLAIN')
    # PLAIN is supported by default, others can be added via plugins
    raise Error::NegotiationError.new('There are no mutually supported authentication mechanisms.')
  end

  # prefer en_US if it's available, otherwise select one at random
  if @server_info[:locales].include?('en_US')
    locale = 'en_US'
  else
    locale = @server_info[:locales].sample
  end

  send_connection_start_ok({
    # Per the spec, these properties "should" contain: product, version, platform, copyright, and information
    client_properties: [
      { name: 'capabilities', data: { data_type: 'F'.ord, data: [
        { name: 'authentication_failure_close', data: { data_type: 't'.ord, data: true } },
        { name: 'basic.nack', data: { data_type: 't'.ord, data: true } },
        { name: 'connection.blocked', data: { data_type: 't'.ord, data: true } },
        { name: 'consumer_cancel_notify', data: { data_type: 't'.ord, data: true } },
        { name: 'publisher_confirms', data: { data_type: 't'.ord, data: true } }
      ] } }
    ],
    # https://www.rabbitmq.com/access-control.html#mechanisms
    mechanism: 'PLAIN',
    response: build_sasl_response_plain(username, password),
    locale: locale
  })
end

#login(username, password, vhost: '/') ⇒ Boolean

Login to the remote server. The connection will be started automatically if it has not already been established.

Parameters:

  • username (String)

    The username to authenticate with.

  • password (String)

    The password to authenticate with.

  • vhost (String) (defaults to: '/')

    The virtual host to connect to.

Returns:

  • (Boolean)

    Whether or not authentication was successful.



91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# File 'lib/rex/proto/amqp/version_0_9_1/client.rb', line 91

def (username, password, vhost: '/')
  connect if @conn.nil?

  send_protocol_header
  connection_start(username, password)

  resp = recv_frame
  if is_method_frame?(resp, Version091::Frames::MethodArguments::AmqpVersion091ConnectionClose)
    close
    return false
  elsif !is_method_frame?(resp, Version091::Frames::MethodArguments::AmqpVersion091ConnectionTune)
    raise Error::UnexpectedReplyError.new(resp)
  end

  @server_info[:tuning] = resp.arguments.snapshot
  connection_tune_ok = Version091::Frames::AmqpVersion091MethodFrame.new
  connection_tune_ok.arguments = Version091::Frames::MethodArguments::AmqpVersion091ConnectionTuneOk.new(
    resp.arguments.snapshot
  )
  send_frame(connection_tune_ok)

  connection_open(vhost)

  true
end

#recv_connection_close_okObject



263
264
265
266
267
268
269
270
# File 'lib/rex/proto/amqp/version_0_9_1/client.rb', line 263

def recv_connection_close_ok
  resp = recv_frame
  unless is_method_frame?(resp, Version091::Frames::MethodArguments::AmqpVersion091ConnectionCloseOk)
    raise Error::UnexpectedReplyError.new(resp)
  end

  resp
end

#recv_connection_open_okObject



272
273
274
275
276
277
278
279
# File 'lib/rex/proto/amqp/version_0_9_1/client.rb', line 272

def recv_connection_open_ok
  resp = recv_frame
  unless is_method_frame?(resp, Version091::Frames::MethodArguments::AmqpVersion091ConnectionOpenOk)
    raise Error::UnexpectedReplyError.new(resp)
  end

  resp
end

#recv_connection_startObject



281
282
283
284
285
286
287
288
289
290
291
292
293
294
# File 'lib/rex/proto/amqp/version_0_9_1/client.rb', line 281

def recv_connection_start
  resp = recv_frame
  unless is_method_frame?(resp, Version091::Frames::MethodArguments::AmqpVersion091ConnectionStart)
    raise Error::UnexpectedReplyError.new(resp)
  end

  @server_info = {
    properties: resp.arguments.server_properties.coerce,
    security_mechanisms: resp.arguments.mechanisms.split(' '),
    locales: resp.arguments.locales.split(' ')
  }

  resp
end

#recv_frameBinData::Record

Receive a frame from the connected peer with a timeout.

Returns:

  • (BinData::Record)

    The frame that was received.



128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
# File 'lib/rex/proto/amqp/version_0_9_1/client.rb', line 128

def recv_frame
  remaining = @timeout
  header_raw, elapsed_time = Rex::Stopwatch.elapsed_time do
    num_bytes = Version091::Frames::AmqpVersion091FrameHeader.new.num_bytes
    @conn.get_once(num_bytes, remaining)
  end
  remaining -= elapsed_time

  header = Version091::Frames::AmqpVersion091FrameHeader.read(header_raw)
  body = ''
  while (body.size < (header.frame_size + 1)) && remaining > 0
    chunk, elapsed_time = Rex::Stopwatch.elapsed_time do
      @conn.read((header.frame_size + 1) - body.size, remaining)
    end
    remaining -= elapsed_time
    body << chunk
  end

  unless body.size == (header.frame_size + 1)
    if remaining <= 0
      raise Rex::TimeoutError, 'Failed to read the response data due to timeout.'
    end

    Error::InvalidFrameError.new
  end

  case header.frame_type
  when 1
    frame = Version091::Frames::AmqpVersion091MethodFrame.read(header.to_binary_s + body)
  when 2
    frame = Version091::Frames::AmqpVersion091ContentHeaderFrame.read(header.to_binary_s + body)
  when 3
    frame = Version091::Frames::AmqpVersion091ContentBodyFrame.read(header.to_binary_s + body)
  end

  frame
end

#send_connection_close(arguments = {}) ⇒ Object



296
297
298
299
300
301
302
# File 'lib/rex/proto/amqp/version_0_9_1/client.rb', line 296

def send_connection_close(arguments={})
  conn_close = Version091::Frames::AmqpVersion091MethodFrame.new
  conn_close.arguments = Version091::Frames::MethodArguments::AmqpVersion091ConnectionClose.new(arguments)
  send_frame(conn_close)

  nil
end

#send_connection_open(arguments = {}) ⇒ Object



304
305
306
307
308
309
310
# File 'lib/rex/proto/amqp/version_0_9_1/client.rb', line 304

def send_connection_open(arguments={})
  connection_open = Version091::Frames::AmqpVersion091MethodFrame.new
  connection_open.arguments = Version091::Frames::MethodArguments::AmqpVersion091ConnectionOpen.new(arguments)
  send_frame(connection_open)

  nil
end

#send_connection_start_ok(arguments = {}) ⇒ Object



312
313
314
315
316
317
318
# File 'lib/rex/proto/amqp/version_0_9_1/client.rb', line 312

def send_connection_start_ok(arguments={})
  connection_start_ok = Version091::Frames::AmqpVersion091MethodFrame.new
  connection_start_ok.arguments = Version091::Frames::MethodArguments::AmqpVersion091ConnectionStartOk.new(arguments)
  send_frame(connection_start_ok)

  nil
end

#send_frame(frame) ⇒ Integer

Send a frame to the connected peer.

Parameters:

  • frame (#to_binary_s)

    The frame to send.

Returns:

  • (Integer)

    The number of bytes written.



121
122
123
# File 'lib/rex/proto/amqp/version_0_9_1/client.rb', line 121

def send_frame(frame)
  @conn.put(frame.to_binary_s)
end

#send_protocol_headerObject



320
321
322
323
324
# File 'lib/rex/proto/amqp/version_0_9_1/client.rb', line 320

def send_protocol_header
  send_frame(Version091::Frames::AmqpVersion091ProtocolHeader.new)

  nil
end