Class: RubyTorrent::PeerConnection
- Inherits:
-
Object
- Object
- RubyTorrent::PeerConnection
- Extended by:
- AttrReaderQ
- Includes:
- EventSource
- Defined in:
- lib/rubytorrent/peer.rb
Overview
The PeerConnection object deals with all the protocol issues. It keeps state information as to the connection and the peer. It is tightly integrated with the Controller object.
Remember to be “strict in what you send, lenient in what you accept”.
Constant Summary collapse
- BUFSIZE =
8192
- MAX_PEER_REQUESTS =
how many peer requests to keep queued
5
- MAX_REQUESTS =
how many requests for blocks to keep current
5
- MIN_REQUESTS =
get more blocks from controller when this limit is reached
1
- REQUEST_TIMEOUT =
60
Instance Attribute Summary collapse
-
#name ⇒ Object
readonly
Returns the value of attribute name.
-
#peer_pieces ⇒ Object
readonly
Returns the value of attribute peer_pieces.
Instance Method Summary collapse
-
#cancel(block) ⇒ Object
called by Controller in the event that a request needs to be rescinded.
-
#choke=(now_choke) ⇒ Object
the Controller calls this from heartbeat thread to tell us whether to choke or not.
- #dlamt ⇒ Object
- #dlrate ⇒ Object
-
#have_piece(piece) ⇒ Object
Controller calls this to tell us that a complete piece has been received.
-
#initialize(name, controller, socket, package) ⇒ PeerConnection
constructor
number of seconds after sending a request before we decide it’s been forgotten.
- #last_recv_block_time ⇒ Object
- #last_recv_time ⇒ Object
- #last_send_block_time ⇒ Object
- #last_send_time ⇒ Object
- #peer_complete? ⇒ Boolean
- #pending_recv ⇒ Object
- #pending_send ⇒ Object
- #piece_available?(index) ⇒ Boolean
-
#send_blocks_and_reqs(dllim = nil, ullim = nil) ⇒ Object
this is called both by input_thread_step and by the controller’s heartbeat thread.
-
#send_keepalive ⇒ Object
Controller calls this to tell us to send a keepalive.
- #shutdown ⇒ Object
-
#snub=(now_snub) ⇒ Object
the Controller calls this from heartbeat thread to tell us whether to snub or not.
- #start ⇒ Object
- #start_time ⇒ Object
- #to_s ⇒ Object
- #ulamt ⇒ Object
- #ulrate ⇒ Object
Methods included from AttrReaderQ
attr_accessor_q, attr_reader_q, attr_writer_q
Methods included from EventSource
append_features, #on_event, #relay_event, #send_event, #unregister_events
Constructor Details
#initialize(name, controller, socket, package) ⇒ PeerConnection
number of seconds after sending a request before we decide it’s been forgotten
116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 |
# File 'lib/rubytorrent/peer.rb', line 116 def initialize(name, controller, socket, package) @name = name @controller = controller @socket = socket @package = package @running = false ## my state @want_blocks = [].extend(ArrayDelete2) # blocks i want @want_blocks_m = Mutex.new @choking = true @interested = false @snubbing = false ## peer's state @peer_want_blocks = [].extend(ArrayDelete2) @peer_choking = true # assumption of initial condition @peer_interested = false # ditto @peer_pieces = Array.new(@package.num_pieces, false) # ditto @peer_virgin = true # does the peer have any pieces at all? ## connection stats @dlmeter = RateMeter.new @ulmeter = RateMeter.new @send_q = Queue.new # output thread takes messages from here and # puts them on the wire end |
Instance Attribute Details
#name ⇒ Object (readonly)
Returns the value of attribute name.
103 104 105 |
# File 'lib/rubytorrent/peer.rb', line 103 def name @name end |
#peer_pieces ⇒ Object (readonly)
Returns the value of attribute peer_pieces.
103 104 105 |
# File 'lib/rubytorrent/peer.rb', line 103 def peer_pieces @peer_pieces end |
Instance Method Details
#cancel(block) ⇒ Object
called by Controller in the event that a request needs to be rescinded.
215 216 217 218 219 220 221 222 223 |
# File 'lib/rubytorrent/peer.rb', line 215 def cancel(block) wblock = @want_blocks_m.synchronize { @want_blocks.delete2 block } unless wblock.nil? || !wblock.requested? rt_debug "#{self}: sending cancel for #{wblock}" (:cancel, {:index => wblock.pindex, :begin => wblock.begin, :length => wblock.length}) end get_want_blocks unless wblock.nil? end |
#choke=(now_choke) ⇒ Object
the Controller calls this from heartbeat thread to tell us whether to choke or not.
186 187 188 189 |
# File 'lib/rubytorrent/peer.rb', line 186 def choke=(now_choke) (now_choke ? :choke : :unchoke) unless @choking == now_choke @choking = now_choke end |
#dlamt ⇒ Object
208 |
# File 'lib/rubytorrent/peer.rb', line 208 def dlamt; @dlmeter.amt; end |
#dlrate ⇒ Object
206 |
# File 'lib/rubytorrent/peer.rb', line 206 def dlrate; @dlmeter.rate; end |
#have_piece(piece) ⇒ Object
Controller calls this to tell us that a complete piece has been received.
233 234 235 |
# File 'lib/rubytorrent/peer.rb', line 233 def have_piece(piece) (:have, {:index => piece.index}) end |
#last_recv_block_time ⇒ Object
204 |
# File 'lib/rubytorrent/peer.rb', line 204 def last_recv_block_time; @time[:recv_block]; end |
#last_recv_time ⇒ Object
202 |
# File 'lib/rubytorrent/peer.rb', line 202 def last_recv_time; @time[:recv]; end |
#last_send_block_time ⇒ Object
203 |
# File 'lib/rubytorrent/peer.rb', line 203 def last_send_block_time; @time[:send_block]; end |
#last_send_time ⇒ Object
201 |
# File 'lib/rubytorrent/peer.rb', line 201 def last_send_time; @time[:send]; end |
#peer_complete? ⇒ Boolean
200 |
# File 'lib/rubytorrent/peer.rb', line 200 def peer_complete?; @peer_pieces.all?; end |
#pending_recv ⇒ Object
145 |
# File 'lib/rubytorrent/peer.rb', line 145 def pending_recv; @want_blocks.find_all { |b| b.requested? }.length; end |
#pending_send ⇒ Object
146 |
# File 'lib/rubytorrent/peer.rb', line 146 def pending_send; @peer_want_blocks.length; end |
#piece_available?(index) ⇒ Boolean
210 |
# File 'lib/rubytorrent/peer.rb', line 210 def piece_available?(index); @peer_pieces[index]; end |
#send_blocks_and_reqs(dllim = nil, ullim = nil) ⇒ Object
this is called both by input_thread_step and by the controller’s heartbeat thread. it sends as many pending blocks as it can while keeping the amount below ‘ullim’, and sends as many requests as it can while keeping the amount below ‘dllim’.
returns the number of bytes requested and sent
249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 |
# File 'lib/rubytorrent/peer.rb', line 249 def send_blocks_and_reqs(dllim=nil, ullim=nil) sent_bytes = 0 reqd_bytes = 0 @want_blocks_m.synchronize do @want_blocks.each do |b| # puts "[][] #{self}: #{b} is #{b.requested? ? 'requested' : 'NOT requested'} and has time_elapsed of #{b.requested? ? b.time_elapsed.round : 'n/a'}s" if b.requested? && (b.time_elapsed > REQUEST_TIMEOUT) rt_warning "#{self}: for block #{b}, time elapsed since request is #{b.time_elapsed} > #{REQUEST_TIMEOUT}, assuming peer forgot about it" @want_blocks.delete b @controller.forget_blocks [b] end end end ## send :requests unless @peer_choking || !@interested @want_blocks_m.synchronize do @want_blocks.each do |b| break if dllim && (reqd_bytes >= dllim) next if b.requested? if @package.pieces[b.pindex].complete? # not sure that this will ever happen, but... rt_warning "#{self}: deleting scheduled block for already-complete piece #{b}" @want_blocks.delete b next end (:request, {:index => b.pindex, :begin => b.begin, :length => b.length}) reqd_bytes += b.length b.requested = true b.mark_time send_event(:requested_block, b) end end end ## send blocks # rt_debug "sending blocks. choking? #@choking, choked? #@peer_choking, ul rate #{ulrate}b/s, limit #@ulmeterlim" unless @peer_want_blocks.empty? unless @choking || !@peer_interested while !@peer_want_blocks.empty? break if ullim && (sent_bytes >= ullim) if (b = @peer_want_blocks.shift) sent_bytes += b.length @send_q.push b @time[:send_block] = Time.now send_event(:sent_block, b) end end end get_want_blocks [reqd_bytes, sent_bytes] end |
#send_keepalive ⇒ Object
Controller calls this to tell us to send a keepalive
238 239 240 241 |
# File 'lib/rubytorrent/peer.rb', line 238 def send_keepalive # rt_debug "* sending keepalive!" (:keepalive) end |
#shutdown ⇒ Object
225 226 227 228 229 |
# File 'lib/rubytorrent/peer.rb', line 225 def shutdown rt_debug "#{self.to_s}: shutting down" @running = false @socket.close rescue nil end |
#snub=(now_snub) ⇒ Object
the Controller calls this from heartbeat thread to tell us whether to snub or not.
193 194 195 196 197 198 |
# File 'lib/rubytorrent/peer.rb', line 193 def snub=(now_snub) unless @snubbing = now_snub @snubbing = now_snub choke = true if @snubbing end end |
#start ⇒ Object
148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 |
# File 'lib/rubytorrent/peer.rb', line 148 def start @running = true @time = {:start => Time.now} Thread.new do # start input thread begin while @running; input_thread_step; end rescue SystemCallError, IOError, ProtocolError => e rt_debug "#{self} (input): #{e.}, releasing #{@want_blocks.length} claimed blocks and dying" # rt_debug e.backtrace.join("\n") @running = false @controller.forget_blocks @want_blocks end end Thread.new do # start output thread begin while @running; output_thread_step; end rescue SystemCallError, IOError, ProtocolError => e rt_debug "#{self} (output): #{e.}, releasing #{@want_blocks.length} claimed blocks and dying" # rt_debug e.backtrace.join("\n") @running = false @controller.forget_blocks @want_blocks end end ## queue the initial messages (:bitfield, {:bitfield => @package.pieces.map { |p| p.complete? }.extend(ArrayToBitstring).to_bitstring}) ## and that's it. if peer sends a bitfield, we'll send an ## interested and start requesting blocks at that point. if they ## don't, it means they don't have any pieces, so we can just sit ## tight. self end |
#start_time ⇒ Object
205 |
# File 'lib/rubytorrent/peer.rb', line 205 def start_time; @time[:start]; end |
#to_s ⇒ Object
211 |
# File 'lib/rubytorrent/peer.rb', line 211 def to_s; "<peer: #@name>"; end |
#ulamt ⇒ Object
209 |
# File 'lib/rubytorrent/peer.rb', line 209 def ulamt; @ulmeter.amt; end |
#ulrate ⇒ Object
207 |
# File 'lib/rubytorrent/peer.rb', line 207 def ulrate; @ulmeter.rate; end |