Class: Rex::IO::RingBuffer
- Inherits:
-
Object
- Object
- Rex::IO::RingBuffer
- Defined in:
- lib/rex/io/ring_buffer.rb
Defined Under Namespace
Instance Attribute Summary collapse
-
#beg ⇒ Object
The index of the earliest data fragment in the ring.
-
#cur ⇒ Object
The sequence number of the earliest data fragment in the ring.
-
#fd ⇒ Object
The associated socket or IO object for this ring buffer.
-
#monitor ⇒ Object
The thread handle of the built-in monitor when used.
-
#monitor_thread_error ⇒ Object
:nodoc: #.
-
#mutex ⇒ Object
The mutex locking access to the queue.
-
#queue ⇒ Object
The data queue, essentially an array of two-element arrays, containing a sequence and data buffer.
-
#seq ⇒ Object
The next available sequence number.
-
#size ⇒ Object
The number of available slots in the queue.
Instance Method Summary collapse
-
#base_sequence ⇒ Object
The base_sequence method returns the earliest sequence number in the queue.
-
#clear_data ⇒ Object
The clear_data method wipes the ring buffer.
-
#create_stream ⇒ Object
The create_steam method assigns a IO::Socket compatible object to the ringer buffer.
-
#initialize(socket, opts = {}) ⇒ RingBuffer
constructor
Create a new ring buffer.
-
#last_sequence ⇒ Object
The last_sequence method returns the “next” sequence number where new data will be available.
-
#monitor_thread ⇒ Object
The built-in monitor thread (normally unused with Metasploit).
-
#put(data, opts = {}) ⇒ Object
Push data back into the associated stream socket.
-
#read_data(ptr = nil) ⇒ Object
The read_data method returns a two element array with the new reader cursor (a sequence number) and the returned data buffer (if any).
-
#select ⇒ Object
The select method returns when there is a chance of new data XXX: This is mostly useless and requires a rewrite to use a real select or notify mechanism.
-
#start_monitor ⇒ Object
Start the built-in monitor, not called when used in a larger framework.
-
#stop_monitor ⇒ Object
Stop the built-in monitor.
-
#store_data(data) ⇒ Object
The store_data method is used to insert data into the ring buffer.
-
#wait(seq) ⇒ Object
The wait method blocks until new data is available.
-
#wait_for(seq, timeout = 1) ⇒ Object
The wait_for method blocks until new data is available or the timeout is reached.
Constructor Details
#initialize(socket, opts = {}) ⇒ RingBuffer
Create a new ring buffer
31 32 33 34 35 36 37 38 39 |
# File 'lib/rex/io/ring_buffer.rb', line 31 def initialize(socket, opts={}) self.size = opts[:size] || (1024 * 4) self.fd = socket self.seq = 0 self.beg = 0 self.cur = 0 self.queue = Array.new( self.size ) self.mutex = Mutex.new end |
Instance Attribute Details
#beg ⇒ Object
The index of the earliest data fragment in the ring
23 24 25 |
# File 'lib/rex/io/ring_buffer.rb', line 23 def beg @beg end |
#cur ⇒ Object
The sequence number of the earliest data fragment in the ring
24 25 26 |
# File 'lib/rex/io/ring_buffer.rb', line 24 def cur @cur end |
#fd ⇒ Object
The associated socket or IO object for this ring buffer
20 21 22 |
# File 'lib/rex/io/ring_buffer.rb', line 20 def fd @fd end |
#monitor ⇒ Object
The thread handle of the built-in monitor when used
25 26 27 |
# File 'lib/rex/io/ring_buffer.rb', line 25 def monitor @monitor end |
#monitor_thread_error ⇒ Object
:nodoc: #
26 27 28 |
# File 'lib/rex/io/ring_buffer.rb', line 26 def monitor_thread_error @monitor_thread_error end |
#mutex ⇒ Object
The mutex locking access to the queue
22 23 24 |
# File 'lib/rex/io/ring_buffer.rb', line 22 def mutex @mutex end |
#queue ⇒ Object
The data queue, essentially an array of two-element arrays, containing a sequence and data buffer
18 19 20 |
# File 'lib/rex/io/ring_buffer.rb', line 18 def queue @queue end |
#seq ⇒ Object
The next available sequence number
19 20 21 |
# File 'lib/rex/io/ring_buffer.rb', line 19 def seq @seq end |
#size ⇒ Object
The number of available slots in the queue
21 22 23 |
# File 'lib/rex/io/ring_buffer.rb', line 21 def size @size end |
Instance Method Details
#base_sequence ⇒ Object
The base_sequence method returns the earliest sequence number in the queue. This is zero until all slots are filled and the ring rotates.
175 176 177 178 179 180 |
# File 'lib/rex/io/ring_buffer.rb', line 175 def base_sequence self.mutex.synchronize do return 0 if not self.queue[self.beg] return self.queue[self.beg][0] end end |
#clear_data ⇒ Object
The clear_data method wipes the ring buffer
84 85 86 87 88 89 90 91 |
# File 'lib/rex/io/ring_buffer.rb', line 84 def clear_data self.mutex.synchronize do self.seq = 0 self.beg = 0 self.cur = 0 self.queue = Array.new( self.size ) end end |
#create_stream ⇒ Object
The create_steam method assigns a IO::Socket compatible object to the ringer buffer
193 194 195 |
# File 'lib/rex/io/ring_buffer.rb', line 193 def create_stream Stream.new(self) end |
#last_sequence ⇒ Object
The last_sequence method returns the “next” sequence number where new data will be available.
186 187 188 |
# File 'lib/rex/io/ring_buffer.rb', line 186 def last_sequence self.seq end |
#monitor_thread ⇒ Object
The built-in monitor thread (normally unused with Metasploit)
59 60 61 62 63 64 65 66 67 68 69 70 71 |
# File 'lib/rex/io/ring_buffer.rb', line 59 def monitor_thread Thread.new do begin while self.fd buff = self.fd.get_once(-1, 1.0) next if not buff store_data(buff) end rescue ::Exception => e self.monitor_thread_error = e end end end |
#put(data, opts = {}) ⇒ Object
Push data back into the associated stream socket. Logging must occur elsewhere, this function is simply a passthrough.
77 78 79 |
# File 'lib/rex/io/ring_buffer.rb', line 77 def put(data, opts={}) self.fd.put(data, opts={}) end |
#read_data(ptr = nil) ⇒ Object
The read_data method returns a two element array with the new reader cursor (a sequence number) and the returned data buffer (if any). A result of nil/nil indicates that no data is available
122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 |
# File 'lib/rex/io/ring_buffer.rb', line 122 def read_data(ptr=nil) self.mutex.synchronize do # Verify that there is data in the queue return [nil,nil] if not self.queue[self.beg] # Configure the beginning read pointer (sequence number, not index) ptr ||= self.queue[self.beg][0] return [nil,nil] if not ptr # If the pointer is below our baseline, we lost some data, so jump forward if ptr < self.queue[self.beg][0] ptr = self.queue[self.beg][0] end # Calculate how many blocks exist between the current sequence number # and the requested pointer, this becomes the number of blocks we will # need to read to satisfy the result. Due to the mutex block, we do # not need to scan to find the sequence of the starting block or # check the sequence of the ending block. dis = self.seq - ptr # If the requested sequnce number is less than our base pointer, it means # that no new data is available and we should return empty. return [nil,nil] if dis < 0 # Calculate the beginning block index and number of blocks to read off = ptr - self.queue[self.beg][0] set = (self.beg + off) % self.size # Build the buffer by reading forward by the number of blocks needed # and return the last read sequence number, plus one, as the new read # pointer. buff = "" cnt = 0 lst = ptr ptr.upto(self.seq) do |i| block = self.queue[ (set + cnt) % self.size ] lst,data = block[0],block[1] buff += data cnt += 1 end return [lst + 1, buff] end end |
#select ⇒ Object
The select method returns when there is a chance of new data XXX: This is mostly useless and requires a rewrite to use a
real select or notify mechanism
202 203 204 |
# File 'lib/rex/io/ring_buffer.rb', line 202 def select ::IO.select([ self.fd ], nil, [ self.fd ], 0.10) end |
#start_monitor ⇒ Object
Start the built-in monitor, not called when used in a larger framework
44 45 46 |
# File 'lib/rex/io/ring_buffer.rb', line 44 def start_monitor self.monitor = monitor_thread if not self.monitor end |
#stop_monitor ⇒ Object
Stop the built-in monitor
51 52 53 54 |
# File 'lib/rex/io/ring_buffer.rb', line 51 def stop_monitor self.monitor.kill if self.monitor self.monitor = nil end |
#store_data(data) ⇒ Object
The store_data method is used to insert data into the ring buffer.
96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 |
# File 'lib/rex/io/ring_buffer.rb', line 96 def store_data(data) self.mutex.synchronize do # self.cur points to the array index of queue containing the last item # adding data will result in cur + 1 being used to store said data # if cur is larger than size - 1, it will wrap back around. If cur # is *smaller* beg, beg is increemnted to cur + 1 (and wrapped if # necessary loc = 0 if self.seq > 0 loc = ( self.cur + 1 ) % self.size if loc <= self.beg self.beg = (self.beg + 1) % self.size end end self.queue[loc] = [self.seq += 1, data] self.cur = loc end end |
#wait(seq) ⇒ Object
The wait method blocks until new data is available
209 210 211 212 213 214 215 |
# File 'lib/rex/io/ring_buffer.rb', line 209 def wait(seq) nseq = nil while not nseq nseq,data = read_data(seq) select end end |
#wait_for(seq, timeout = 1) ⇒ Object
The wait_for method blocks until new data is available or the timeout is reached
220 221 222 223 224 225 226 227 |
# File 'lib/rex/io/ring_buffer.rb', line 220 def wait_for(seq,timeout=1) begin ::Timeout.timeout(timeout) do wait(seq) end rescue ::Timeout::Error end end |