Class: Rex::IO::RingBuffer

Inherits:
Object
  • Object
show all
Defined in:
lib/rex/io/ring_buffer.rb

Defined Under Namespace

Classes: Stream, UnitTest

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(socket, opts = {}) ⇒ RingBuffer

Create a new ring buffer



31
32
33
34
35
36
37
38
39
# File 'lib/rex/io/ring_buffer.rb', line 31

# Build an empty ring buffer bound to +socket+.
#
# socket - the socket/IO object stored in +fd+.
# opts   - options hash; +:size+ sets the number of slots and falls
#          back to 4096 when absent or nil.
def initialize(socket, opts={})
	self.fd    = socket
	self.size  = opts[:size] || 4096

	# Pre-allocate every slot as nil; readers treat a nil slot as "no data".
	self.queue = Array.new( self.size )
	self.mutex = Mutex.new

	# Sequence counter and both ring indexes start at zero.
	self.seq = self.beg = self.cur = 0
end

Instance Attribute Details

#begObject

The index of the earliest data fragment in the ring



23
24
25
# File 'lib/rex/io/ring_buffer.rb', line 23

# Reader for @beg: the index into +queue+ of the oldest fragment
# currently held in the ring (0 on a fresh or cleared buffer).
def beg
  @beg
end

#curObject

The index of the most recently stored data fragment in the ring



24
25
26
# File 'lib/rex/io/ring_buffer.rb', line 24

# Reader for @cur: the index into +queue+ of the slot that store_data
# wrote most recently (an array index, not a sequence number).
def cur
  @cur
end

#fdObject

The associated socket or IO object for this ring buffer



20
21
22
# File 'lib/rex/io/ring_buffer.rb', line 20

# Reader for @fd: the socket or IO object handed to the constructor.
# The monitor thread reads from it and +put+ writes to it.
def fd
  @fd
end

#monitorObject

The thread handle of the built-in monitor when used



25
26
27
# File 'lib/rex/io/ring_buffer.rb', line 25

# Reader for @monitor: the Thread started by start_monitor, or nil
# when no built-in monitor is running.
def monitor
  @monitor
end

#monitor_thread_errorObject

:nodoc: #



26
27
28
# File 'lib/rex/io/ring_buffer.rb', line 26

# Reader for @monitor_thread_error: the exception captured by the
# monitor thread's rescue clause, if that thread has failed.
def monitor_thread_error
  @monitor_thread_error
end

#mutexObject

The mutex locking access to the queue



22
23
24
# File 'lib/rex/io/ring_buffer.rb', line 22

# Reader for @mutex: the Mutex created in the constructor; the
# read/store/clear methods wrap their queue access in it.
def mutex
  @mutex
end

#queueObject

The data queue, essentially an array of two-element arrays, containing a sequence and data buffer



18
19
20
# File 'lib/rex/io/ring_buffer.rb', line 18

# Reader for @queue: a fixed-length Array of +size+ slots. A slot is
# nil until written, then holds a [sequence_number, data] pair.
def queue
  @queue
end

#seqObject

The sequence number of the most recently stored fragment (zero while the ring is empty)



19
20
21
# File 'lib/rex/io/ring_buffer.rb', line 19

# Reader for @seq: the sequence counter. store_data increments it
# before tagging each fragment, so it equals the sequence number of
# the newest stored fragment and is 0 while the ring is empty.
def seq
  @seq
end

#sizeObject

The number of available slots in the queue



21
22
23
# File 'lib/rex/io/ring_buffer.rb', line 21

# Reader for @size: the number of slots allocated in +queue+
# (set once in the constructor).
def size
  @size
end

Instance Method Details

#base_sequenceObject

The base_sequence method returns the earliest sequence number in the queue. This is zero only while the ring is empty; once data has been stored it is the sequence number of the oldest fragment still held.



175
176
177
178
179
180
# File 'lib/rex/io/ring_buffer.rb', line 175

# Return the sequence number of the oldest fragment still in the ring,
# or 0 when the slot at +beg+ is empty. Runs under the queue mutex.
def base_sequence
	self.mutex.synchronize do
		oldest = self.queue[self.beg]
		oldest ? oldest[0] : 0
	end
end

#clear_dataObject

The clear_data method wipes the ring buffer



84
85
86
87
88
89
90
91
# File 'lib/rex/io/ring_buffer.rb', line 84

# Reset the ring to its freshly-constructed state: zero the sequence
# counter and both indexes, and replace the queue with +size+ empty
# slots. Runs under the queue mutex and returns the new queue array.
def clear_data
	self.mutex.synchronize do
		self.seq = self.beg = self.cur = 0
		self.queue = Array.new( self.size )
	end
end

#create_streamObject

The create_stream method assigns an IO::Socket compatible object to the ring buffer



193
194
195
# File 'lib/rex/io/ring_buffer.rb', line 193

# Construct and return a new Stream wrapping this ring buffer.
# NOTE(review): Stream is declared elsewhere under this class's
# namespace; presumably it exposes a socket-like reader — confirm.
def create_stream
	Stream.new(self)
end

#last_sequenceObject

The last_sequence method returns the “next” sequence number where new data will be available.



186
187
188
# File 'lib/rex/io/ring_buffer.rb', line 186

# Return the current value of the sequence counter (+seq+). The read is
# not wrapped in the mutex. Because store_data increments the counter
# before tagging a fragment, this is the sequence number of the newest
# stored fragment, or 0 if nothing has been stored.
def last_sequence
	self.seq
end

#monitor_threadObject

The built-in monitor thread (normally unused with Metasploit)



59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/rex/io/ring_buffer.rb', line 59

# Spawn and return a thread that keeps pulling data from +fd+ and
# feeding it into the ring via store_data for as long as +fd+ is set.
#
# Any exception raised inside the loop ends the thread and is saved in
# +monitor_thread_error+ rather than propagated.
def monitor_thread
	Thread.new do
		begin
			while self.fd
				# Poll with a one second timeout; a nil result means nothing arrived.
				chunk = self.fd.get_once(-1, 1.0)
				store_data(chunk) if chunk
			end
		rescue ::Exception => err
			self.monitor_thread_error = err
		end
	end
end

#put(data, opts = {}) ⇒ Object

Push data back into the associated stream socket. Logging must occur elsewhere, this function is simply a passthrough.



77
78
79
# File 'lib/rex/io/ring_buffer.rb', line 77

# Write +data+ back to the associated socket. This is a plain
# passthrough; any logging has to happen elsewhere.
#
# data - the buffer to send.
# opts - options hash handed unchanged to the socket's +put+.
#
# Returns whatever the socket's +put+ returns.
def put(data, opts={})
	# Forward the caller's options as given. The previous call site wrote
	# `opts={}`, which re-assigned the local to an empty hash and silently
	# discarded every option.
	self.fd.put(data, opts)
end

#read_data(ptr = nil) ⇒ Object

The read_data method returns a two element array with the new reader cursor (a sequence number) and the returned data buffer (if any). A result of nil/nil indicates that no data is available



122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
# File 'lib/rex/io/ring_buffer.rb', line 122

# Read every fragment from sequence number +ptr+ up to the newest one.
#
# ptr - sequence number to start from; nil means "from the oldest
#       fragment still in the ring". A value older than the oldest
#       fragment is bumped forward to it (that data has been lost).
#
# Returns [next_ptr, buffer], where next_ptr is the last sequence read
# plus one, or [nil, nil] when the ring is empty or +ptr+ is already
# past the newest fragment. The whole read runs under the queue mutex.
def read_data(ptr=nil)
	self.mutex.synchronize do
		# An empty slot at the base index means nothing has been stored.
		oldest = self.queue[self.beg]
		return [nil, nil] unless oldest

		# Default the read pointer (a sequence number, not an index) to the base.
		base = oldest[0]
		ptr ||= base
		return [nil, nil] unless ptr

		# Requests below the baseline refer to overwritten data; jump forward.
		ptr = base if ptr < base

		# Distance from the pointer to the newest sequence. Negative means the
		# reader is already caught up, so there is nothing to return.
		pending = self.seq - ptr
		return [nil, nil] if pending < 0

		# Translate the starting sequence number into a queue index.
		start = (self.beg + (ptr - base)) % self.size

		# Walk forward pending + 1 slots, wrapping around the ring, and
		# concatenate their payloads in order.
		last_seq = ptr
		buff = ""
		(0..pending).each do |step|
			entry = self.queue[ (start + step) % self.size ]
			last_seq = entry[0]
			buff += entry[1]
		end

		return [last_seq + 1, buff]
	end
end

#selectObject

The select method returns when there is a chance of new data. XXX: This is mostly useless and requires a rewrite to use a real select or notify mechanism.


202
203
204
# File 'lib/rex/io/ring_buffer.rb', line 202

# Block for at most 0.10 seconds waiting for +fd+ to become readable or
# to report an exceptional condition. Returns IO.select's result: the
# ready arrays, or nil if the timeout elapsed first.
def select
	watched = [ self.fd ]
	::IO.select(watched, nil, watched, 0.10)
end

#start_monitorObject

Start the built-in monitor, not called when used in a larger framework



44
45
46
# File 'lib/rex/io/ring_buffer.rb', line 44

# Launch the built-in monitor thread unless one is already recorded in
# +monitor+. Returns the new thread, or nil when a monitor was already
# present.
def start_monitor
	return nil if self.monitor
	self.monitor = monitor_thread
end

#stop_monitorObject

Stop the built-in monitor



51
52
53
54
# File 'lib/rex/io/ring_buffer.rb', line 51

# Kill the built-in monitor thread if one is running, then clear the
# +monitor+ handle. Always returns nil.
def stop_monitor
	active = self.monitor
	active.kill if active
	self.monitor = nil
end

#store_data(data) ⇒ Object

The store_data method is used to insert data into the ring buffer.



96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
# File 'lib/rex/io/ring_buffer.rb', line 96

# Append +data+ to the ring as a new [sequence, data] fragment.
#
# The very first fragment goes into slot 0. Every later one goes into
# the slot after +cur+, wrapping at +size+. When that slot is at or
# before +beg+ the ring is full and the oldest fragment is about to be
# overwritten, so +beg+ advances by one (also wrapping).
#
# Runs under the queue mutex and returns the slot index that was written.
def store_data(data)
	self.mutex.synchronize do
		slot = 0
		if self.seq > 0
			slot = ( self.cur + 1 ) % self.size
			self.beg = ( self.beg + 1 ) % self.size if slot <= self.beg
		end

		# Bump the counter first so the fragment is tagged with the new value.
		next_seq = self.seq + 1
		self.seq = next_seq
		self.queue[slot] = [next_seq, data]
		self.cur = slot
	end
end

#wait(seq) ⇒ Object

The wait method blocks until new data is available



209
210
211
212
213
214
215
# File 'lib/rex/io/ring_buffer.rb', line 209

# Block until read_data reports data at or after sequence number +seq+.
#
# seq - the sequence number to read from on each poll.
#
# Always returns nil; the data itself is not consumed on behalf of the
# caller, who is expected to call read_data afterwards.
def wait(seq)
	# Check for data first and only fall back to select when there is none.
	# The previous loop called select after every read, including the
	# successful one, which could stall for the full select timeout even
	# though data was already available.
	nseq, = read_data(seq)
	until nseq
		select
		nseq, = read_data(seq)
	end
	nil
end

#wait_for(seq, timeout = 1) ⇒ Object

The wait_for method blocks until new data is available or the timeout is reached



220
221
222
223
224
225
226
227
# File 'lib/rex/io/ring_buffer.rb', line 220

# Like +wait+, but give up after +timeout+ seconds.
#
# seq     - the sequence number passed through to +wait+.
# timeout - maximum number of seconds to block (default 1).
#
# Returns the result of +wait+ when it finishes in time, or nil when
# the timeout fires; the Timeout::Error is swallowed.
def wait_for(seq,timeout=1)
	::Timeout.timeout(timeout) { wait(seq) }
rescue ::Timeout::Error
	nil
end