Class: PDTP::Protocol

Inherits:
LengthPrefixProtocol show all
Defined in:
lib/pdtp/common/protocol.rb

Overview

EventMachine handler class for the PDTP protocol

Direct Known Subclasses

Client::Connection, Server::Connection

Defined Under Namespace

Classes: Optional

Constant Summary collapse

@@num_connections =
0
@@message_params =
nil

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from LengthPrefixProtocol

#initialize, #prefix_size=, #receive_data, #send_packet

Constructor Details

This class inherits a constructor from PDTP::LengthPrefixProtocol

Class Method Details

.define_message_paramsObject

this function defines the required fields for each message



238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
# File 'lib/pdtp/common/protocol.rb', line 238

def self.define_message_params
  mp = {}

  #must be the first message the client sends
  mp["register"]={
    "client_id"=>:string,
    "listen_port"=>:int                  
  }

  mp["ask_info"]={
    "url"=>:url
  }

  mp["tell_info"]={
    "url"=>:url,
    "size"=>Optional.new(:int),
    "chunk_size"=>Optional.new(:int),
    "streaming"=>Optional.new(:bool)
  }

  mp["ask_verify"]={
    "peer"=>:ip,
    "url"=>:url,
    "range"=>:range,
    "peer_id"=>:string
  }

  mp["tell_verify"]={
    "peer"=>:ip,
    "url"=>:url,
    "range"=>:range,
    "peer_id"=>:string,
    "authorized"=>:bool
  }

  mp["request"]={
    "url"=>:url,
    "range"=>Optional.new(:range)
  }

  mp["provide"]={
    "url"=>:url,
    "range"=>Optional.new(:range)
  }

  mp["unrequest"]={
    "url"=>:url,
    "range"=>Optional.new(:range)
  }

  mp["unprovide"]={
    "url"=>:url,
    "range"=>Optional.new(:range)
  }

  #the taker sends this message when a transfer finishes
  #if there is an error in the transfer, dont set a hash
  #to signify failure
  #when this is received from the taker, the connection is considered done for all parties
  #
  #The giver also sends this message when they are done transferring.
  #this closes the connection on their side, allowing them to start other transfers
  #It leaves the connection open on the taker side to allow them to decide if the transfer was successful
  #the hash parameter is ignored when sent by the giver
  mp["completed"]={
    #"peer"=>:ip, no longer used
    "url"=>:url,
    "range"=>:range,
    "peer_id"=>:string,
    "hash"=>Optional.new(:string)
  }

  mp["hash_verify"]={
    "url"=>:url,
    "range"=>:range,
    "hash_ok"=>:bool
  }

  mp["transfer"]={
    "host"=>:string,
    "port"=>:int,
    "method"=>:string,
    "url"=>:url,
    "range"=>:range,
    "peer_id"=>:string
  }  

  mp["protocol_error"]={
    "message"=>Optional.new(:string)
  }

  mp["protocol_warn"]={
    "message"=>Optional.new(:string)
  }

  mp
end

.obj_matches_type?(obj, type) ⇒ Boolean

returns whether or not a given ruby object matches the specified type available types: :url, :range, :ip, :int, :bool, :string

Returns:

  • (Boolean)


222
223
224
225
226
227
228
229
230
231
232
233
234
235
# File 'lib/pdtp/common/protocol.rb', line 222

def self.obj_matches_type?(obj,type)
  case type
  when :url then obj.class == String
  when :range then obj.class == Range or obj.class == Hash
  when :int then obj.class == Fixnum
  when :bool then obj == true or obj == false
  when :string then obj.class == String
  when :ip
    ip = IPAddr.new(obj) rescue nil
    !ip.nil?
  else 
    raise "Invalid type specified: #{type}"
  end 
end


173
174
175
# File 'lib/pdtp/common/protocol.rb', line 173

def self.print_info
  puts "num_connections=#{@@num_connections}"
end

.validate_message(message) ⇒ Object

makes sure that the message is valid. if not, throws a ProtocolError

Raises:



189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
# File 'lib/pdtp/common/protocol.rb', line 189

def self.validate_message(message)
  raise ProtocolError.new("Message is not a JSON array") unless message.is_a? Array
  command, options = message
  
  @@message_params ||= define_message_params

  params = @@message_params[command] rescue nil
  raise ProtocolError.new("Invalid message type: #{command}") if params.nil?

  params.each do |name,type|
    if type.class == Optional
      next if options[name].nil? #dont worry about it if they dont have this param
      type = type.type #grab the real type from within the optional class
    end

    raise ProtocolError.new("required parameter: '#{name}' missing for message: '#{command}'") if options[name].nil?
    unless obj_matches_type?(options[name], type)
      raise ProtocolError.new("parameter: '#{name}' val='#{options[name]}' is not of type: '#{type}' for message: '#{command}' ")
    end
  end    
end

Instance Method Details

#connection_open?Boolean

Returns:

  • (Boolean)


50
51
52
# File 'lib/pdtp/common/protocol.rb', line 50

def connection_open?
  @connection_open
end

#error_close_connection(error) ⇒ Object

close a connection, but first send the specified error message



71
72
73
74
75
76
77
78
# File 'lib/pdtp/common/protocol.rb', line 71

def error_close_connection(error) 
  if PROTOCOL_DEBUG
    send_message :protocol_error, :message => error
    close_connection true # close after writing
  else
    close_connection
  end
end

#get_peer_infoObject

returns the ip address and port in an array [ip, port]



178
179
180
# File 'lib/pdtp/common/protocol.rb', line 178

def get_peer_info
  @cached_peer_info
end

#hash_to_range(command, message) ⇒ Object

converts a PDTP protocol min and max hash to a Ruby Range class



133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
# File 'lib/pdtp/common/protocol.rb', line 133

def hash_to_range(command, message)
  key="range"
  auto_types=["provide","request"] #these types assume a range if it isnt specified
  auto_types.each do |type|
    if command == type and message[key].nil?
      message[key]={} # assume entire file if not specified
    end
  end

  if message[key]
    raise if message[key].class!=Hash
    min=message[key]["min"] 
    max=message[key]["max"]
    message[key]= (min ? min : 0)..(max ? max : -1)
  end
end

#post_initObject

called by EventMachine after a connection has been established



55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/pdtp/common/protocol.rb', line 55

def post_init
  # a cache of the peer info because eventmachine seems to drop it before we want
  peername = get_peername
  if peername.nil?
    @cached_peer_info = ["<Peername nil!!!>", 91119] if peername.nil?
  else
    port, addr = Socket.unpack_sockaddr_in(peername)
    @cached_peer_info = [addr.to_s, port.to_i]
  end

  @@num_connections += 1
  @connection_open = true
  connection_created if respond_to? :connection_created
end

#range_to_hash(message) ⇒ Object

converts Ruby Range classes in the message to PDTP protocol hashes with min and max 0..-1 => nil (entire file) 10..-1 => “min”=>10 (contents of file >= 10)



118
119
120
121
122
123
124
125
126
127
128
129
130
# File 'lib/pdtp/common/protocol.rb', line 118

def range_to_hash(message)
  message.each do |key,value|
    if value.class==Range
      if value==(0..-1)
        message.delete(key)
      elsif value.last==-1 
        message[key]={"min"=>value.first}
      else
        message[key]={"min"=>value.first,"max"=>value.last}
      end
    end   
  end
end

#receive_packet(packet) ⇒ Object

called for each packet of data received over the wire parses the JSON message and dispatches the message



88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/pdtp/common/protocol.rb', line 88

def receive_packet(packet)
  begin
    packet.chomp!
    #@@log.debug "(#{remote_peer_id}) recv: " + packet
    message = JSON.parse(packet) rescue nil
    raise ProtocolError.new("JSON couldn't parse: #{packet}") if message.nil?
    Protocol.validate_message message
    
    command, options = message
    hash_to_range command, options
    receive_message(command, options) if respond_to? :receive_message
  rescue ProtocolError => e
    # FIXME Should likely be raised and handled higher
    STDERR.write "(#{remote_peer_id}) PROTOCOL ERROR: #{e.to_s}\n"
    STDERR.write e.backtrace.join("\n") + "\n"
    error_close_connection e.to_s
  rescue ProtocolWarn => e
    send_message :protocol_warn, :message => e.to_s
  rescue Exception => e
    # FIXME Should likely be raised and handled higher
    STDERR.write "(#{remote_peer_id}) UNKNOWN EXCEPTION #{e.to_s}\n"
    STDERR.write e.backtrace.join("\n") + "\n"
  end
end

#remote_peer_idObject

debug routine: returns id of remote peer on this connection



81
82
83
84
# File 'lib/pdtp/common/protocol.rb', line 81

def remote_peer_id
  ret = client_info.client_id rescue nil
  ret || 'NOID'
end

#send_message(command, opts = {}) ⇒ Object

sends a message, in the internal Hash format, over the wire



151
152
153
154
155
156
157
158
159
160
161
162
163
164
# File 'lib/pdtp/common/protocol.rb', line 151

def send_message(command, opts = {})
  #message = opts.merge(:type => command.to_s)

  # Stringify all option keys
  opts = opts.map { |k,v| [k.to_s, v] }.inject({}) { |h,(k,v)| h[k] = v; h }
  
  # Convert all Ruby ranges to JSON objects representing them
  range_to_hash opts
  
  # Message format is a JSON array with the command (string) as the first entry
  # Second entry is an options hash/object
  message = [command.to_s, opts]
  send_packet JSON.unparse(message) + "\n"  
end

#to_sObject



182
183
184
185
# File 'lib/pdtp/common/protocol.rb', line 182

def to_s
  addr,port = get_peer_info
  "#{addr}:#{port}"
end

#unbindObject

called by EventMachine when a connection is closed



167
168
169
170
171
# File 'lib/pdtp/common/protocol.rb', line 167

def unbind
  @@num_connections -= 1
  connection_destroyed if respond_to? :connection_destroyed
  @connection_open = false
end