Class: PDTP::Server::Connection

Inherits:
Protocol show all
Defined in:
lib/pdtp/server/connection.rb

Overview

Server’s internal representation of a client connection

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods inherited from Protocol

#connection_open?, define_message_params, #error_close_connection, #get_peer_info, #hash_to_range, obj_matches_type?, #post_init, print_info, #range_to_hash, #receive_packet, #remote_peer_id, #send_message, #to_s, #unbind, validate_message

Methods inherited from LengthPrefixProtocol

#prefix_size=, #receive_data, #receive_packet, #send_packet

Constructor Details

#initialize(*args) ⇒ Connection

Returns a new instance of Connection.



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/pdtp/server/connection.rb', line 42

# Set up the per-client bookkeeping state, then continue with the
# superclass initializer (bare `super` forwards the original arguments).
def initialize(*args)
  # What chunks this client is requesting/providing
  @chunk_info = ChunkInfo.new

  # Chunk transfers this client is actively participating in
  @transfers = {}

  # Port the client is listening on; 6000 is the default
  @listen_port = 6000

  # Trust relationships with other peers
  @trust = Trust.new

  # Bandwidth estimators, one per direction
  @upstream = BandwidthEstimator.new
  @downstream = BandwidthEstimator.new

  super
end

Instance Attribute Details

#chunk_info ⇒ Object

Accessors which can hopefully be eliminated in future versions



38
39
40
# File 'lib/pdtp/server/connection.rb', line 38

# Reader for the @chunk_info instance variable.
def chunk_info; @chunk_info; end

#client_id ⇒ Object

Returns the value of attribute client_id.



39
40
41
# File 'lib/pdtp/server/connection.rb', line 39

# Reader for the @client_id instance variable.
def client_id; @client_id; end

#dispatcher=(value) ⇒ Object (writeonly)

Handle to the dispatcher which can hopefully be eliminated in future versions of EventMachine



35
36
37
# File 'lib/pdtp/server/connection.rb', line 35

# Writer for the @dispatcher instance variable (no matching reader is
# shown for it).
def dispatcher=(value); @dispatcher = value; end

#listen_port ⇒ Object

Returns the value of attribute listen_port.



39
40
41
# File 'lib/pdtp/server/connection.rb', line 39

# Reader for the @listen_port instance variable.
def listen_port; @listen_port; end

#transfers ⇒ Object

Returns the value of attribute transfers.



40
41
42
# File 'lib/pdtp/server/connection.rb', line 40

# Reader for the @transfers instance variable.
def transfers; @transfers; end

#trust ⇒ Object

Accessors which can hopefully be eliminated in future versions



38
39
40
# File 'lib/pdtp/server/connection.rb', line 38

# Reader for the @trust instance variable.
def trust; @trust; end

Instance Method Details

#downloads ⇒ Object

List of all active downloads



143
144
145
# File 'lib/pdtp/server/connection.rb', line 143

# Active downloads: the entries of @transfers in which this connection is
# the taker and verification has already been asked for.
def downloads
  @transfers.select do |_id, transfer|
    transfer.taker == self && transfer.verification_asked
  end
end

#downstream_bandwidth ⇒ Object

Estimate of a client’s downstream bandwidth



122
123
124
# File 'lib/pdtp/server/connection.rb', line 122

# Estimate of the client's downstream bandwidth, as reported by the
# @downstream estimator. Any StandardError raised while asking for the
# estimate is swallowed and nil is returned instead.
def downstream_bandwidth
  @downstream.estimate
rescue StandardError
  nil
end

#empty_transfer_slots? ⇒ Boolean

Are we below the limit on half-open transfer slots?

Returns:

  • (Boolean)


138
139
140
# File 'lib/pdtp/server/connection.rb', line 138

# Are we below the limit on half-open transfer slots?
#
# A transfer counts as half-open while its verification_asked flag is
# falsy; the count is compared against max_half_open.
def empty_transfer_slots?
  half_open = @transfers.count { |_id, transfer| !transfer.verification_asked }
  half_open < max_half_open
end

#failure(transfer) ⇒ Object

Log a failed transfer and update internal data

Raises:

  • (ArgumentError)


110
111
112
113
114
# File 'lib/pdtp/server/connection.rb', line 110

# Log a failed transfer and update internal data.
#
# Only valid on the taker's side of the transfer: the chunk is requested
# again in @chunk_info and the failure is recorded against the giver's
# trust.
#
# Raises ArgumentError when this connection is not the transfer's taker.
def failure(transfer)
  unless transfer.taker == self
    raise ArgumentError, "not taker for this transfer"
  end

  # Single-chunk range covering just the failed chunk
  @chunk_info.request(transfer.url, transfer.chunkid..transfer.chunkid)
  @trust.failure(transfer.giver.trust)
end

#file_service? ⇒ Boolean

Is this connection the file service?

Returns:

  • (Boolean)


185
186
187
# File 'lib/pdtp/server/connection.rb', line 185

# Is this connection the file service? Always false in this class.
def file_service?; false; end

#max_concurrent_downloads ⇒ Object Also known as: max_concurrent_uploads

Number of concurrent downloads desired. FIXME: hardcoded, should probably be computed or client-specified.



128
# File 'lib/pdtp/server/connection.rb', line 128

# Number of concurrent downloads desired.
# FIXME: hardcoded, should probably be computed or client-specified
def max_concurrent_downloads
  8
end

#max_half_open ⇒ Object

Maximum number of “half open” transfers allowed. FIXME: hardcoded, should probably be computed.



135
# File 'lib/pdtp/server/connection.rb', line 135

# Maximum number of "half open" transfers allowed.
# FIXME: hardcoded, should probably be computed
def max_half_open
  8
end

#prefix(mask = 24) ⇒ Object

Address prefix of this peer

Raises:

  • (ArgumentError)


63
64
65
66
67
68
69
# File 'lib/pdtp/server/connection.rb', line 63

# Address prefix of this peer.
#
# mask - prefix length in bits; must be 8, 16 or 24 (default 24)
#
# Returns the first mask / 8 dot-separated fields of the address, joined
# with '.'. Raises ArgumentError for any other mask value.
def prefix(mask = 24)
  unless [8, 16, 24].include?(mask)
    raise ArgumentError, "mask must be 8, 16, or 24"
  end

  # The first element of @cached_peer_info is used as the address string
  addr, _ignored = @cached_peer_info
  addr.split('.').first(mask / 8).join('.')
end

#score(peer) ⇒ Object

Calculate a score from 0 to 10 for two clients being a good match



72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib/pdtp/server/connection.rb', line 72

# Calculate a score from 0 to 10 for two clients being a good match.
#
# peer - the other connection; must respond to prefix(mask),
#        upstream_bandwidth, trust and file_service?
#
# Returns a Float built from three components: address-prefix affinity,
# relative bandwidth, and trust. The total is halved for the file service.
def score(peer)
  s = 0.0

  # One point for each matching prefix
  [8, 16, 24].each { |mask| s += 1 if prefix(mask) == peer.prefix(mask) }

  # Up to three points for having ample bandwidth compared to us.
  # Either estimate may be nil (the bandwidth accessors return nil when
  # estimation fails), in which case no bandwidth points are awarded
  # rather than raising NoMethodError on the comparison.
  their_upstream = peer.upstream_bandwidth
  our_downstream = downstream_bandwidth
  if their_upstream && our_downstream
    [0.5, 1, 2].each { |divisor| s += 1 if their_upstream > our_downstream / divisor }
  end

  # 0 - 4 points for trust
  s += 4 * @trust.weight(peer.trust)

  # Deprioritize file service by cutting its score in half
  s /= 2 if peer.file_service?

  s
end

#stalled_transfers ⇒ Object

Returns a list of all the stalled transfers this client is a part of



170
171
172
173
174
175
176
177
178
179
180
181
182
# File 'lib/pdtp/server/connection.rb', line 170

# Returns an Array of the stalled transfers this client is a part of.
#
# A transfer is reported when this connection is not its acceptor, it was
# created more than 20 seconds ago, and verification has not been asked.
#
# NOTE(review): the previous comment here read "only delete if we are the
# acceptor to prevent race conditions", yet the condition skips transfers
# for which we ARE the acceptor. Confirm which of the two is intended.
def stalled_transfers
  timeout = 20.0
  now = Time.now

  @transfers.values.select do |transfer|
    next false if transfer.acceptor == self

    now - transfer.creation_time > timeout && !transfer.verification_asked
  end
end

#success(transfer) ⇒ Object

Log a completed transfer and update internal data



93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/pdtp/server/connection.rb', line 93

# Log a completed transfer and update internal data.
#
# When this connection is the taker, the chunk is marked :provided in
# @chunk_info, a success is recorded against the giver's trust, and the
# sample goes to the downstream estimator. Otherwise only the upstream
# estimator is updated.
def success(transfer)
  bandwidth_estimator =
    if transfer.taker == self
      @chunk_info.set(:provided, transfer.url, transfer.chunkid..transfer.chunkid)
      @trust.success(transfer.giver.trust)
      @downstream
    else
      @upstream
    end

  # Sample: start time, end time (now), and byte_range.end - byte_range.begin
  bandwidth_estimator.log(
    transfer.creation_time,
    Time.now,
    transfer.byte_range.end - transfer.byte_range.begin
  )
end

#uploads ⇒ Object

List of all active uploads



148
149
150
# File 'lib/pdtp/server/connection.rb', line 148

# Active uploads: the entries of @transfers in which this connection is
# the giver and verification has already been asked for.
def uploads
  @transfers.select do |_id, transfer|
    transfer.giver == self && transfer.verification_asked
  end
end

#upstream_bandwidth ⇒ Object

Estimate of a client’s upstream bandwidth



117
118
119
# File 'lib/pdtp/server/connection.rb', line 117

# Estimate of the client's upstream bandwidth, as reported by the
# @upstream estimator. Any StandardError raised while asking for the
# estimate is swallowed and nil is returned instead.
def upstream_bandwidth
  @upstream.estimate
rescue StandardError
  nil
end

#wants_download? ⇒ Boolean

Returns true if this client wants the server to spawn a download for it

Returns:

  • (Boolean)


153
154
155
156
157
158
159
# File 'lib/pdtp/server/connection.rb', line 153

# Returns true if this client wants the server to spawn a download for it.
#
# That requires a high-priority chunk in @chunk_info, a free half-open
# transfer slot, and fewer active downloads than max_concurrent_downloads.
def wants_download?
  if @chunk_info.high_priority_chunk && empty_transfer_slots?
    # Still under the concurrent download limit?
    downloads.size < max_concurrent_downloads
  else
    false
  end
end

#wants_upload? ⇒ Boolean

Returns true if this client wants the server to spawn an upload for it

Returns:

  • (Boolean)


162
163
164
165
166
167
# File 'lib/pdtp/server/connection.rb', line 162

# Returns true if this client wants the server to spawn an upload for it.
#
# That requires a free half-open transfer slot and fewer active uploads
# than max_concurrent_uploads.
def wants_upload?
  if empty_transfer_slots?
    # Still under the concurrent upload limit?
    uploads.size < max_concurrent_uploads
  else
    false
  end
end