Module: Rex::IO::Stream

Included in:
Socket::Tcp
Defined in:
lib/rex/io/stream.rb

Overview

This mixin is an abstract representation of a streaming connection. Streams extend classes that must implement the following methods:

syswrite(buffer)
sysread(length)
shutdown(how)
close
peerinfo
localinfo

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#abortive_closeObject

This flag indicates whether or not an abortive close has been issued.



316
317
318
# File 'lib/rex/io/stream.rb', line 316

def abortive_close
  @abortive_close
end

Instance Method Details

#<<(buf) ⇒ Object

This method writes the supplied buffer to the stream by calling the write routine.



129
130
131
# File 'lib/rex/io/stream.rb', line 129

def <<(buf)
	return write(buf.to_s)
end

#>>Object

This method calls get_once() to read pending data from the socket



136
137
138
# File 'lib/rex/io/stream.rb', line 136

def >>
	get_once
end

#def_block_sizeObject

The default block size to read in chunks from the wire.



309
310
311
# File 'lib/rex/io/stream.rb', line 309

def def_block_size
	16384
end

#def_max_loopsObject

The maximum number of read loops to perform before returning to the caller.



302
303
304
# File 'lib/rex/io/stream.rb', line 302

def def_max_loops
	1024
end

#def_read_loop_timeoutObject

The default number of seconds to wait while in a read loop after read data has been found.



294
295
296
# File 'lib/rex/io/stream.rb', line 294

def def_read_loop_timeout
	0.1
end

#def_read_timeoutObject

The default number of seconds to wait for a read operation to timeout.



286
287
288
# File 'lib/rex/io/stream.rb', line 286

def def_read_timeout
	10
end

#def_write_timeoutObject

The default number of seconds to wait for a write operation to timeout.



279
280
281
# File 'lib/rex/io/stream.rb', line 279

def def_write_timeout
	10
end

#fdObject

This method returns the selectable file descriptor, or self by default.



115
116
117
# File 'lib/rex/io/stream.rb', line 115

def fd
	self
end

#get(timeout = nil, ltimeout = def_read_loop_timeout, opts = {}) ⇒ Object

This method reads as much data as it can from the wire given a maximum timeout.



217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
# File 'lib/rex/io/stream.rb', line 217

def get(timeout = nil, ltimeout = def_read_loop_timeout, opts = {})
	# For those people who are used to being able to use a negative timeout!
	if (timeout and timeout.to_i < 0)
		timeout = nil
	end

	# No data in the first place? bust.
	if (has_read_data?(timeout) == false)
		return nil
	end

	buf = ""
	lps = 0
	eof = false

	# Keep looping until there is no more data to be gotten..
	while (has_read_data?(ltimeout) == true)
		# Catch EOF errors so that we can handle them properly.
		begin
			temp = read(def_block_size)
		rescue EOFError
			eof = true
		end

		# If we read zero bytes and we had data, then we've hit EOF
		if (temp and temp.length == 0)
			eof = true
		end

		# If we reached EOF and there are no bytes in the buffer we've been
		# reading into, then throw an EOF error.
		if (eof)
			# If we've already read at least some data, then it's time to
			# break out and let it be processed before throwing an EOFError.
			if (buf.length > 0)
				break
			else
				raise EOFError
			end
		end

		break if (temp == nil or temp.empty? == true)

		buf += temp
		lps += 1

		break if (lps >= def_max_loops)
	end

	# Return the entire buffer we read in
	return buf
end

#get_once(length = -1,, timeout = def_read_timeout) ⇒ Object

This method emulates the behavior of Pex::Socket::Recv in MSF2



197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
# File 'lib/rex/io/stream.rb', line 197

def get_once(length = -1, timeout = def_read_timeout)

	if (has_read_data?(timeout) == false)
		return nil
	end

	bsize = (length == -1) ? def_block_size : length

	begin
		return read(bsize)
	rescue Exception
	end

	return ''
end

#has_read_data?(timeout = nil) ⇒ Boolean

Polls the stream to see if there is any read data available. Returns true if data is available for reading, otherwise false is returned.

Returns:

  • (Boolean)


86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
# File 'lib/rex/io/stream.rb', line 86

def has_read_data?(timeout = nil)

	# Allow a timeout of "0" that waits almost indefinitely for input, this
	# mimics the behavior of Rex::ThreadSafe.select() and fixes some corner
	# cases of unintentional no-wait timeouts.
	timeout = 3600 if (timeout and timeout == 0)

	begin
		if ((rv = ::IO.select([ fd ], nil, nil, timeout)) and
		    (rv[0]) and
		    (rv[0][0] == fd))
			true
		else
			false
		end
	rescue ::Errno::EBADF, ::Errno::ENOTSOCK
		raise ::EOFError
	rescue StreamClosedError, ::IOError, ::EOFError, ::Errno::EPIPE
		# If the thing that lead to the closure was an abortive close, then
		# don't raise the stream closed error.
		return false if (fd.abortive_close == true)

		raise $!
	end
end

#put(buf, opts = {}) ⇒ Object

This method writes the full contents of the supplied buffer, optionally with a timeout.



172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
# File 'lib/rex/io/stream.rb', line 172

def put(buf, opts = {})
	return 0 if (buf == nil or buf.length == 0)

	send_len = buf.length
	send_idx = 0
	wait     = opts['Timeout'] || 0

	# Keep writing until our send length drops to zero
	while (send_idx < send_len)
		curr_len  = timed_write(buf[send_idx, buf.length-send_idx], wait, opts)

		# If the write operation failed due to an IOError, then we fail.
		return buf.length - send_len if (curr_len == nil)

		send_len -= curr_len
		send_idx += curr_len
	end

	return buf.length - send_len
end

#read(length = nil, opts = {}) ⇒ Object

This method reads data of the supplied length from the stream.



68
69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/rex/io/stream.rb', line 68

def read(length = nil, opts = {})
	
	begin
		return fd.read_nonblock( length ) 				
	rescue ::Errno::EAGAIN, ::Errno::EWOULDBLOCK
		# Sleep for a half a second, or until we can read again
		Rex::ThreadSafe.select( [ fd ], nil, nil, 0.5 )
		# Decrement the block size to handle full sendQs better
		retry
	rescue ::IOError, ::Errno::EPIPE
		return nil if (fd.abortive_close == true)
	end
end

#timed_read(length = nil, wait = def_read_timeout, opts = {}) ⇒ Object

This method reads from the stream, optionally timing out after a period of time.



158
159
160
161
162
163
164
165
166
# File 'lib/rex/io/stream.rb', line 158

def timed_read(length = nil, wait = def_read_timeout, opts = {})
	if (wait and wait > 0)
		Timeout.timeout(wait) {
			return read(length, opts)
		}
	else
		return read(length, opts)
	end
end

#timed_write(buf, wait = def_write_timeout, opts = {}) ⇒ Object

This method writes to the stream, optionally timing out after a period of time.



144
145
146
147
148
149
150
151
152
# File 'lib/rex/io/stream.rb', line 144

def timed_write(buf, wait = def_write_timeout, opts = {})
	if (wait and wait > 0)
		Timeout.timeout(wait) {
			return write(buf, opts)
		}
	else
		return write(buf, opts)
	end
end

#write(buf, opts = {}) ⇒ Object

This method writes the supplied buffer to the stream. This method intelligent reduces the size of supplied buffers so that ruby doesn’t get into a potential global thread blocking state when used on blocking sockets. That is, this method will send the supplied buffer in chunks of, at most, 32768 bytes.



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/rex/io/stream.rb', line 34

def write(buf, opts = {})
	total_sent   = 0
	total_length = buf.length
	block_size   = 32768
	
	begin
		while( total_sent < total_length )
			s = Rex::ThreadSafe.select( nil, [ fd ], nil, 0.2 )
			if( s == nil || s[0] == nil )
				next
			end
			data = buf[total_sent, block_size]
			sent = fd.write_nonblock( data )
			if sent > 0
				total_sent += sent
			end
		end
	rescue ::Errno::EAGAIN, ::Errno::EWOULDBLOCK
		# Sleep for a half a second, or until we can write again
		Rex::ThreadSafe.select( nil, [ fd ], nil, 0.5 )
		# Decrement the block size to handle full sendQs better
		block_size = 1024
		# Try to write the data again
		retry
	rescue ::IOError, ::Errno::EPIPE
		return nil if (fd.abortive_close == true)
	end
	
	total_sent
end