Class: Rex::IO::RingBuffer
- Inherits:
-
Object
- Object
- Rex::IO::RingBuffer
- Defined in:
- lib/rex/io/ring_buffer.rb
Defined Under Namespace
Classes: Stream
Instance Attribute Summary collapse
-
#beg ⇒ Object
The index of the earliest data fragment in the ring.
-
#cur ⇒ Object
The sequence number of the earliest data fragment in the ring.
-
#fd ⇒ Object
The associated socket or IO object for this ring buffer.
-
#monitor ⇒ Object
The thread handle of the built-in monitor when used.
-
#monitor_thread_error ⇒ Object
:nodoc: #.
-
#mutex ⇒ Object
The mutex locking access to the queue.
-
#queue ⇒ Object
The data queue, essentially an array of two-element arrays, containing a sequence and data buffer.
-
#seq ⇒ Object
The next available sequence number.
-
#size ⇒ Object
The number of available slots in the queue.
Instance Method Summary collapse
-
#base_sequence ⇒ Object
The base_sequence method returns the earliest sequence number in the queue.
-
#clear_data ⇒ Object
The clear_data method wipes the ring buffer.
-
#create_stream ⇒ Object
The create_steam method assigns a IO::Socket compatible object to the ringer buffer.
-
#initialize(socket, opts = {}) ⇒ RingBuffer
constructor
Create a new ring buffer.
- #inspect ⇒ Object
-
#last_sequence ⇒ Object
The last_sequence method returns the “next” sequence number where new data will be available.
-
#monitor_thread ⇒ Object
The built-in monitor thread (normally unused with Metasploit).
-
#put(data, opts = {}) ⇒ Object
Push data back into the associated stream socket.
-
#read_data(ptr = nil) ⇒ Object
The read_data method returns a two element array with the new reader cursor (a sequence number) and the returned data buffer (if any).
-
#select ⇒ Object
The select method returns when there is a chance of new data XXX: This is mostly useless and requires a rewrite to use a real select or notify mechanism.
-
#start_monitor ⇒ Object
Start the built-in monitor, not called when used in a larger framework.
-
#stop_monitor ⇒ Object
Stop the built-in monitor.
-
#store_data(data) ⇒ Object
The store_data method is used to insert data into the ring buffer.
-
#wait(seq) ⇒ Object
The wait method blocks until new data is available.
-
#wait_for(seq, timeout = 1) ⇒ Object
The wait_for method blocks until new data is available or the timeout is reached.
Constructor Details
#initialize(socket, opts = {}) ⇒ RingBuffer
Create a new ring buffer
32 33 34 35 36 37 38 39 40 |
# File 'lib/rex/io/ring_buffer.rb', line 32 def initialize(socket, opts={}) self.size = opts[:size] || (1024 * 4) self.fd = socket self.seq = 0 self.beg = 0 self.cur = 0 self.queue = Array.new( self.size ) self.mutex = Mutex.new end |
Instance Attribute Details
#beg ⇒ Object
The index of the earliest data fragment in the ring
24 25 26 |
# File 'lib/rex/io/ring_buffer.rb', line 24 def beg @beg end |
#cur ⇒ Object
The sequence number of the earliest data fragment in the ring
25 26 27 |
# File 'lib/rex/io/ring_buffer.rb', line 25 def cur @cur end |
#fd ⇒ Object
The associated socket or IO object for this ring buffer
21 22 23 |
# File 'lib/rex/io/ring_buffer.rb', line 21 def fd @fd end |
#monitor ⇒ Object
The thread handle of the built-in monitor when used
26 27 28 |
# File 'lib/rex/io/ring_buffer.rb', line 26 def monitor @monitor end |
#monitor_thread_error ⇒ Object
:nodoc: #
27 28 29 |
# File 'lib/rex/io/ring_buffer.rb', line 27 def monitor_thread_error @monitor_thread_error end |
#mutex ⇒ Object
The mutex locking access to the queue
23 24 25 |
# File 'lib/rex/io/ring_buffer.rb', line 23 def mutex @mutex end |
#queue ⇒ Object
The data queue, essentially an array of two-element arrays, containing a sequence and data buffer
19 20 21 |
# File 'lib/rex/io/ring_buffer.rb', line 19 def queue @queue end |
#seq ⇒ Object
The next available sequence number
20 21 22 |
# File 'lib/rex/io/ring_buffer.rb', line 20 def seq @seq end |
#size ⇒ Object
The number of available slots in the queue
22 23 24 |
# File 'lib/rex/io/ring_buffer.rb', line 22 def size @size end |
Instance Method Details
#base_sequence ⇒ Object
The base_sequence method returns the earliest sequence number in the queue. This is zero until all slots are filled and the ring rotates.
180 181 182 183 184 185 |
# File 'lib/rex/io/ring_buffer.rb', line 180 def base_sequence self.mutex.synchronize do return 0 if not self.queue[self.beg] return self.queue[self.beg][0] end end |
#clear_data ⇒ Object
The clear_data method wipes the ring buffer
89 90 91 92 93 94 95 96 |
# File 'lib/rex/io/ring_buffer.rb', line 89 def clear_data self.mutex.synchronize do self.seq = 0 self.beg = 0 self.cur = 0 self.queue = Array.new( self.size ) end end |
#create_stream ⇒ Object
The create_steam method assigns a IO::Socket compatible object to the ringer buffer
198 199 200 |
# File 'lib/rex/io/ring_buffer.rb', line 198 def create_stream Stream.new(self) end |
#inspect ⇒ Object
42 43 44 |
# File 'lib/rex/io/ring_buffer.rb', line 42 def inspect "#<Rex::IO::RingBuffer @size=#{size} @fd=#{fd} @seq=#{seq} @beg=#{beg} @cur=#{cur}>" end |
#last_sequence ⇒ Object
The last_sequence method returns the “next” sequence number where new data will be available.
191 192 193 |
# File 'lib/rex/io/ring_buffer.rb', line 191 def last_sequence self.seq end |
#monitor_thread ⇒ Object
The built-in monitor thread (normally unused with Metasploit)
64 65 66 67 68 69 70 71 72 73 74 75 76 |
# File 'lib/rex/io/ring_buffer.rb', line 64 def monitor_thread Thread.new do begin while self.fd buff = self.fd.get_once(-1, 1.0) next if not buff store_data(buff) end rescue ::Exception => e self.monitor_thread_error = e end end end |
#put(data, opts = {}) ⇒ Object
Push data back into the associated stream socket. Logging must occur elsewhere, this function is simply a passthrough.
82 83 84 |
# File 'lib/rex/io/ring_buffer.rb', line 82 def put(data, opts={}) self.fd.put(data, opts={}) end |
#read_data(ptr = nil) ⇒ Object
The read_data method returns a two element array with the new reader cursor (a sequence number) and the returned data buffer (if any). A result of nil/nil indicates that no data is available
127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 |
# File 'lib/rex/io/ring_buffer.rb', line 127 def read_data(ptr=nil) self.mutex.synchronize do # Verify that there is data in the queue return [nil,nil] if not self.queue[self.beg] # Configure the beginning read pointer (sequence number, not index) ptr ||= self.queue[self.beg][0] return [nil,nil] if not ptr # If the pointer is below our baseline, we lost some data, so jump forward if ptr < self.queue[self.beg][0] ptr = self.queue[self.beg][0] end # Calculate how many blocks exist between the current sequence number # and the requested pointer, this becomes the number of blocks we will # need to read to satisfy the result. Due to the mutex block, we do # not need to scan to find the sequence of the starting block or # check the sequence of the ending block. dis = self.seq - ptr # If the requested sequnce number is less than our base pointer, it means # that no new data is available and we should return empty. return [nil,nil] if dis < 0 # Calculate the beginning block index and number of blocks to read off = ptr - self.queue[self.beg][0] set = (self.beg + off) % self.size # Build the buffer by reading forward by the number of blocks needed # and return the last read sequence number, plus one, as the new read # pointer. buff = "" cnt = 0 lst = ptr ptr.upto(self.seq) do |i| block = self.queue[ (set + cnt) % self.size ] lst,data = block[0],block[1] buff += data cnt += 1 end return [lst + 1, buff] end end |
#select ⇒ Object
The select method returns when there is a chance of new data XXX: This is mostly useless and requires a rewrite to use a
real select or notify mechanism
207 208 209 |
# File 'lib/rex/io/ring_buffer.rb', line 207 def select ::IO.select([ self.fd ], nil, [ self.fd ], 0.10) end |
#start_monitor ⇒ Object
Start the built-in monitor, not called when used in a larger framework
49 50 51 |
# File 'lib/rex/io/ring_buffer.rb', line 49 def start_monitor self.monitor = monitor_thread if not self.monitor end |
#stop_monitor ⇒ Object
Stop the built-in monitor
56 57 58 59 |
# File 'lib/rex/io/ring_buffer.rb', line 56 def stop_monitor self.monitor.kill if self.monitor self.monitor = nil end |
#store_data(data) ⇒ Object
The store_data method is used to insert data into the ring buffer.
101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 |
# File 'lib/rex/io/ring_buffer.rb', line 101 def store_data(data) self.mutex.synchronize do # self.cur points to the array index of queue containing the last item # adding data will result in cur + 1 being used to store said data # if cur is larger than size - 1, it will wrap back around. If cur # is *smaller* beg, beg is increemnted to cur + 1 (and wrapped if # necessary loc = 0 if self.seq > 0 loc = ( self.cur + 1 ) % self.size if loc <= self.beg self.beg = (self.beg + 1) % self.size end end self.queue[loc] = [self.seq += 1, data] self.cur = loc end end |
#wait(seq) ⇒ Object
The wait method blocks until new data is available
214 215 216 217 218 219 220 |
# File 'lib/rex/io/ring_buffer.rb', line 214 def wait(seq) nseq = nil while not nseq nseq,data = read_data(seq) select end end |
#wait_for(seq, timeout = 1) ⇒ Object
The wait_for method blocks until new data is available or the timeout is reached
225 226 227 228 229 230 231 232 |
# File 'lib/rex/io/ring_buffer.rb', line 225 def wait_for(seq,timeout=1) begin ::Timeout.timeout(timeout) do wait(seq) end rescue ::Timeout::Error end end |