Class: RubyTorrent::PeerConnection

Inherits:
Object
  • Object
show all
Extended by:
AttrReaderQ
Includes:
EventSource
Defined in:
lib/rubytorrent/peer.rb

Overview

The PeerConnection object deals with all the protocol issues. It keeps state information as to the connection and the peer. It is tightly integrated with the Controller object.

Remember to be “strict in what you send, lenient in what you accept”.

Constant Summary collapse

BUFSIZE =
8192
MAX_PEER_REQUESTS =

how many peer requests to keep queued

5
MAX_REQUESTS =

how many requests for blocks to keep current

5
MIN_REQUESTS =

get more blocks from controller when this limit is reached

1
REQUEST_TIMEOUT =
60

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from AttrReaderQ

attr_accessor_q, attr_reader_q, attr_writer_q

Methods included from EventSource

append_features, #on_event, #relay_event, #send_event, #unregister_events

Constructor Details

#initialize(name, controller, socket, package) ⇒ PeerConnection

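Creates a PeerConnection and initializes its bookkeeping state; no threads are started until #start is called. (The text originally extracted here — “number of seconds after sending a request before we decide it’s been forgotten” — is the comment for the REQUEST_TIMEOUT constant, not for this constructor.)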



116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
# File 'lib/rubytorrent/peer.rb', line 116

# Records the collaborators and resets all per-connection state. Nothing is
# sent and no threads are started here.
#
# name, controller and socket are stored as given; package must respond to
# num_pieces, which sizes the peer's piece map.
def initialize(name, controller, socket, package)
  @name, @controller, @socket, @package = name, controller, socket, package
  @running = false

  ## my state
  @want_blocks = [].extend(ArrayDelete2) # blocks i want
  @want_blocks_m = Mutex.new
  @choking, @interested, @snubbing = true, false, false

  ## peer's state -- everything here is an assumed initial condition
  @peer_want_blocks = [].extend(ArrayDelete2)
  @peer_choking, @peer_interested = true, false
  @peer_pieces = Array.new(package.num_pieces, false)
  @peer_virgin = true # does the peer have any pieces at all?

  ## connection stats
  @dlmeter = RateMeter.new
  @ulmeter = RateMeter.new

  ## output thread takes messages from here and puts them on the wire
  @send_q = Queue.new
end

Instance Attribute Details

#nameObject (readonly)

Returns the value of attribute name.



103
104
105
# File 'lib/rubytorrent/peer.rb', line 103

# Reader for the name given to the constructor.
def name; @name; end

#peer_piecesObject (readonly)

Returns the value of attribute peer_pieces.



103
104
105
# File 'lib/rubytorrent/peer.rb', line 103

# Reader for the per-piece availability array kept for the peer.
def peer_pieces; @peer_pieces; end

Instance Method Details

#cancel(block) ⇒ Object

called by Controller in the event that a request needs to be rescinded.



215
216
217
218
219
220
221
222
223
# File 'lib/rubytorrent/peer.rb', line 215

# Called by the Controller when a request needs to be rescinded.
#
# Removes +block+ from the wanted list (under the want-blocks mutex). If
# nothing was removed, returns nil without doing anything else. Otherwise a
# :cancel message is queued when the removed block had already been
# requested, and get_want_blocks is called; its result is returned.
def cancel(block)
  dropped = @want_blocks_m.synchronize { @want_blocks.delete2 block }
  return if dropped.nil?

  if dropped.requested?
    rt_debug "#{self}: sending cancel for #{dropped}"
    payload = {:index => dropped.pindex, :begin => dropped.begin,
               :length => dropped.length}
    queue_message(:cancel, payload)
  end

  get_want_blocks
end

#choke=(now_choke) ⇒ Object

the Controller calls this from heartbeat thread to tell us whether to choke or not.



186
187
188
189
# File 'lib/rubytorrent/peer.rb', line 186

# The Controller calls this from its heartbeat thread to tell us whether to
# choke the peer. A :choke / :unchoke message is queued only when the value
# actually changes; the new value is always stored.
def choke=(now_choke)
  if @choking != now_choke
    message = now_choke ? :choke : :unchoke
    queue_message(message)
  end
  @choking = now_choke
end

#dlamtObject



208
# File 'lib/rubytorrent/peer.rb', line 208

# Amount reported by the download meter (@dlmeter.amt).
def dlamt
  @dlmeter.amt
end

#dlrateObject



206
# File 'lib/rubytorrent/peer.rb', line 206

# Rate reported by the download meter (@dlmeter.rate).
def dlrate
  @dlmeter.rate
end

#have_piece(piece) ⇒ Object

Controller calls this to tell us that a complete piece has been received.



233
234
235
# File 'lib/rubytorrent/peer.rb', line 233

# The Controller calls this to tell us that a complete piece has been
# received; a :have message carrying the piece's index is queued.
def have_piece(piece)
  payload = {:index => piece.index}
  queue_message(:have, payload)
end

#last_recv_block_timeObject



204
# File 'lib/rubytorrent/peer.rb', line 204

# Value stored under :recv_block in @time (nil when that key is not set).
def last_recv_block_time
  @time[:recv_block]
end

#last_recv_timeObject



202
# File 'lib/rubytorrent/peer.rb', line 202

# Value stored under :recv in @time (nil when that key is not set).
def last_recv_time
  @time[:recv]
end

#last_send_block_timeObject



203
# File 'lib/rubytorrent/peer.rb', line 203

# Value stored under :send_block in @time (nil when that key is not set).
def last_send_block_time
  @time[:send_block]
end

#last_send_timeObject



201
# File 'lib/rubytorrent/peer.rb', line 201

# Value stored under :send in @time (nil when that key is not set).
def last_send_time
  @time[:send]
end

#peer_complete?Boolean

Returns:

  • (Boolean)


200
# File 'lib/rubytorrent/peer.rb', line 200

# True when every entry of the peer's piece map is truthy (and therefore
# also when the map is empty).
def peer_complete?
  @peer_pieces.all? { |have| have }
end

#pending_recvObject



145
# File 'lib/rubytorrent/peer.rb', line 145

# Number of wanted blocks that are currently marked as requested.
def pending_recv
  @want_blocks.count { |blk| blk.requested? }
end

#pending_sendObject



146
# File 'lib/rubytorrent/peer.rb', line 146

# Number of blocks the peer has asked for that are still waiting in
# @peer_want_blocks.
def pending_send
  @peer_want_blocks.size
end

#piece_available?(index) ⇒ Boolean

Returns:

  • (Boolean)


210
# File 'lib/rubytorrent/peer.rb', line 210

# Looks up +index+ in the peer's piece map and returns the stored entry
# (nil when the index is out of range).
def piece_available?(index)
  @peer_pieces[index]
end

#send_blocks_and_reqs(dllim = nil, ullim = nil) ⇒ Object

this is called both by input_thread_step and by the controller’s heartbeat thread. it sends as many pending blocks as it can while keeping the amount below ‘ullim’, and sends as many requests as it can while keeping the amount below ‘dllim’.

Returns a two-element array: [bytes requested, bytes sent].



249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
# File 'lib/rubytorrent/peer.rb', line 249

# This is called both by input_thread_step and by the controller's heartbeat
# thread. In order, it:
#
# 1. drops every requested block whose time_elapsed exceeds REQUEST_TIMEOUT
#    and hands it back through @controller.forget_blocks;
# 2. unless the peer is choking us or we are not interested, queues a
#    :request for each not-yet-requested wanted block;
# 3. unless we are choking the peer or it is not interested, moves pending
#    peer blocks onto @send_q;
# 4. calls get_want_blocks.
#
# dllim caps the bytes requested and ullim the bytes sent in this call; nil
# means no cap. Each cap is tested before a block is handled, so the totals
# can exceed it by up to one block's length.
#
# Returns [reqd_bytes, sent_bytes].
def send_blocks_and_reqs(dllim=nil, ullim=nil)
  sent_bytes = 0
  reqd_bytes = 0

  @want_blocks_m.synchronize do
    # Walk a snapshot: deleting from @want_blocks inside its own #each makes
    # the iteration skip the element that follows every deletion, so a
    # second timed-out block in a row would survive.
    @want_blocks.dup.each do |b|
#        puts "[][] #{self}: #{b} is #{b.requested? ? 'requested' : 'NOT requested'} and has time_elapsed of #{b.requested? ? b.time_elapsed.round : 'n/a'}s"
      if b.requested? && (b.time_elapsed > REQUEST_TIMEOUT)
        rt_warning "#{self}: for block #{b}, time elapsed since request is #{b.time_elapsed} > #{REQUEST_TIMEOUT}, assuming peer forgot about it"
        @want_blocks.delete b
        @controller.forget_blocks [b]
      end
    end
  end

  ## send :requests
  unless @peer_choking || !@interested
    @want_blocks_m.synchronize do
      # Snapshot again, for the same reason: the branch below may delete.
      @want_blocks.dup.each do |b|
        break if dllim && (reqd_bytes >= dllim)
        next if b.requested?

        if @package.pieces[b.pindex].complete?
          # not sure that this will ever happen, but...
          rt_warning "#{self}: deleting scheduled block for already-complete piece #{b}"
          @want_blocks.delete b
          next
        end

        queue_message(:request, {:index => b.pindex, :begin => b.begin,
                                 :length => b.length})
        reqd_bytes += b.length
        b.requested = true
        b.mark_time
        send_event(:requested_block, b)
      end
    end
  end

  ## send blocks
#    rt_debug "sending blocks. choking? #@choking, choked? #@peer_choking, ul rate #{ulrate}b/s, limit #@ulmeterlim" unless @peer_want_blocks.empty?
  unless @choking || !@peer_interested
    until @peer_want_blocks.empty?
      break if ullim && (sent_bytes >= ullim)
      # shift can still return nil if the list was emptied after the check
      if (b = @peer_want_blocks.shift)
        sent_bytes += b.length
        @send_q.push b
        @time[:send_block] = Time.now
        send_event(:sent_block, b)
      end
    end
  end

  get_want_blocks

  [reqd_bytes, sent_bytes]
end

#send_keepaliveObject

Controller calls this to tell us to send a keepalive



238
239
240
241
# File 'lib/rubytorrent/peer.rb', line 238

# The Controller calls this to tell us to send a keepalive; a :keepalive
# message is queued.
def send_keepalive
  queue_message :keepalive
end

#shutdownObject



225
226
227
228
229
# File 'lib/rubytorrent/peer.rb', line 225

# Logs the shutdown, clears @running and closes the socket.
# Closing is best-effort: any StandardError raised by close is swallowed and
# nil is returned instead.
def shutdown
  rt_debug "#{self.to_s}: shutting down"
  @running = false
  begin
    @socket.close
  rescue StandardError
    nil
  end
end

#snub=(now_snub) ⇒ Object

the Controller calls this from heartbeat thread to tell us whether to snub or not.



193
194
195
196
197
198
# File 'lib/rubytorrent/peer.rb', line 193

# The Controller calls this from its heartbeat thread to tell us whether to
# snub the peer. Nothing happens when the value is unchanged. When snubbing
# is switched on, the peer is also choked through the choke= setter.
def snub=(now_snub)
  # Compare, do not assign: the previous guard used `=`, which overwrote
  # @snubbing before it could be tested.
  return if @snubbing == now_snub

  @snubbing = now_snub
  # The explicit receiver is required; a bare `choke = true` only creates a
  # local variable and never calls the setter.
  self.choke = true if @snubbing
end

#startObject



148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
# File 'lib/rubytorrent/peer.rb', line 148

# Marks the connection as running, records the start time, spawns the input
# and output threads and queues our bitfield. Returns self.
#
# Each thread repeats its step method while @running is set. When a step
# raises SystemCallError, IOError or ProtocolError, the thread logs it,
# clears @running and hands @want_blocks back to the controller.
def start
  @running = true
  @time = {:start => Time.now}

  # input thread
  Thread.new do
    begin
      input_thread_step while @running
    rescue SystemCallError, IOError, ProtocolError => err
      rt_debug "#{self} (input): #{err.message}, releasing #{@want_blocks.length} claimed blocks and dying"
      @running = false
      @controller.forget_blocks @want_blocks
    end
  end

  # output thread
  Thread.new do
    begin
      output_thread_step while @running
    rescue SystemCallError, IOError, ProtocolError => err
      rt_debug "#{self} (output): #{err.message}, releasing #{@want_blocks.length} claimed blocks and dying"
      @running = false
      @controller.forget_blocks @want_blocks
    end
  end

  ## queue the initial messages: one flag per piece, packed as a bitstring
  completed = @package.pieces.map { |piece| piece.complete? }
  bitstring = completed.extend(ArrayToBitstring).to_bitstring
  queue_message(:bitfield, {:bitfield => bitstring})

  ## and that's it. if peer sends a bitfield, we'll send an
  ## interested and start requesting blocks at that point.  if they
  ## don't, it means they don't have any pieces, so we can just sit
  ## tight.
  self
end

#start_timeObject



205
# File 'lib/rubytorrent/peer.rb', line 205

# Value stored under :start in @time.
def start_time
  @time[:start]
end

#to_sObject



211
# File 'lib/rubytorrent/peer.rb', line 211

def to_s; "<peer: #@name>"; end

#ulamtObject



209
# File 'lib/rubytorrent/peer.rb', line 209

# Amount reported by the upload meter (@ulmeter.amt).
def ulamt
  @ulmeter.amt
end

#ulrateObject



207
# File 'lib/rubytorrent/peer.rb', line 207

# Rate reported by the upload meter (@ulmeter.rate).
def ulrate
  @ulmeter.rate
end