Class: Rex::Proto::Amqp::Version091::Client
- Inherits:
-
Object
- Object
- Rex::Proto::Amqp::Version091::Client
- Includes:
- Rex::Proto::Amqp, Rex::Proto::Amqp::Version091
- Defined in:
- lib/rex/proto/amqp/version_0_9_1/client.rb,
lib/rex/proto/amqp/version_0_9_1/client/channel.rb
Defined Under Namespace
Classes: Channel
Instance Attribute Summary collapse
-
#comm ⇒ Rex::Socket::Comm
readonly
An optional, explicit object to use for creating the connection.
-
#host ⇒ String
readonly
The AMQP server host.
-
#port ⇒ Integer
readonly
The AMQP server port.
-
#server_info ⇒ Hash
readonly
A hash containing server information.
-
#ssl ⇒ Boolean
readonly
Whether or not SSL is used for the connection.
-
#timeout ⇒ Integer
The communication timeout in seconds.
Instance Method Summary collapse
-
#channel_close(channel) ⇒ NilClass
Close an established channel.
-
#channel_open ⇒ Channel
Open a new channel.
-
#close ⇒ NilClass
Close the connection to the remote server.
-
#connect(t = -1)) ⇒ NilClass
Establish the connection to the remote server.
-
#connection_close ⇒ NilClass
Close the established connection by performing the necessary handshake.
-
#connection_open(vhost) ⇒ NilClass
Open a connection by performing the necessary handshake.
-
#connection_start(username, password) ⇒ NilClass
Start a connection by performing the necessary handshake.
-
#initialize(host, port: nil, context: {}, ssl: true, ssl_version: nil, comm: nil, timeout: 10) ⇒ Client
constructor
A new instance of Client.
-
#login(username, password, vhost: '/') ⇒ Boolean
Login to the remote server.
- #recv_connection_close_ok ⇒ Object
- #recv_connection_open_ok ⇒ Object
- #recv_connection_start ⇒ Object
-
#recv_frame ⇒ BinData::Record
Receive a frame from the connected peer with a timeout.
- #send_connection_close(arguments = {}) ⇒ Object
- #send_connection_open(arguments = {}) ⇒ Object
- #send_connection_start_ok(arguments = {}) ⇒ Object
-
#send_frame(frame) ⇒ Integer
Send a frame to the connected peer.
- #send_protocol_header ⇒ Object
Constructor Details
#initialize(host, port: nil, context: {}, ssl: true, ssl_version: nil, comm: nil, timeout: 10) ⇒ Client
Returns a new instance of Client.
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 |
# File 'lib/rex/proto/amqp/version_0_9_1/client.rb', line 35 def initialize(host, port: nil, context: {}, ssl: true, ssl_version: nil, comm: nil, timeout: 10) if port.nil? port = ssl ? 5671 : 5672 end @host = host @port = port @context = context @ssl = ssl @ssl_version = ssl_version @comm = comm @server_info = {} @channels = {} @frame_queue = [] @next_channel_id = 1 @timeout = timeout end |
Instance Attribute Details
#comm ⇒ Rex::Socket::Comm (readonly)
Returns An optional, explicit object to use for creating the connection.
20 21 22 |
# File 'lib/rex/proto/amqp/version_0_9_1/client.rb', line 20 def comm @comm end |
#host ⇒ String (readonly)
Returns The AMQP server host.
11 12 13 |
# File 'lib/rex/proto/amqp/version_0_9_1/client.rb', line 11 def host @host end |
#port ⇒ Integer (readonly)
Returns The AMQP server port.
14 15 16 |
# File 'lib/rex/proto/amqp/version_0_9_1/client.rb', line 14 def port @port end |
#server_info ⇒ Hash (readonly)
Returns A hash containing server information.
23 24 25 |
# File 'lib/rex/proto/amqp/version_0_9_1/client.rb', line 23 def server_info @server_info end |
#ssl ⇒ Boolean (readonly)
Returns Whether or not SSL is used for the connection.
17 18 19 |
# File 'lib/rex/proto/amqp/version_0_9_1/client.rb', line 17 def ssl @ssl end |
#timeout ⇒ Integer
Returns The communication timeout in seconds.
27 28 29 |
# File 'lib/rex/proto/amqp/version_0_9_1/client.rb', line 27 def timeout @timeout end |
Instance Method Details
#channel_close(channel) ⇒ NilClass
Close an established channel.
188 189 190 191 192 193 194 195 196 197 198 199 200 201 |
# File 'lib/rex/proto/amqp/version_0_9_1/client.rb', line 188 def channel_close(channel) ch_close = Version091::Frames::AmqpVersion091MethodFrame.new ch_close.header.frame_channel = channel.id ch_close.arguments = Version091::Frames::MethodArguments::AmqpVersion091ChannelClose.new send_frame(ch_close) resp = recv_frame unless is_method_frame?(resp, Version091::Frames::MethodArguments::AmqpVersion091ChannelCloseOk) raise Error::UnexpectedReplyError.new(resp) end @channels.delete(channel.id) nil end |
#channel_open ⇒ Channel
Open a new channel.
169 170 171 172 173 174 175 176 177 178 179 180 181 182 |
# File 'lib/rex/proto/amqp/version_0_9_1/client.rb', line 169 def channel_open ch_open = Version091::Frames::AmqpVersion091MethodFrame.new ch_open.header.frame_channel = cid = @next_channel_id ch_open.arguments = Version091::Frames::MethodArguments::AmqpVersion091ChannelOpen.new send_frame(ch_open) resp = recv_frame unless is_method_frame?(resp, Version091::Frames::MethodArguments::AmqpVersion091ChannelOpenOk) raise Error::UnexpectedReplyError.new(resp) end @next_channel_id += 1 @channels[cid] = Channel.new(self, cid) end |
#close ⇒ NilClass
Close the connection to the remote server.
76 77 78 79 80 81 82 83 |
# File 'lib/rex/proto/amqp/version_0_9_1/client.rb', line 76 def close if @conn && !@conn.closed? @conn.shutdown @conn.close end @conn = nil end |
#connect(t = -1)) ⇒ NilClass
Establish the connection to the remote server.
57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 |
# File 'lib/rex/proto/amqp/version_0_9_1/client.rb', line 57 def connect(t = -1) timeout = (t.nil? or t == -1) ? @timeout : t @conn = Rex::Socket::Tcp.create( 'PeerHost' => @host, 'PeerPort' => @port.to_i, 'Context' => @context, 'SSL' => @ssl, 'SSLVersion' => @ssl_version, 'Timeout' => timeout, 'Comm' => @comm ) nil end |
#connection_close ⇒ NilClass
Close the established connection by performing the necessary handshake.
206 207 208 209 210 211 |
# File 'lib/rex/proto/amqp/version_0_9_1/client.rb', line 206 def connection_close send_connection_close recv_connection_close_ok nil end |
#connection_open(vhost) ⇒ NilClass
Open a connection by performing the necessary handshake.
217 218 219 220 221 222 |
# File 'lib/rex/proto/amqp/version_0_9_1/client.rb', line 217 def connection_open(vhost) send_connection_open(virtual_host: vhost) recv_connection_open_ok nil end |
#connection_start(username, password) ⇒ NilClass
Start a connection by performing the necessary handshake. The caller needs to validate the response to ensure authentication succeeded.
230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 |
# File 'lib/rex/proto/amqp/version_0_9_1/client.rb', line 230 def connection_start(username, password) recv_connection_start unless @server_info[:security_mechanisms].include?('PLAIN') # PLAIN is supported by default, others can be added via plugins raise Error::NegotiationError.new('There are no mutually supported authentication mechanisms.') end # prefer en_US if it's available, otherwise select one at random if @server_info[:locales].include?('en_US') locale = 'en_US' else locale = @server_info[:locales].sample end send_connection_start_ok({ # Per the spec, these properties "should" contain: product, version, platform, copyright, and information client_properties: [ { name: 'capabilities', data: { data_type: 'F'.ord, data: [ { name: 'authentication_failure_close', data: { data_type: 't'.ord, data: true } }, { name: 'basic.nack', data: { data_type: 't'.ord, data: true } }, { name: 'connection.blocked', data: { data_type: 't'.ord, data: true } }, { name: 'consumer_cancel_notify', data: { data_type: 't'.ord, data: true } }, { name: 'publisher_confirms', data: { data_type: 't'.ord, data: true } } ] } } ], # https://www.rabbitmq.com/access-control.html#mechanisms mechanism: 'PLAIN', response: build_sasl_response_plain(username, password), locale: locale }) end |
#login(username, password, vhost: '/') ⇒ Boolean
Login to the remote server. The connection will be started automatically if it has not already been established.
91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 |
# File 'lib/rex/proto/amqp/version_0_9_1/client.rb', line 91 def login(username, password, vhost: '/') connect if @conn.nil? send_protocol_header connection_start(username, password) resp = recv_frame if is_method_frame?(resp, Version091::Frames::MethodArguments::AmqpVersion091ConnectionClose) close return false elsif !is_method_frame?(resp, Version091::Frames::MethodArguments::AmqpVersion091ConnectionTune) raise Error::UnexpectedReplyError.new(resp) end @server_info[:tuning] = resp.arguments.snapshot connection_tune_ok = Version091::Frames::AmqpVersion091MethodFrame.new connection_tune_ok.arguments = Version091::Frames::MethodArguments::AmqpVersion091ConnectionTuneOk.new( resp.arguments.snapshot ) send_frame(connection_tune_ok) connection_open(vhost) true end |
#recv_connection_close_ok ⇒ Object
263 264 265 266 267 268 269 270 |
# File 'lib/rex/proto/amqp/version_0_9_1/client.rb', line 263 def recv_connection_close_ok resp = recv_frame unless is_method_frame?(resp, Version091::Frames::MethodArguments::AmqpVersion091ConnectionCloseOk) raise Error::UnexpectedReplyError.new(resp) end resp end |
#recv_connection_open_ok ⇒ Object
272 273 274 275 276 277 278 279 |
# File 'lib/rex/proto/amqp/version_0_9_1/client.rb', line 272 def recv_connection_open_ok resp = recv_frame unless is_method_frame?(resp, Version091::Frames::MethodArguments::AmqpVersion091ConnectionOpenOk) raise Error::UnexpectedReplyError.new(resp) end resp end |
#recv_connection_start ⇒ Object
281 282 283 284 285 286 287 288 289 290 291 292 293 294 |
# File 'lib/rex/proto/amqp/version_0_9_1/client.rb', line 281 def recv_connection_start resp = recv_frame unless is_method_frame?(resp, Version091::Frames::MethodArguments::AmqpVersion091ConnectionStart) raise Error::UnexpectedReplyError.new(resp) end @server_info = { properties: resp.arguments.server_properties.coerce, security_mechanisms: resp.arguments.mechanisms.split(' '), locales: resp.arguments.locales.split(' ') } resp end |
#recv_frame ⇒ BinData::Record
Receive a frame from the connected peer with a timeout.
128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 |
# File 'lib/rex/proto/amqp/version_0_9_1/client.rb', line 128 def recv_frame remaining = @timeout header_raw, elapsed_time = Rex::Stopwatch.elapsed_time do num_bytes = Version091::Frames::AmqpVersion091FrameHeader.new.num_bytes @conn.get_once(num_bytes, remaining) end remaining -= elapsed_time header = Version091::Frames::AmqpVersion091FrameHeader.read(header_raw) body = '' while (body.size < (header.frame_size + 1)) && remaining > 0 chunk, elapsed_time = Rex::Stopwatch.elapsed_time do @conn.read((header.frame_size + 1) - body.size, remaining) end remaining -= elapsed_time body << chunk end unless body.size == (header.frame_size + 1) if remaining <= 0 raise Rex::TimeoutError, 'Failed to read the response data due to timeout.' end Error::InvalidFrameError.new end case header.frame_type when 1 frame = Version091::Frames::AmqpVersion091MethodFrame.read(header.to_binary_s + body) when 2 frame = Version091::Frames::AmqpVersion091ContentHeaderFrame.read(header.to_binary_s + body) when 3 frame = Version091::Frames::AmqpVersion091ContentBodyFrame.read(header.to_binary_s + body) end frame end |
#send_connection_close(arguments = {}) ⇒ Object
296 297 298 299 300 301 302 |
# File 'lib/rex/proto/amqp/version_0_9_1/client.rb', line 296 def send_connection_close(arguments={}) conn_close = Version091::Frames::AmqpVersion091MethodFrame.new conn_close.arguments = Version091::Frames::MethodArguments::AmqpVersion091ConnectionClose.new(arguments) send_frame(conn_close) nil end |
#send_connection_open(arguments = {}) ⇒ Object
304 305 306 307 308 309 310 |
# File 'lib/rex/proto/amqp/version_0_9_1/client.rb', line 304 def send_connection_open(arguments={}) connection_open = Version091::Frames::AmqpVersion091MethodFrame.new connection_open.arguments = Version091::Frames::MethodArguments::AmqpVersion091ConnectionOpen.new(arguments) send_frame(connection_open) nil end |
#send_connection_start_ok(arguments = {}) ⇒ Object
312 313 314 315 316 317 318 |
# File 'lib/rex/proto/amqp/version_0_9_1/client.rb', line 312 def send_connection_start_ok(arguments={}) connection_start_ok = Version091::Frames::AmqpVersion091MethodFrame.new connection_start_ok.arguments = Version091::Frames::MethodArguments::AmqpVersion091ConnectionStartOk.new(arguments) send_frame(connection_start_ok) nil end |
#send_frame(frame) ⇒ Integer
Send a frame to the connected peer.
121 122 123 |
# File 'lib/rex/proto/amqp/version_0_9_1/client.rb', line 121 def send_frame(frame) @conn.put(frame.to_binary_s) end |
#send_protocol_header ⇒ Object
320 321 322 323 324 |
# File 'lib/rex/proto/amqp/version_0_9_1/client.rb', line 320 def send_protocol_header send_frame(Version091::Frames::AmqpVersion091ProtocolHeader.new) nil end |