Class: PDTP::Server::Dispatcher

Inherits:
Object
  • Object
show all
Defined in:
lib/pdtp/server/dispatcher.rb

Overview

Core dispatching and control logic for PDTP servers

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(server, file_service) ⇒ Dispatcher

Returns a new instance of Dispatcher.



34
35
36
37
38
39
40
# File 'lib/pdtp/server/dispatcher.rb', line 34

# Builds a dispatcher bound to a server and its file service.
#
# server       - kept in @server; used for logging and for the file
#                service address check in connection_created
# file_service - kept in @file_service; queried for file info and chunk
#                hashes
def initialize(server, file_service)
  @server = server
  @file_service = file_service
  @connections = []
  @used_client_ids = {} # client ids currently in use; they must be unique
  @updated_clients = {} # set of clients that have been modified and need transfers spawned
  # connection_created reads this flag before anything assigns it, so
  # start it explicitly at false instead of relying on an unset ivar
  # reading as nil
  @seen_file_service = false
end

Instance Attribute Details

#connections ⇒ Object (readonly)

Returns the value of attribute connections.



32
33
34
# File 'lib/pdtp/server/dispatcher.rb', line 32

# Reader for the dispatcher's connection list (the @connections ivar).
def connections; @connections; end

Instance Method Details

#begin_transfer(taker, giver, url, chunkid) ⇒ Object

Creates a new transfer between two peers. Returns true on success, or false if the specified transfer is already in progress.



106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
# File 'lib/pdtp/server/dispatcher.rb', line 106

# Starts a transfer of chunk +chunkid+ of +url+ with +taker+ receiving
# and +giver+ sending.
#
# The new Transfer is stored under its transfer_id in both peers'
# transfer tables, chunk_info.transfer is called for the chunk on the
# taker, and a :transfer message is sent to the transfer's connector
# telling it how to reach the acceptor.
#
# Returns true when the transfer was created, false when either peer
# already holds a transfer with the same id.
def begin_transfer(taker, giver, url, chunkid)
  byte_range = @file_service.get_info(url).chunk_range(chunkid)
  transfer = Transfer.new(taker, giver, url, chunkid, byte_range)
  id = transfer.transfer_id

  taker_info = client_info(taker)
  giver_info = client_info(giver)

  # refuse to start a transfer that either side is already tracking
  on_taker = taker_info.transfers[id]
  on_giver = giver_info.transfers[id]
  return false if !on_taker.nil? || !on_giver.nil?

  taker_info.chunk_info.transfer(url, chunkid..chunkid)
  taker_info.transfers[id] = transfer
  giver_info.transfers[id] = transfer

  # tell the connector where to find the acceptor; the host comes from
  # the acceptor's peer info, the port from its registered listen port
  acceptor = transfer.acceptor
  connector = transfer.connector
  addr, _port = acceptor.get_peer_info
  direction = connector == transfer.taker ? "get" : "put"

  connector.send_message(:transfer,
    :host => addr,
    :port => acceptor.user_data.listen_port,
    :method => direction,
    :url => url,
    :range => byte_range,
    :peer_id => client_info(acceptor).client_id
  )
  true
end

#clear_all_stalled_transfers ⇒ Object

Removes all stalled transfers from the list and spawns new transfers as appropriate. It must be called periodically by EventMachine.



136
137
138
139
# File 'lib/pdtp/server/dispatcher.rb', line 136

# Maintenance pass: runs clear_stalled_transfers_for_client on every
# connection in @connections, then calls spawn_all_transfers.
def clear_all_stalled_transfers
  @connections.each do |conn|
    clear_stalled_transfers_for_client(conn)
  end
  spawn_all_transfers
end

#clear_stalled_transfers_for_client(client_connection) ⇒ Object

removes all stalled transfers that this client is a part of



142
143
144
145
146
# File 'lib/pdtp/server/dispatcher.rb', line 142

# Finishes every transfer that the client's ClientInfo reports as
# stalled. Each one is passed to transfer_completed with no chunk hash
# and with the response message suppressed.
def clear_stalled_transfers_for_client(client_connection)
  stalled = client_info(client_connection).get_stalled_transfers
  stalled.each { |t| transfer_completed(t, client_connection, nil, false) }
end

#client_info(connection) ⇒ Object

returns the ClientInfo object associated with this connection



67
68
69
# File 'lib/pdtp/server/dispatcher.rb', line 67

# Returns the ClientInfo stored in the connection's user_data, creating
# and attaching a fresh one when nothing is stored yet.
def client_info(connection)
  existing = connection.user_data
  return existing if existing

  connection.user_data = ClientInfo.new
end

#connection_created(connection) ⇒ Object

called by pdtp_protocol when a connection is created



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/pdtp/server/dispatcher.rb', line 43

# Called by pdtp_protocol when a connection is created.
#
# The first connection arriving from the server's own address while the
# file service is enabled is logged and marked as the file service;
# every other connection is logged as a normal client. In both cases the
# connection gets a fresh ClientInfo and is appended to @connections.
def connection_created(connection)
  addr, _port = connection.get_peer_info

  first_file_service = @server.file_service_enabled? &&
                       !@seen_file_service &&
                       addr == @server.addr

  if first_file_service
    # display file service greeting when we see it connect
    @server.log "file service running at #{addr}:#{@server.instance_eval { @http_port }}"
    @seen_file_service = true
    connection.mark_as_file_service
  else
    # display greeting for normal client
    @server.log "client connected: #{connection.get_peer_info.inspect}"
  end

  connection.user_data = ClientInfo.new
  @connections << connection
end

#connection_destroyed(connection) ⇒ Object

called by pdtp_protocol when a connection is destroyed



61
62
63
64
# File 'lib/pdtp/server/dispatcher.rb', line 61

# Called by pdtp_protocol when a connection is destroyed.
#
# Logs the disconnect, releases the client id the connection registered
# (if any) and removes the connection from @connections.
def connection_destroyed(connection)
  @server.log "client disconnected: #{connection.get_peer_info.inspect}"

  # Free the id claimed through the "register" message. Without this the
  # id stays in @used_client_ids forever and the same client is refused
  # with "already in use" when it reconnects.
  client_id = client_info(connection).client_id
  @used_client_ids.delete(client_id) unless client_id.nil?

  @connections.delete connection
end

#connection_name(c) ⇒ Object

returns a string representing the specified connection



314
315
316
317
318
# File 'lib/pdtp/server/dispatcher.rb', line 314

# Returns the client_id of the ClientInfo attached to connection +c+,
# which serves as the connection's name.
def connection_name(c)
  info = client_info(c)
  info.client_id
end

#dispatch_message(command, message, connection) ⇒ Object

handles all incoming messages from clients



246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
# File 'lib/pdtp/server/dispatcher.rb', line 246

# Handles every incoming message from a client.
#
# command    - message type as a String: "register", "ask_info",
#              "request", "provide", "unrequest", "unprovide",
#              "ask_verify", "completed", "protocol_error" or
#              "protocol_warn"
# message    - Hash of message fields with String keys; the command is
#              stored into it under "type"
# connection - connection the message arrived on
#
# Raises ProtocolError when the client has not registered yet, when a
# "register" message carries no client id or one that is already taken,
# and for unknown commands. Raises ProtocolWarn when a "completed"
# message names a transfer this client has no record of.
#
# spawn_all_transfers is called after the command has been handled.
def dispatch_message(command, message, connection)
  # store the command in the message hash (handle_requestprovide reads it)
  message["type"] = command

  info = client_info(connection)

  # require the client to register its client id and listen port before doing anything
  if command != "register" && info.client_id.nil?
    raise ProtocolError.new("You need to send a 'register' message first")
  end

  case command
  when "register"
    cid = message["client_id"]
    # a missing id would be recorded as "in use" while leaving this
    # client unregistered, so reject it outright
    raise ProtocolError.new("Your 'register' message is missing a client id") if cid.nil?

    # make sure this id isn't in use
    if @used_client_ids[cid]
      raise ProtocolError.new("Your client id: #{cid} is already in use.")
    end

    @used_client_ids[cid] = true
    info.listen_port = message["listen_port"]
    info.client_id = cid
  when "ask_info"
    url = message["url"]
    file_info = @file_service.get_info(url)
    # an unknown url is answered with the url alone
    response = { :url => url }
    unless file_info.nil?
      response[:size] = file_info.file_size
      response[:chunk_size] = file_info.base_chunk_size
      response[:streaming] = file_info.streaming
    end
    connection.send_message :tell_info, response
  when "request", "provide", "unrequest", "unprovide"
    handle_requestprovide connection, message
  when "ask_verify"
    # check if the specified transfer is a real one
    transfer_id = Transfer.gen_transfer_id(
      info.client_id,
      message["peer_id"],
      message["url"],
      message["range"]
    )
    transfer = info.transfers[transfer_id]
    ok = !!transfer
    transfer.verification_asked = true if ok
    @server.debug "AskVerify not ok: id=#{transfer_id}" unless ok
    connection.send_message(:tell_verify,
      :url        => message["url"],
      :peer_id    => message["peer_id"],
      :range      => message["range"],
      :peer       => message["peer"],
      :authorized => ok
    )
  when "completed"
    transfer_id = Transfer.gen_transfer_id(
      info.client_id,
      message["peer_id"],
      message["url"],
      message["range"]
    )
    transfer = info.transfers[transfer_id]
    @server.debug("Completed: id=#{transfer_id} ok=#{!transfer.nil?}")
    if transfer
      transfer_completed(transfer, connection, message["hash"])
    else
      raise ProtocolWarn.new("You sent me a transfer completed message for unknown transfer: #{transfer_id}")
    end
  when "protocol_error", "protocol_warn"
    # ignore
  else
    raise ProtocolError.new("Unhandled message type: #{command}")
  end

  spawn_all_transfers
end

#handle_requestprovide(connection, message) ⇒ Object

handles the request, provide, unrequest, unprovide messages

Raises:

  • (ProtocolWarn) — if the file service has no info for the requested URL



231
232
233
234
235
236
237
238
239
240
241
242
243
# File 'lib/pdtp/server/dispatcher.rb', line 231

# Applies a request, provide, unrequest or unprovide message to the
# sending client's chunk_info and flags the client for transfer spawning.
#
# Raises ProtocolWarn when the file service has no info for the URL; a
# lookup that raises is treated the same as one that returns nil.
def handle_requestprovide(connection,message)
  kind = message["type"]
  url = message["url"]

  info = begin
    @file_service.get_info(url)
  rescue
    nil
  end
  raise ProtocolWarn.new("Requested URL: '#{url}' not found") if info.nil?

  # partial chunks are only excluded for provides
  chunks = info.chunk_range_from_byte_range(message["range"], kind == "provide")

  # invokes chunk_info.request, provide, unrequest or unprovide
  client_info(connection).chunk_info.send(kind.to_sym, url, chunks)
  @updated_clients[connection] = true # this client now needs new transfers
end

#spawn_all_transfers ⇒ Object

creates new transfers for all clients that have been updated



220
221
222
223
224
225
226
227
228
# File 'lib/pdtp/server/dispatcher.rb', line 220

# Runs spawn_transfers_for_client for every client flagged in
# @updated_clients, repeating until no client is flagged any more.
def spawn_all_transfers
  until @updated_clients.empty?
    # swap in a fresh hash first, so clients flagged while this batch is
    # processed are handled by the next pass of the loop
    pending, @updated_clients = @updated_clients, {}
    pending.each_key { |client| spawn_transfers_for_client(client) }
  end
end

#spawn_download_for_client(client_connection) ⇒ Object

Creates a single download for the specified client. Returns true on success, false on failure.



165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
# File 'lib/pdtp/server/dispatcher.rb', line 165

# Tries to start one download for the given client.
#
# Asks the client's chunk_info for its high priority chunk, collects
# other connections that want to upload and provide that chunk, picks
# one of them at random and hands the pair to begin_transfer.
#
# Returns begin_transfer's result, or false when the chunk lookup raises
# or no other connection can provide the chunk.
def spawn_download_for_client(client_connection)
  downloader = client_info(client_connection)
  begin
    url, chunkid = downloader.chunk_info.high_priority_chunk
  rescue
    return false
  end

  candidates = []
  @connections.each do |c2|
    next if client_connection == c2

    peer_info = client_info(c2)
    next if peer_info.wants_upload? == false
    next unless peer_info.chunk_info.provided?(url, chunkid)

    candidates << c2
    # stop scanning once more than five candidates have been found
    break if candidates.size > 5
  end

  return false if candidates.empty?

  # FIXME base this on the trust model
  giver = candidates[rand(candidates.size)]
  # FIXME should we try again if begin_transfer fails?
  begin_transfer(client_connection, giver, url, chunkid)
end

#spawn_transfers_for_client(client_connection) ⇒ Object

Spawns uploads and downloads for this client. Should be called every time there is a change that would affect what this client has or wants.



151
152
153
154
155
156
157
158
159
160
161
# File 'lib/pdtp/server/dispatcher.rb', line 151

# Starts as many transfers as possible for one client: first downloads
# for as long as the client wants one, then uploads for as long as it
# wants one. Each phase stops at the first spawn attempt that returns
# false.
def spawn_transfers_for_client(client_connection)
  info = client_info(client_connection)

  while info.wants_download?
    started = spawn_download_for_client(client_connection)
    break if started == false
  end

  while info.wants_upload?
    started = spawn_upload_for_client(client_connection)
    break if started == false
  end
end

#spawn_upload_for_client(client_connection) ⇒ Object

Creates a single upload for the specified client. Returns true on success, false on failure.



198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
# File 'lib/pdtp/server/dispatcher.rb', line 198

# Tries to start one upload from the given client.
#
# Walks the other connections that want a download and, for the first
# one whose high priority chunk this client provides, hands the pair to
# begin_transfer.
#
# Returns begin_transfer's result, or false when no connection wants a
# chunk this client provides.
def spawn_upload_for_client(client_connection)
  uploader = client_info(client_connection)

  @connections.each do |c2|
    next if client_connection == c2

    peer_info = client_info(c2)
    next if peer_info.wants_download? == false

    begin
      url, chunkid = peer_info.chunk_info.high_priority_chunk
    rescue
      # skip peers whose chunk lookup raises
      next
    end

    next unless uploader.chunk_info.provided?(url, chunkid)
    return begin_transfer(c2, client_connection, url, chunkid)
  end

  false
end

#transfer_completed(transfer, connection, chunk_hash, send_response = true) ⇒ Object

Called when a transfer finishes, whether successfully or not.



72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/pdtp/server/dispatcher.rb', line 72

# Called when a transfer finishes, whether successfully or not.
#
# transfer      - the Transfer being finished
# connection    - the side reporting completion
# chunk_hash    - hash of the chunk as reported by that side
# send_response - when true (the default) and the reporter is the taker,
#                 a :hash_verify message is sent back to the taker
#
# When the taker reports, the hash is compared with the file service's
# hash for the chunk. On a match the taker starts providing the chunk;
# otherwise the chunk is requested again. The taker's trust in the giver
# is updated either way. In every case the transfer is removed from the
# reporting connection's table and that connection is flagged in
# @updated_clients.
def transfer_completed(transfer,connection,chunk_hash,send_response=true)
  expected_hash = @file_service.get_chunk_hash(transfer.url, transfer.chunkid)

  taker_info = client_info(transfer.taker)
  giver_info = client_info(transfer.giver)

  if connection == transfer.taker
    # did the transfer complete successfully?
    hash_ok = (chunk_hash == expected_hash)
    chunk = transfer.chunkid..transfer.chunkid

    if hash_ok
      # the taker now has the chunk, so it can provide it
      taker_info.chunk_info.provide(transfer.url, chunk)
      taker_info.trust.success(giver_info.trust)
    else
      # transfer failed, the taker still wants the chunk
      taker_info.chunk_info.request(transfer.url, chunk)
      taker_info.trust.failure(giver_info.trust)
    end

    if send_response
      transfer.taker.send_message(:hash_verify,
        :url => transfer.url,
        :range => transfer.byte_range,
        :hash_ok => hash_ok
      )
    end
  end

  # remove this transfer from whoever reported it
  client_info(connection).transfers.delete(transfer.transfer_id)
  @updated_clients[connection] = true # flag this client for transfer creation
end