Class: RubyTorrent::Controller

Inherits:
Object
Extended by:
AttrReaderQ, MinIntervalMethods
Includes:
EventSource
Defined in:
lib/rubytorrent/controller.rb

Overview

The Controller manages all PeerConnections for a single Package. It instructs them to request blocks, and tells them whether to choke their connections or not. It also reports progress to the tracker.

Incoming, post-handshake peer connections are added by the Server by calling add_peer; deciding whether to accept them is the Controller’s responsibility, as is connecting to any new peers.

Constant Summary

HEARTBEAT =

general behavior parameters. seconds between iterations of the heartbeat

5
MAX_PEERS =

hard limit on the number of peers

15
ENDGAME_PIECE_THRESH =

(wild guess) number of pieces remaining before we trigger end-game mode

5
FUSEKI_PIECE_THRESH =

number of pieces we must have before getting out of fuseki mode. in fuseki (“opening”, if you’re not a weiqi/go fan) mode, rather than ranking pieces by rarity, we rank them by how distant their popularity is from (# peers) / 2, and we’re also stingy in handing out requests.

2
SPAWN_NEW_PEER_THRESH =

portion of the download rate above which we’ll stop making new peer connections

0.75
RATE_WINDOW =
20
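
The difference between rarity ranking and the fuseki ranking described above can be sketched as follows. This is only an illustration under assumptions: the `popularity` array and the helpers `rarity_order` and `fuseki_order` are hypothetical names, and PieceOrder's actual implementation is not shown on this page.

```ruby
# Hypothetical helpers, not part of RubyTorrent.
# popularity[i] is the number of connected peers that have piece i.

# Rarity ranking: pieces with the fewest owners come first.
def rarity_order(popularity)
  popularity.each_index.sort_by { |i| [popularity[i], i] }
end

# Fuseki ranking: pieces whose popularity is closest to (# peers) / 2
# come first; ties are broken by piece index.
def fuseki_order(popularity, num_peers)
  target = num_peers / 2.0
  popularity.each_index.sort_by { |i| [(popularity[i] - target).abs, i] }
end

popularity = [1, 8, 4, 5, 0]   # five pieces, eight peers
p rarity_order(popularity)     # => [4, 0, 2, 3, 1]
p fuseki_order(popularity, 8)  # => [2, 3, 0, 1, 4]
```

With eight peers the fuseki target is four owners, so piece 2 (four owners) ranks first, while rarity ranking would start with piece 4 (no owners).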
DEAD_TRACKER_INITIAL_INTERVAL =

tracker parameters. when we can’t access a tracker, we retry after DEAD_TRACKER_INITIAL_INTERVAL seconds and double that delay after every failure, capping at DEAD_TRACKER_MAX_INTERVAL.

5
DEAD_TRACKER_MAX_INTERVAL =
3600
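
The retry schedule for an unreachable tracker (start at the initial interval, double after every failure, cap at the maximum) can be sketched like this. The module `BackoffSketch` and the method `next_delay` are hypothetical names used for illustration; the Controller's own retry code is not shown on this page.

```ruby
# Hypothetical sketch of the dead-tracker backoff schedule.
module BackoffSketch
  DEAD_TRACKER_INITIAL_INTERVAL = 5
  DEAD_TRACKER_MAX_INTERVAL = 3600

  # Delay to use after one more failed attempt.
  def self.next_delay(delay)
    [delay * 2, DEAD_TRACKER_MAX_INTERVAL].min
  end

  # The first n delays, assuming every attempt fails.
  def self.schedule(n)
    delay = DEAD_TRACKER_INITIAL_INTERVAL
    Array.new(n) do
      current = delay
      delay = next_delay(delay)
      current
    end
  end
end

p BackoffSketch.schedule(12)
# => [5, 10, 20, 40, 80, 160, 320, 640, 1280, 2560, 3600, 3600]
```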
KEEPALIVE_INTERVAL =

single peer parameters. seconds of silence before sending a keepalive

120
SILENT_DEATH_INTERVAL =

seconds of silence before we drop a peer

240
BOREDOM_DEATH_INTERVAL =

seconds of existence with no downloaded data at which we drop a peer in favor of an incoming peer (unless the package is complete)

120
BLOCK_SIZE =
2**15
ANTISNUB_RATE_THRESH =

antisnubbing. if the total bytes/second across all peers falls below this threshold, we trigger anti-snubbing mode

1024
ANTISNUB_INTERVAL =
60
NUM_FRIENDS =

choking and optimistic unchoking parameters. number of peers unchoked due to high download rates

4
CALC_FRIENDS_INTERVAL =

seconds between recalculating choked status for each peer

10
CALC_OPTUNCHOKES_INTERVAL =

seconds between reassigning optimistic unchoked status

30
NUM_OPTUNCHOKES =

number of optimistic unchoke slots (not including any temporary ones generated in anti-snubbing mode)

1
NEW_OPTUNCHOKE_PROB =

optimistic unchoking slots are given with probability p*(1-p)^r, where r is the peer’s rank and p is this number

0.5
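
The probability p*(1-p)^r associated with NEW_OPTUNCHOKE_PROB is a geometric distribution over peer ranks. The sketch below computes the weights and shows one way to sample from them. Both helpers (`optunchoke_weights`, `pick_rank`) are hypothetical, and whether the Controller samples in exactly this way is an assumption; only the formula comes from the documentation.

```ruby
NEW_OPTUNCHOKE_PROB = 0.5

# Probability that the peer at rank r (0 = top rank) gets the slot.
def optunchoke_weights(num_peers, p = NEW_OPTUNCHOKE_PROB)
  (0...num_peers).map { |r| p * (1 - p)**r }
end

# Walk down the ranking and stop at each rank with probability p.
# Rank r is then chosen with probability p*(1-p)^r. Returns nil when
# the walk falls off the end of the list.
def pick_rank(num_peers, p = NEW_OPTUNCHOKE_PROB, rng = Random.new)
  num_peers.times { |r| return r if rng.rand < p }
  nil
end

p optunchoke_weights(4)  # => [0.5, 0.25, 0.125, 0.0625]
p pick_rank(4)           # a rank in 0..3, or nil
```

With p = 0.5 the top-ranked peer is chosen half the time, and each following rank half as often as the one before it.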


Methods included from MinIntervalMethods

min_interval

Methods included from AttrReaderQ

attr_accessor_q, attr_reader_q, attr_writer_q

Methods included from EventSource

append_features, #on_event, #relay_event, #send_event, #unregister_events

Constructor Details

#initialize(server, package, info_hash, trackers, dlratelim = nil, ulratelim = nil, http_proxy = ENV["http_proxy"]) ⇒ Controller

Returns a new instance of Controller.



# File 'lib/rubytorrent/controller.rb', line 177

def initialize(server, package, info_hash, trackers, dlratelim=nil, ulratelim=nil, http_proxy=ENV["http_proxy"])
  @server = server
  @info_hash = info_hash
  @package = package
  @trackers = trackers
  @http_proxy = http_proxy

  @dlratelim = dlratelim
  @ulratelim = ulratelim

  @peers = [].extend(ArrayShuffle)
  @peers_m = Mutex.new
  @thread = nil

  @tracker = nil
  @last_tracker_attempt = nil
  @tracker_delay = DEAD_TRACKER_INITIAL_INTERVAL

  ## friends
  @num_friends = 0
  @num_optunchokes = 0
  @num_snubbed = 0

  ## keep track of the popularity of the pieces so as to assign
  ## blocks optimally to peers.
  @piece_order = PieceOrder.new @package

  @running = false
end

Instance Attribute Details

#dlratelim ⇒ Object

Returns the value of attribute dlratelim.

# File 'lib/rubytorrent/controller.rb', line 170

def dlratelim
  @dlratelim
end

#http_proxy ⇒ Object

Returns the value of attribute http_proxy.

# File 'lib/rubytorrent/controller.rb', line 170

def http_proxy
  @http_proxy
end

#info_hash ⇒ Object

Returns the value of attribute info_hash.

# File 'lib/rubytorrent/controller.rb', line 170

def info_hash
  @info_hash
end

#package ⇒ Object

Returns the value of attribute package.

# File 'lib/rubytorrent/controller.rb', line 170

def package
  @package
end

#tracker ⇒ Object

Returns the value of attribute tracker.

# File 'lib/rubytorrent/controller.rb', line 170

def tracker
  @tracker
end

#ulratelim ⇒ Object

Returns the value of attribute ulratelim.

# File 'lib/rubytorrent/controller.rb', line 170

def ulratelim
  @ulratelim
end

Instance Method Details

#add_peer(p) ⇒ Object

this could be called at any point by the Server, if it receives incoming peer connections.



# File 'lib/rubytorrent/controller.rb', line 248

def add_peer(p)
  accept = true

  if @peers.length >= MAX_PEERS && !@package.complete?
    oldp = @peers.find { |x| !x.running? || ((x.dlamt == 0) && ((Time.now - x.start_time) > BOREDOM_DEATH_INTERVAL)) }

    if oldp
      rt_debug "killing peer for being boring: #{oldp}" 
      oldp.shutdown
    else
      rt_debug "too many peers, ignoring #{p}"
      p.shutdown
      accept = false
    end
  end

  if accept
    p.on_event(self, :received_block) { |peer, block| received_block(block, peer) }
    p.on_event(self, :peer_has_piece) { |peer, piece| peer_has_piece(piece, peer) }
    p.on_event(self, :peer_has_pieces) { |peer, bitfield| peer_has_pieces(bitfield, peer) }
    p.on_event(self, :sent_block) { |peer, block| send_event(:sent_block, block, peer.name) }
    p.on_event(self, :requested_block) { |peer, block| send_event(:requested_block, block, peer.name) }

    @peers_m.synchronize do
      @peers.push p
      ## it's important not to call p.start (which triggers the
      ## bitfield message) until it's been added to @peers, such that
      ## any :have messages that might happen from other peers in
      ## the mean time are propagated to it.
      ##
      ## of course that means we need to call p.start within the
      ## mutex context so that the reaper section of the heartbeat
      ## doesn't kill it between push and start.
      ##
      ## ah, the joys of threaded programming.
      p.start if @running
    end

    send_event(:added_peer, p.name)
  end
end
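
The admission policy at the top of add_peer can be isolated into a small, testable sketch. `StubPeer` and `admission_decision` are hypothetical stand-ins introduced for illustration; the real method also wires up event handlers and shuts peers down, which is left out here.

```ruby
MAX_PEERS = 15
BOREDOM_DEATH_INTERVAL = 120

# Hypothetical stand-in for a peer connection: only the fields the
# admission policy reads.
StubPeer = Struct.new(:name, :running, :dlamt, :start_time)

# Returns [:accept, nil] when there is room (or the package is complete),
# [:accept, victim] when a dead or boring peer can be replaced,
# and [:reject, nil] otherwise.
def admission_decision(peers, package_complete, now = Time.now)
  return [:accept, nil] if peers.length < MAX_PEERS || package_complete

  victim = peers.find do |x|
    !x.running || (x.dlamt == 0 && (now - x.start_time) > BOREDOM_DEATH_INTERVAL)
  end
  victim ? [:accept, victim] : [:reject, nil]
end

now = Time.at(1_000)
busy = Array.new(MAX_PEERS) { |i| StubPeer.new("peer#{i}", true, 10, Time.at(0)) }
p admission_decision(busy, false, now).first  # => :reject

busy[3].dlamt = 0                             # peer3 has downloaded nothing in 1000s
decision, victim = admission_decision(busy, false, now)
puts "#{decision}, replacing #{victim.name}"  # prints "accept, replacing peer3"
```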

#claim_blocks ⇒ Object

yield all desired blocks, in order of desire. called by peers to refill their queues. outside end-game mode, a block is marked as claimed when the caller’s block returns a true value for it.

# File 'lib/rubytorrent/controller.rb', line 319

def claim_blocks
  @piece_order.each(@in_fuseki, @peers.length) do |i|
    p = @package.pieces[i]
    next if p.complete?
#      rt_debug "+ considering piece #{p}"
    if @in_endgame
      p.each_empty_block(BLOCK_SIZE) { |b| yield b }
    else
      p.each_unclaimed_block(BLOCK_SIZE) do |b|
        if yield b
          p.claim_block b
          return if @in_fuseki # fuseki shortcut
        end
      end
    end
  end
end
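
The yield protocol used by claim_blocks (the caller's block returns a true value to claim what it was offered) can be seen in miniature below. `SketchPiece` and `claim_blocks_sketch` are hypothetical and omit piece ordering, end-game mode and the fuseki shortcut.

```ruby
# Hypothetical miniature of a piece that tracks unclaimed blocks.
class SketchPiece
  attr_reader :claimed

  def initialize(blocks)
    @unclaimed = blocks.dup
    @claimed = []
  end

  # Iterate over a copy so that claiming during iteration is safe.
  def each_unclaimed_block
    @unclaimed.dup.each { |b| yield b }
  end

  def claim_block(b)
    @unclaimed.delete(b)
    @claimed << b
  end
end

# Offer every unclaimed block to the caller; claim it only if the
# caller's block returns a true value.
def claim_blocks_sketch(pieces)
  pieces.each do |piece|
    piece.each_unclaimed_block do |b|
      piece.claim_block(b) if yield(b)
    end
  end
end

pieces = [SketchPiece.new([:a0, :a1]), SketchPiece.new([:b0])]
queue = []
claim_blocks_sketch(pieces) do |b|
  if queue.length < 2   # this caller only has room for two requests
    queue << b
    true
  else
    false
  end
end

p queue              # => [:a0, :a1]
p pieces[1].claimed  # => []
```

Blocks that the caller declines stay unclaimed, so another peer can be offered them later.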

#dlamt ⇒ Object

total of dlamt across all current peers.

# File 'lib/rubytorrent/controller.rb', line 209

def dlamt; @peers.inject(0) { |s, p| s + p.dlamt }; end

#dlrate ⇒ Object

total of dlrate across all current peers.

# File 'lib/rubytorrent/controller.rb', line 207

def dlrate; @peers.inject(0) { |s, p| s + p.dlrate }; end

#forget_blocks(blocks) ⇒ Object



# File 'lib/rubytorrent/controller.rb', line 337

def forget_blocks(blocks)
#    rt_debug "#{self}: forgetting blocks #{blocks.join(', ')}"
  blocks.each { |b| @package.pieces[b.pindex].unclaim_block b }
end

#num_peers ⇒ Object

number of peers currently held by the Controller.

# File 'lib/rubytorrent/controller.rb', line 211

def num_peers; @peers.length; end

#peer_has_piece(piece, peer) ⇒ Object



# File 'lib/rubytorrent/controller.rb', line 309

def peer_has_piece(piece, peer)
  @piece_order.inc piece.index
end

#peer_has_pieces(bitfield, peer) ⇒ Object



# File 'lib/rubytorrent/controller.rb', line 313

def peer_has_pieces(bitfield, peer)
  @piece_order.inc_all bitfield
end

#peer_info ⇒ Object

returns one hash of statistics per running peer.

# File 'lib/rubytorrent/controller.rb', line 342

def peer_info
  @peers.map do |p|
    next nil unless p.running?
    {:name => p.name, :seed => p.peer_complete?,
     :dlamt => p.dlamt, :ulamt => p.ulamt,
     :dlrate => p.dlrate, :ulrate => p.ulrate,
     :pending_send => p.pending_send, :pending_recv => p.pending_recv,
     :interested => p.interested?, :peer_interested => p.peer_interested?,
     :choking => p.choking?, :peer_choking => p.peer_choking?,
     :snubbing => p.snubbing?,
     :we_desire => @package.pieces.inject(0) do |s, piece|
        s + (!piece.complete? && p.piece_available?(piece.index) ? 1 : 0)
      end,
     :they_desire => @package.pieces.inject(0) do |s, piece|
        s + (piece.complete? && !p.piece_available?(piece.index) ? 1 : 0)
      end,
     :start_time => p.start_time
    }
  end.compact
end
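
The :we_desire and :they_desire counts in peer_info compare what we have with what the peer has, piece by piece. A minimal sketch, assuming two hypothetical boolean arrays in place of `piece.complete?` and `p.piece_available?`:

```ruby
# Hypothetical helper. ours[i] is true when we hold complete piece i;
# theirs[i] is true when the peer has piece i.
def desire_counts(ours, theirs)
  pairs = ours.zip(theirs)
  {
    :we_desire   => pairs.count { |mine, his| !mine && his },  # they have it, we don't
    :they_desire => pairs.count { |mine, his| mine && !his }   # we have it, they don't
  }
end

ours   = [true, false, false, true,  false]
theirs = [true, true,  true,  false, false]
counts = desire_counts(ours, theirs)
puts counts[:we_desire]    # prints 2
puts counts[:they_desire]  # prints 1
```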

#received_block(block, peer) ⇒ Object



# File 'lib/rubytorrent/controller.rb', line 290

def received_block(block, peer)
  if @in_endgame
    @peers_m.synchronize { @peers.each { |p| p.cancel block if p.running? && (p != peer)} }
  end
  send_event(:received_block, block, peer.name)

  piece = @package.pieces[block.pindex] # find corresponding piece
  if piece.complete?
    if piece.valid?
      # use a distinct block variable so the `peer` argument is not shadowed
      @peers_m.synchronize { @peers.each { |p| p.have_piece piece } }
      send_event(:have_piece, piece)
    else
      rt_warning "#{self}: received data for #{piece} does not match SHA1 hash, discarding"
      send_event(:discarded_piece, piece)
      piece.discard
    end
  end
end

#shutdown ⇒ Object

# File 'lib/rubytorrent/controller.rb', line 234

def shutdown
  @running = false
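  begin
    @tracker.stopped unless @tracker.nil?
  rescue TrackerError
    # the tracker is unreachable; nothing more to do while shutting down
  end
  @thread.join(0.2) unless @thread.nil? # nil if start was never called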
  @peers.each { |c| c.shutdown }
  self
end

#start ⇒ Object

# File 'lib/rubytorrent/controller.rb', line 213

def start
  raise "already" if @running

  find_tracker

  @in_endgame = false
  @in_antisnub = false
  @in_fuseki = false
  @running = true
  @thread = Thread.new do
    while @running
      step
      sleep HEARTBEAT
    end
  end

  @peers.each { |p| p.start unless p.running? }

  self
end
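
The heartbeat pattern used by start (a background thread that calls step, sleeps, and exits once a flag is cleared) can be tried in isolation. `HeartbeatSketch` is a hypothetical class whose step only counts iterations; the real step method is not shown on this page.

```ruby
# Hypothetical, self-contained version of the start/shutdown heartbeat.
class HeartbeatSketch
  attr_reader :steps

  def initialize(interval)
    @interval = interval
    @running = false
    @steps = 0
    @thread = nil
  end

  def start
    raise "already" if @running
    @running = true
    @thread = Thread.new do
      while @running
        step
        sleep @interval
      end
    end
    self
  end

  def shutdown
    @running = false                       # the loop exits after its current sleep
    @thread.join(0.2) unless @thread.nil?  # wait briefly, as the Controller does
    self
  end

  private

  def step
    @steps += 1
  end
end

hb = HeartbeatSketch.new(0.01).start
sleep 0.1
hb.shutdown
puts "heartbeat ran #{hb.steps} times"
```

Because the loop only checks the flag between iterations, shutdown may have to wait up to one interval for the thread to finish, which is why the join is given a timeout.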

#to_s ⇒ Object

# File 'lib/rubytorrent/controller.rb', line 242

def to_s
  "<#{self.class}: package #{@package}>"
end

#ulamt ⇒ Object

total of ulamt across all current peers.

# File 'lib/rubytorrent/controller.rb', line 210

def ulamt; @peers.inject(0) { |s, p| s + p.ulamt }; end

#ulrate ⇒ Object

total of ulrate across all current peers.

# File 'lib/rubytorrent/controller.rb', line 208

def ulrate; @peers.inject(0) { |s, p| s + p.ulrate }; end