Class: Legs

Inherits:
Object
  • Object
show all
Defined in:
lib/legs.rb

Defined Under Namespace

Classes: RemoteError, RequestError, Result, StartBlockError

Class Attribute Summary collapse

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(host = 'localhost', port = 30274) ⇒ Legs

Legs.new for a client, subclass to make a server, .new then makes server and client!



11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/legs.rb', line 11

def initialize(host = 'localhost', port = 30274)
  self.class.start(port) if self.class != Legs && !self.class.started?
  ObjectSpace.define_finalizer(self) { self.close! }
  @parent = false; @responses = Hash.new; @meta = {}; @disconnected = false
  @responses_mutex = Mutex.new; @socket_mutex = Mutex.new
  
  if host.instance_of?(TCPSocket)
    @socket = host
    @parent = port unless port.instance_of?(Numeric)
  elsif host.instance_of?(String)
    @socket = TCPSocket.new(host, port)
    self.class.outgoing_mutex.synchronize { self.class.outgoing.push self }
  else
    raise "First argument needs to be a hostname, ip, or socket"
  end
  
  
  @handle_data = Proc.new do |data|
    data = json_restore(JSON.parse(data))
    
    if data['method']
      (@parent || self.class).__data!(data, self)
    elsif data['error'] and data['id'].nil?
      raise data['error']
    else
      @responses_mutex.synchronize { @responses[data['id']] = data }
    end
  end
  
  @thread = Thread.new do
    until @socket.closed?
      begin
        close! if @socket.eof?
        data = nil
        @socket_mutex.synchronize { data = @socket.gets(self.class.terminator) rescue nil }
        if data.nil?
          close!
        else
          @handle_data[data]
        end
      rescue JSON::ParserError => e
        send_data!({"error" => "JSON provided is invalid. See http://json.org/ to see how to format correctly."})
      rescue IOError => e
        close!
      end
    end
  end
end

Dynamic Method Handling

This class handles dynamic methods through the method_missing method

#method_missingObject

sends a normal RPC request that has a response catch all the rogue calls and make them work niftily



109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# File 'lib/legs.rb', line 109

def send!(method, *args, &blk)
  puts "Call #{self.inspect}: #{method}(#{args.map(&:inspect).join(', ')})" if self.class.log?
  id = get_unique_number
  send_data! 'method' => method.to_s, 'params' => args, 'id' => id
  
  worker = Proc.new do
    sleep 0.1 until @responses_mutex.synchronize { @responses.keys.include?(id) }
    
    result = Legs::Result.new(@responses_mutex.synchronize { @responses.delete(id) })
    puts ">> #{method} #=> #{result.data['result'].inspect}" if self.class.log?
    result
  end
  
  if blk.respond_to?(:call); Thread.new { blk[worker.call] }
  else; worker.call.value; end
end

Class Attribute Details

.incomingObject (readonly) Also known as: users

Returns the value of attribute incoming.



205
206
207
# File 'lib/legs.rb', line 205

def incoming
  @incoming
end

.incoming_mutexObject (readonly)

Returns the value of attribute incoming_mutex.



205
206
207
# File 'lib/legs.rb', line 205

def incoming_mutex
  @incoming_mutex
end

.logObject Also known as: log?

Returns the value of attribute log.



204
205
206
# File 'lib/legs.rb', line 204

def log
  @log
end

.messages_mutexObject (readonly)

Returns the value of attribute messages_mutex.



205
206
207
# File 'lib/legs.rb', line 205

def messages_mutex
  @messages_mutex
end

.outgoingObject (readonly)

Returns the value of attribute outgoing.



205
206
207
# File 'lib/legs.rb', line 205

def outgoing
  @outgoing
end

.outgoing_mutexObject (readonly)

Returns the value of attribute outgoing_mutex.



205
206
207
# File 'lib/legs.rb', line 205

def outgoing_mutex
  @outgoing_mutex
end

.server_objectObject (readonly)

Returns the value of attribute server_object.



205
206
207
# File 'lib/legs.rb', line 205

def server_object
  @server_object
end

.terminatorObject

Returns the value of attribute terminator.



204
205
206
# File 'lib/legs.rb', line 204

def terminator
  @terminator
end

Instance Attribute Details

#metaObject (readonly)

general getters



7
8
9
# File 'lib/legs.rb', line 7

def meta
  @meta
end

#parentObject (readonly)

general getters



7
8
9
# File 'lib/legs.rb', line 7

def parent
  @parent
end

#socketObject (readonly)

general getters



7
8
9
# File 'lib/legs.rb', line 7

def socket
  @socket
end

Class Method Details

.__data!(data, from) ⇒ Object

gets called to handle all incoming messages (RPC requests)



345
346
347
# File 'lib/legs.rb', line 345

def __data!(data, from)
  @messages.enq [data, from]
end

.__make_symbol(name) ⇒ Object

lets the marshaler transport symbols



370
# File 'lib/legs.rb', line 370

def __make_symbol(name); name.to_sym; end

.add_block(name, &block) ⇒ Object

add’s a block to the ‘server’ class in a way that retains it’s old bindings. the block will be passed the caller object, followed by the args.



361
362
363
364
365
366
367
# File 'lib/legs.rb', line 361

def add_block(name, &block)
  @server_class.class_eval do
    define_method(name) do |*args|
      block.call caller, *args
    end
  end
end

.connectionsObject

returns an array of all connections



334
335
336
# File 'lib/legs.rb', line 334

def connections
  @incoming + @outgoing
end

.define_method(name, &blk) ⇒ Object

add’s a method to the ‘server’ class, bound in to that class



357
# File 'lib/legs.rb', line 357

def define_method(name, &blk); @server_class.class_eval { define_method(name, &blk) }; end

.event(name, sender, *extras) ⇒ Object

add an event call to the server’s message queue



339
340
341
342
# File 'lib/legs.rb', line 339

def event(name, sender, *extras)
  return unless @server_object.respond_to?("on_#{name}")
  __data!({'method' => "on_#{name}", 'params' => extras.to_a, 'id' => nil}, sender)
end

.initializerObject



210
211
212
213
214
# File 'lib/legs.rb', line 210

def initializer
  ObjectSpace.define_finalizer(self) { self.stop! }
  @incoming = []; @outgoing = []; @messages = Queue.new; @terminator = "\n"; @log = false
  @incoming_mutex = Mutex.new; @outgoing_mutex = Mutex.new; @started = false
end

.open(*args) {|client| ... } ⇒ Object

People say this syntax is too funny not to have… whatever. Works like IO and File and what have you

Yields:

  • (client)


350
351
352
353
354
# File 'lib/legs.rb', line 350

def open(*args)
  client = Legs.new(*args)
  yield(client)
  client.close!
end

.start(port = 30274, &blk) ⇒ Object

starts the server, pass nil for port to make a ‘server’ that doesn’t actually accept connections This is useful for adding methods to Legs so that systems you connect to can call methods back on you



219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
# File 'lib/legs.rb', line 219

def start(port=30274, &blk)
  return @server_class.module_eval(&blk) if started? and blk.respond_to? :call
  @started = true
  
  # makes a nice clean class to hold all the server methods.
  if @server_class.nil?
    @server_class = Class.new
    @server_class.module_eval do
      private
      attr_reader :server, :caller
      
      # sends a notification message to all connected clients
      def broadcast(*args)
        if args.first.is_a?(Array)
          list = args.shift
          method = args.shift
        elsif args.first.is_a?(String) or args.first.is_a?(Symbol)
          list = server.incoming
          method = args.shift
        else
          raise "You need to specify a 'method' to broadcast out to"
        end
        
        list.each { |user| user.notify!(method, *args) }
      end
      
      # Finds a user by the value of a certain property... like find_user_by :object_id, 12345
      def find_user_by_object_id value
        server.incoming.find { |user| user.object_id == value }
      end
      
      # finds user's with the specified meta keys matching the specified values, can use regexps and stuff, like a case block
      def find_users_by_meta hash = nil
        raise "You need to give find_users_by_meta a hash to check the user's meta hash against" if hash.nil?
        server.incoming.select do |user|
          hash.all? { |key, value| value === user.meta[key] }
        end
      end
      
      public # makes it public again for the user code
    end
  end
  
  @server_class.module_eval(&blk) if blk.respond_to?(:call)
  
  if @server_object.nil?
    @server_object = @server_class.allocate
    @server_object.instance_variable_set(:@server, self)
    @server_object.instance_eval { initialize }
  end

  @message_processor = Thread.new do
    while started?
      sleep 0.01 while @messages.empty?
      data, from = @messages.deq
      method = data['method']; params = data['params']
      methods = @server_object.public_methods(false).map { |i| i.to_s }
      
      # close dead connections
      if data['method'] == '**remote__disconnecting**'
        from.close!
        next
      else
        begin
          raise "Supplied method is not a String" unless method.is_a?(String)
          raise "Supplied params object is not an Array" unless params.is_a?(Array)
          raise "Cannot run '#{method}' because it is not defined in this server" unless methods.include?(method.to_s) or methods.include? :method_missing
          
          puts "Call #{method}(#{params.map(&:inspect).join(', ')})" if log?
          
          @server_object.instance_variable_set(:@caller, from)
          
          result = nil
          
          @incoming_mutex.synchronize do
            if methods.include?(method.to_s)
              result = @server_object.__send__(method.to_s, *params)
            else
              result = @server_object.method_missing(method.to_s, *params)
            end
          end
          
          puts ">> #{method} #=> #{result.inspect}" if log?
          
          from.send_data!({'id' => data['id'], 'result' => result}) unless data['id'].nil?
          
        rescue Exception => e
          from.send_data!({'error' => e, 'id' => data['id']}) unless data['id'].nil?
          puts "Error: #{e}\nBacktrace: " + e.backtrace.join("\n   ") if log?
        end
      end
    end
  end unless @message_processor and @message_processor.alive?
  
  if ( port.nil? or port == false ) == false and @listener.nil?
    @listener = TCPServer.new(port)
    
    @acceptor_thread = Thread.new do
      while started?
        user = Legs.new(@listener.accept, self)
        @incoming_mutex.synchronize { @incoming.push user }
        puts "User #{user.object_id} connected, number of users: #{@incoming.length}" if log?
        self.event :connect, user
      end
    end
  end
end

.started?Boolean

Returns:

  • (Boolean)


208
# File 'lib/legs.rb', line 208

def started?; @started; end

.stopObject

stops the server, disconnects the clients



328
329
330
331
# File 'lib/legs.rb', line 328

def stop
  @started = false
  @incoming.each { |user| user.close! }
end

Instance Method Details

#close!Object

closes the connection and the threads and stuff for this user



64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/legs.rb', line 64

def close!
  return if @disconnected == true
  
  @disconnected = true
  puts "User #{inspect} disconnecting" if self.class.log?
  
  # notify the remote side
  notify!('**remote__disconnecting**') rescue nil
  
  if @parent
    @parent.event(:disconnect, self)
    @parent.incoming_mutex.synchronize { @parent.incoming.delete(self) }
  else
    self.class.outgoing_mutex.synchronize { self.class.outgoing.delete(self) }
  end
  
  #Thread.new { sleep(1); @socket.close rescue nil }
  @socket.close
end

#connected?Boolean

I think you can guess this one

Returns:

  • (Boolean)


61
# File 'lib/legs.rb', line 61

def connected?; self.class.connections.include?(self); end

#inspectObject



8
# File 'lib/legs.rb', line 8

def inspect; "<Legs:#{object_id} Meta: #{@meta.inspect}>"; end

#notify!(method, *args, &blk) ⇒ Object

send a notification to this user



85
86
87
88
# File 'lib/legs.rb', line 85

def notify!(method, *args, &blk)
  puts "Notify #{inspect}: #{method}(#{args.map(&:inspect).join(', ')})" if self.class.log?
  send_data!({'method' => method.to_s, 'params' => args, 'id' => nil})
end

#send!(method, *args, &blk) ⇒ Object Also known as: method_missing

sends a normal RPC request that has a response



91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/legs.rb', line 91

def send!(method, *args, &blk)
  puts "Call #{self.inspect}: #{method}(#{args.map(&:inspect).join(', ')})" if self.class.log?
  id = get_unique_number
  send_data! 'method' => method.to_s, 'params' => args, 'id' => id
  
  worker = Proc.new do
    sleep 0.1 until @responses_mutex.synchronize { @responses.keys.include?(id) }
    
    result = Legs::Result.new(@responses_mutex.synchronize { @responses.delete(id) })
    puts ">> #{method} #=> #{result.data['result'].inspect}" if self.class.log?
    result
  end
  
  if blk.respond_to?(:call); Thread.new { blk[worker.call] }
  else; worker.call.value; end
end

#send_data!(data) ⇒ Object

sends raw object over the socket



112
113
114
115
116
# File 'lib/legs.rb', line 112

def send_data!(data)
  raise "Lost remote connection" unless connected?
  raw = JSON.generate(json_marshal(data)) + self.class.terminator
  @socket_mutex.synchronize { @socket.write(raw) }
end