Class: PDTP::Server::Dispatcher
- Inherits:
-
Object
- Object
- PDTP::Server::Dispatcher
- Defined in:
- lib/pdtp/server/dispatcher.rb
Overview
Core dispatching and control logic for PDTP servers
Instance Attribute Summary collapse
-
#connections ⇒ Object
readonly
Returns the value of attribute connections.
Instance Method Summary collapse
-
#begin_transfer(taker, giver, url, chunkid) ⇒ Object
Creates a new transfer between two peers returns true on success, or false if the specified transfer is already in progress.
-
#clear_all_stalled_transfers ⇒ Object
this function removes all stalled transfers from the list and spawns new transfers as appropriate it must be called periodically by EventMachine.
-
#clear_stalled_transfers_for_client(client_connection) ⇒ Object
removes all stalled transfers that this client is a part of.
-
#client_info(connection) ⇒ Object
returns the ClientInfo object associated with this connection.
-
#connection_created(connection) ⇒ Object
called by pdtp_protocol when a connection is created.
-
#connection_destroyed(connection) ⇒ Object
called by pdtp_protocol when a connection is destroyed.
-
#connection_name(c) ⇒ Object
returns a string representing the specified connection.
-
#dispatch_message(command, message, connection) ⇒ Object
handles all incoming messages from clients.
-
#handle_requestprovide(connection, message) ⇒ Object
handles the request, provide, unrequest, unprovide messages.
-
#initialize(server, file_service) ⇒ Dispatcher
constructor
A new instance of Dispatcher.
-
#spawn_all_transfers ⇒ Object
creates new transfers for all clients that have been updated.
-
#spawn_download_for_client(client_connection) ⇒ Object
creates a single download for the specified client returns true on success, false on failure.
-
#spawn_transfers_for_client(client_connection) ⇒ Object
spawns uploads and downloads for this client.
-
#spawn_upload_for_client(client_connection) ⇒ Object
creates a single upload for the specified client returns true on success, false on failure.
-
#transfer_completed(transfer, connection, chunk_hash, send_response = true) ⇒ Object
called when a transfer either finishes, successfully or not.
Constructor Details
#initialize(server, file_service) ⇒ Dispatcher
Returns a new instance of Dispatcher.
34 35 36 37 38 39 40 |
# File 'lib/pdtp/server/dispatcher.rb', line 34 def initialize(server, file_service) @server = server @file_service = file_service @connections = [] @used_client_ids = {} #keeps a list of client ids in use, they must be unique @updated_clients = {} #a set of clients that have been modified and need transfers spawned end |
Instance Attribute Details
#connections ⇒ Object (readonly)
Returns the value of attribute connections.
32 33 34 |
# File 'lib/pdtp/server/dispatcher.rb', line 32 def connections @connections end |
Instance Method Details
#begin_transfer(taker, giver, url, chunkid) ⇒ Object
Creates a new transfer between two peers returns true on success, or false if the specified transfer is already in progress
106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 |
# File 'lib/pdtp/server/dispatcher.rb', line 106 def begin_transfer(taker, giver, url, chunkid) byte_range = @file_service.get_info(url).chunk_range(chunkid) t = Transfer.new(taker, giver, url, chunkid, byte_range) #make sure this transfer doesnt already exist t1 = client_info(taker).transfers[t.transfer_id] t2 = client_info(giver).transfers[t.transfer_id] return false unless t1.nil? and t2.nil? client_info(taker).chunk_info.transfer(url, chunkid..chunkid) client_info(taker).transfers[t.transfer_id] = t client_info(giver).transfers[t.transfer_id] = t #send transfer message to the connector addr, port = t.acceptor.get_peer_info t.connector.(:transfer, :host => addr, :port => t.acceptor.user_data.listen_port, :method => t.connector == t.taker ? "get" : "put", :url => url, :range => byte_range, :peer_id => client_info(t.acceptor).client_id ) true end |
#clear_all_stalled_transfers ⇒ Object
this function removes all stalled transfers from the list and spawns new transfers as appropriate it must be called periodically by EventMachine
136 137 138 139 |
# File 'lib/pdtp/server/dispatcher.rb', line 136 def clear_all_stalled_transfers @connections.each { |connection| clear_stalled_transfers_for_client connection } spawn_all_transfers end |
#clear_stalled_transfers_for_client(client_connection) ⇒ Object
removes all stalled transfers that this client is a part of
142 143 144 145 146 |
# File 'lib/pdtp/server/dispatcher.rb', line 142 def clear_stalled_transfers_for_client(client_connection) client_info(client_connection).get_stalled_transfers.each do |transfer| transfer_completed transfer, client_connection, nil, false end end |
#client_info(connection) ⇒ Object
returns the ClientInfo object associated with this connection
67 68 69 |
# File 'lib/pdtp/server/dispatcher.rb', line 67 def client_info(connection) connection.user_data ||= ClientInfo.new end |
#connection_created(connection) ⇒ Object
called by pdtp_protocol when a connection is created
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
# File 'lib/pdtp/server/dispatcher.rb', line 43 def connection_created(connection) addr, port = connection.get_peer_info if @server.file_service_enabled? and not @seen_file_service and addr == @server.addr #display file service greeting when we see it connect @server.log "file service running at #{addr}:#{@server.instance_eval { @http_port }}" @seen_file_service = true connection.mark_as_file_service else #display greeting for normal client @server.log "client connected: #{connection.get_peer_info.inspect}" end connection.user_data = ClientInfo.new @connections << connection end |
#connection_destroyed(connection) ⇒ Object
called by pdtp_protocol when a connection is destroyed
61 62 63 64 |
# File 'lib/pdtp/server/dispatcher.rb', line 61 def connection_destroyed(connection) @server.log "client disconnected: #{connection.get_peer_info.inspect}" @connections.delete connection end |
#connection_name(c) ⇒ Object
returns a string representing the specified connection
314 315 316 317 318 |
# File 'lib/pdtp/server/dispatcher.rb', line 314 def connection_name(c) #host,port=c.get_peer_info #return "#{get_id(c)}: #{host}:#{port}" client_info(c).client_id end |
#dispatch_message(command, message, connection) ⇒ Object
handles all incoming messages from clients
246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 |
# File 'lib/pdtp/server/dispatcher.rb', line 246 def (command, , connection) # store the command in the message hash ["type"] = command #require the client to register their client id and listen port before doing anything if command != "register" and client_info(connection).client_id.nil? raise ProtocolError.new("You need to send a 'client_info' message first") end case command when "register" cid = ["client_id"] #make sure this id isnt in use if @used_client_ids[cid] raise ProtocolError.new("Your client id: #{cid} is already in use.") end @used_client_ids[cid] = true client_info(connection).listen_port = ["listen_port"] client_info(connection).client_id = cid when "ask_info" info = @file_service.get_info(["url"]) response = { :url => ["url"] } unless info.nil? response[:size] = info.file_size response[:chunk_size] = info.base_chunk_size response[:streaming] = info.streaming end connection. :tell_info, response when "request", "provide", "unrequest", "unprovide" handle_requestprovide connection, when "ask_verify" #check if the specified transfer is a real one my_id = client_info(connection).client_id transfer_id=Transfer.gen_transfer_id(my_id,["peer_id"],["url"],["range"]) ok = !!client_info(connection).transfers[transfer_id] client_info(connection).transfers[transfer_id].verification_asked=true if ok @server.debug "AskVerify not ok: id=#{transfer_id}" unless ok connection.(:tell_verify, :url => ["url"], :peer_id => ["peer_id"], :range => ["range"], :peer => ["peer"], :authorized=>ok ) when "completed" my_id = client_info(connection).client_id transfer_id = Transfer::gen_transfer_id( my_id, ["peer_id"], ["url"], ["range"] ) transfer=client_info(connection).transfers[transfer_id] @server.debug("Completed: id=#{transfer_id} ok=#{transfer != nil}" ) if transfer transfer_completed(transfer,connection,["hash"]) else raise ProtocolWarn.new("You sent me a transfer completed message for unknown transfer: #{transfer_id}") end when 'protocol_error', 'protocol_warn' #ignore else raise ProtocolError.new("Unhandled message type: #{command}") end spawn_all_transfers end |
#handle_requestprovide(connection, message) ⇒ Object
handles the request, provide, unrequest, unprovide messages
231 232 233 234 235 236 237 238 239 240 241 242 243 |
# File 'lib/pdtp/server/dispatcher.rb', line 231 def handle_requestprovide(connection,) type=["type"] url=["url"] info=@file_service.get_info(url) rescue nil raise ProtocolWarn.new("Requested URL: '#{url}' not found") if info.nil? exclude_partial= (type=="provide") #only exclude partial chunks from provides range=info.chunk_range_from_byte_range(["range"],exclude_partial) #call request, provide, unrequest, or unprovide client_info(connection).chunk_info.send( type.to_sym, url, range) @updated_clients[connection]=true #add to the list of client that need new transfers end |
#spawn_all_transfers ⇒ Object
creates new transfers for all clients that have been updated
220 221 222 223 224 225 226 227 228 |
# File 'lib/pdtp/server/dispatcher.rb', line 220 def spawn_all_transfers while @updated_clients.size > 0 do tmp=@updated_clients @updated_clients=Hash.new tmp.each do |client,true_key| spawn_transfers_for_client(client) end end end |
#spawn_download_for_client(client_connection) ⇒ Object
creates a single download for the specified client returns true on success, false on failure
165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 |
# File 'lib/pdtp/server/dispatcher.rb', line 165 def spawn_download_for_client(client_connection) feasible_peers=[] c1info=client_info(client_connection) begin url,chunkid=c1info.chunk_info.high_priority_chunk rescue return false end @connections.each do |c2| next if client_connection==c2 next if client_info(c2).wants_upload? == false if client_info(c2).chunk_info.provided?(url,chunkid) feasible_peers << c2 break if feasible_peers.size > 5 end end # we now have a list of clients that have the requested chunk. # pick one and start the transfer if feasible_peers.size > 0 #FIXME base this on the trust model giver=feasible_peers[rand(feasible_peers.size)] return begin_transfer(client_connection,giver,url,chunkid) #FIXME should we try again if begin_transfer fails? end false end |
#spawn_transfers_for_client(client_connection) ⇒ Object
spawns uploads and downloads for this client. should be called every time there is a change that would affect what this client has or wants
151 152 153 154 155 156 157 158 159 160 161 |
# File 'lib/pdtp/server/dispatcher.rb', line 151 def spawn_transfers_for_client(client_connection) info = client_info client_connection while info.wants_download? do break if spawn_download_for_client(client_connection) == false end while info.wants_upload? do break if spawn_upload_for_client(client_connection) == false end end |
#spawn_upload_for_client(client_connection) ⇒ Object
creates a single upload for the specified client returns true on success, false on failure
198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 |
# File 'lib/pdtp/server/dispatcher.rb', line 198 def spawn_upload_for_client(client_connection) c1info=client_info(client_connection) @connections.each do |c2| next if client_connection==c2 next if client_info(c2).wants_download? == false begin url,chunkid=client_info(c2).chunk_info.high_priority_chunk rescue next end if c1info.chunk_info.provided?(url,chunkid) return begin_transfer(c2,client_connection,url,chunkid) end end false end |
#transfer_completed(transfer, connection, chunk_hash, send_response = true) ⇒ Object
called when a transfer either finishes, successfully or not
72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 |
# File 'lib/pdtp/server/dispatcher.rb', line 72 def transfer_completed(transfer,connection,chunk_hash,send_response=true) # did the transfer complete successfully? local_hash=@file_service.get_chunk_hash(transfer.url,transfer.chunkid) c1=client_info(transfer.taker) c2=client_info(transfer.giver) if connection==transfer.taker success= (chunk_hash==local_hash) if success #the taker now has the file, so he can provide it client_info(transfer.taker).chunk_info.provide(transfer.url,transfer.chunkid..transfer.chunkid) c1.trust.success(c2.trust) else #transfer failed, the client still wants the chunk client_info(transfer.taker).chunk_info.request(transfer.url,transfer.chunkid..transfer.chunkid) c1.trust.failure(c2.trust) end transfer.taker.(:hash_verify, :url => transfer.url, :range => transfer.byte_range, :hash_ok => success ) if send_response end #remove this transfer from whoever sent it client_info(connection).transfers.delete(transfer.transfer_id) @updated_clients[connection]=true #flag this client for transfer creation end |