Class: PDTP::Server::Dispatcher

Inherits:
Object
  • Object
show all
Defined in:
lib/pdtp/server/dispatcher.rb

Overview

Core dispatching and control logic for PDTP servers

Constant Summary collapse

# Lookup table from the four chunk command names ('request', 'provide',
# 'unrequest', 'unprovide') to a chunk state symbol. Both "un" commands map
# back to :none.
# NOTE(review): presumably consulted by handle_requestprovide, which is not
# visible here -- confirm.
# Frozen so the shared table cannot be mutated by accident at runtime.
CHUNK_STATES = {
  'request'   => :requested,
  'provide'   => :provided,
  'unrequest' => :none,
  'unprovide' => :none,
}.freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(server, file_service) ⇒ Dispatcher

Returns a new instance of Dispatcher.



32
33
34
35
36
37
38
# File 'lib/pdtp/server/dispatcher.rb', line 32

# Sets up a dispatcher for the given server and file service.
#
# Starts with an empty connection list and an empty registry of client ids,
# and builds a TransferManager that is handed the same connection list
# object and the file service.
def initialize(server, file_service)
  @server, @file_service = server, file_service
  @connections = []
  # Registry of client ids currently in use; ids must be unique
  @used_client_ids = {}
  @transfer_manager = TransferManager.new(@connections, @file_service)
end

Instance Attribute Details

#connections ⇒ Object (readonly)

Returns the value of attribute connections.



30
31
32
# File 'lib/pdtp/server/dispatcher.rb', line 30

# Reader for the dispatcher's list of registered connections.
def connections; @connections; end

#server ⇒ Object (readonly)

Returns the value of attribute server.



30
31
32
# File 'lib/pdtp/server/dispatcher.rb', line 30

# Reader for the server object this dispatcher was constructed with.
def server; @server; end

Instance Method Details

#clear_all_stalled_transfers ⇒ Object

This function removes all stalled transfers from the list and spawns new transfers as appropriate. It must be called periodically by EventMachine.



142
143
144
145
# File 'lib/pdtp/server/dispatcher.rb', line 142

# Runs the stalled-transfer cleanup for every registered connection, then
# asks the transfer manager to spawn transfers. Returns whatever
# spawn_all_transfers returns.
def clear_all_stalled_transfers
  @connections.each do |conn|
    clear_stalled_transfers_for_client(conn)
  end

  @transfer_manager.spawn_all_transfers
end

#connection_completed(connection) ⇒ Object

Register a PDTP::Server::Connection with the Dispatcher



41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/pdtp/server/dispatcher.rb', line 41

# Registers a newly completed connection with the dispatcher.
#
# While the server's file service is enabled but has not been seen yet, the
# first connection whose peer address equals the server's own address is
# treated as the file service: a greeting is logged and the connection is
# extended with FileServiceConnection. Any other connection arriving in that
# window is rejected with a RuntimeError. In every other case (file service
# already seen, or file service not enabled) the connection is logged as an
# ordinary client.
#
# Previously a disabled file service made every client connection raise,
# because the "not seen yet" check ignored whether the service was enabled.
#
# @param connection [#get_peer_info] connection whose peer info destructures
#   into address and port
# @return [Array] the dispatcher's connection list after appending
# @raise [RuntimeError] if the enabled file service has not connected yet and
#   this connection does not come from the server's address
def connection_completed(connection)
  peer_info = connection.get_peer_info
  addr, _port = peer_info

  # True only while we are still waiting for an enabled file service
  file_service_pending = @server.file_service_enabled? && !@seen_file_service

  # FIXME hacked file service registration.  There really ought to be a better way
  # to both register and authenticate file services
  if file_service_pending && addr == @server.addr
    # Display file service greeting when we see it connect
    @server.log "file service running at #{addr}:#{@server.instance_eval { @http_port }}"
    @seen_file_service = true

    # Extend connection with file service-specific method implementations
    connection.extend FileServiceConnection
  elsif file_service_pending
    raise RuntimeError, "File service failed to initialize.  Ensure listen address is correct."
  else
    # Display greeting for normal client
    @server.log "client connected: #{peer_info.inspect}"
  end

  @connections << connection
end

#connection_destroyed(connection) ⇒ Object

Unregister a PDTP::Server::Connection from the Dispatcher



64
65
66
67
# File 'lib/pdtp/server/dispatcher.rb', line 64

# Logs the disconnect of the given connection and removes it from the
# dispatcher's connection list. Returns the result of Array#delete.
def connection_destroyed(connection)
  peer = connection.get_peer_info
  @server.log("client disconnected: #{peer.inspect}")
  @connections.delete(connection)
end

#receive_message(command, message, connection) ⇒ Object

Handles all incoming messages from clients



70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/pdtp/server/dispatcher.rb', line 70

# Entry point for every message received from a client.
#
# The command name is written into the message under the "type" key, the
# message is routed by command, and finally the transfer manager is asked to
# spawn transfers. If a handler raises, that final step is skipped.
#
# command    - the command name, compared against the string names below
# message    - hash-like payload indexed by string keys; it is mutated
# connection - the connection the message arrived on
#
# Raises ProtocolError for any command other than "register" sent before a
# client id is set, for an already-used client id, and for unknown commands.
# Raises ProtocolWarn when "completed" names an unknown transfer.
def receive_message(command, message, connection)
  # Remember which command this message carried
  message["type"] = command

  # Until a client id has been registered, only 'register' is acceptable
  unregistered = command != "register" && connection.client_id.nil?
  raise ProtocolError, "You need to send a 'register' message first" if unregistered

  case command
  when "register"
    client_id = message["client_id"]
    # Client ids must be unique, so refuse one that is already taken
    raise ProtocolError, "Your client id: #{client_id} is already in use." if @used_client_ids[client_id]

    @used_client_ids[client_id] = true
    connection.listen_port = message["listen_port"]
    connection.client_id = client_id
  when "ask_info"
    url = message["url"]
    file_info = @file_service.get_info(url)
    reply = { :url => url }
    # Details are included only when the file service returned info for the url
    unless file_info.nil?
      reply.merge!(
        :size       => file_info.file_size,
        :chunk_size => file_info.base_chunk_size,
        :streaming  => file_info.streaming
      )
    end
    connection.send_message(:tell_info, reply)
  when "request", "provide", "unrequest", "unprovide"
    handle_requestprovide(connection, command, message)
  when "ask_verify"
    # The transfer is authorized only if this connection has it on record
    transfer_id = Transfer.gen_transfer_id(
      connection.client_id,
      message["peer_id"],
      message["url"],
      message["range"]
    )
    known_transfer = connection.transfers[transfer_id]
    authorized = !known_transfer.nil?

    if authorized
      known_transfer.verification_asked = true
    else
      @server.debug "AskVerify not ok: id=#{transfer_id}"
    end

    connection.send_message(
      :tell_verify,
      :url        => message["url"],
      :peer_id    => message["peer_id"],
      :range      => message["range"],
      :peer       => message["peer"],
      :authorized => authorized
    )
  when "completed"
    transfer_id = Transfer.gen_transfer_id(
      connection.client_id,
      message["peer_id"],
      message["url"],
      message["range"]
    )
    finished = connection.transfers[transfer_id]
    @server.debug "Completed: id=#{transfer_id} ok=#{!finished.nil?}"

    unless finished
      raise ProtocolWarn, "You sent me a transfer completed message for unknown transfer: #{transfer_id}"
    end

    transfer_completed(finished, connection, message["hash"])
  when "protocol_error", "protocol_warn"
    # Deliberately ignored
  else
    raise ProtocolError, "Unhandled message type: #{command}"
  end

  # Process all clients that are in need of new transfers
  @transfer_manager.spawn_all_transfers
end