Class: Marvin::Distributed::Protocol

Inherits:
EventMachine::Protocols::LineAndTextProtocol
  • Object
show all
Defined in:
lib/marvin/distributed/protocol.rb

Direct Known Subclasses

Client::EMConnection, Server

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#callbacks ⇒ Object

Returns the value of attribute callbacks.



9
10
11
# File 'lib/marvin/distributed/protocol.rb', line 9

# Reader for the +@callbacks+ instance variable.
#
# @return [Object] the current value of +@callbacks+
def callbacks
  @callbacks
end

Instance Method Details

#handle_response(response) ⇒ Object



35
36
37
38
39
40
# File 'lib/marvin/distributed/protocol.rb', line 35

# Dispatches a decoded response to #process_response_message.
#
# The response is always logged at debug level. It is only dispatched when
# it is a Hash containing a "message" key; anything else is ignored and
# nil is returned.
#
# @param response [Object] the decoded response
# @return [Object, nil] the result of #process_response_message, or nil
#   when the response was ignored
def handle_response(response)
  logger.debug "Handling response in distributed protocol (response => #{response.inspect})"
  if response.is_a?(Hash) && response.key?("message")
    # A missing (or nil/false) "options" entry is treated as no options.
    message_options = response["options"] || {}
    message_name = response["message"]
    process_response_message(message_name, message_options)
  end
end

#host_with_port ⇒ Object



42
43
44
45
46
47
# File 'lib/marvin/distributed/protocol.rb', line 42

# Describes the peer as an "ip:port" string.
#
# The address is decoded from #get_peername with Socket.unpack_sockaddr_in
# on first use and cached in +@host_with_port+ for later calls.
#
# @return [String] the peer address in "ip:port" form
def host_with_port
  return @host_with_port if @host_with_port

  # unpack_sockaddr_in yields the port first, then the address.
  peer_port, peer_ip = Socket.unpack_sockaddr_in(get_peername)
  @host_with_port = "#{peer_ip}:#{peer_port}"
end

#receive_line(line) ⇒ Object



11
12
13
14
15
16
17
18
19
20
# File 'lib/marvin/distributed/protocol.rb', line 11

# Handles one line of input: strips surrounding whitespace, parses the
# result as JSON and passes the parsed value to #handle_response.
#
# Lines that are not valid JSON are logged at debug level and dropped. Any
# other StandardError raised while processing the line is handed to
# Marvin::ExceptionTracker instead of propagating to the caller.
#
# @param line [String] the raw line; the caller's string is not modified
# @return [Object] the result of #handle_response, or of the logging call
#   when an error was rescued
def receive_line(line)
  # Work on a stripped copy so the caller's string is left untouched and
  # frozen input is accepted.
  line = line.strip
  logger.debug "<< #{line}"
  handle_response(JSON.parse(line))
rescue JSON::ParserError
  logger.debug "JSON parsing error for #{line.inspect}"
rescue StandardError => e
  # Deliberately not Exception: SystemExit, Interrupt and similar must
  # still be able to stop the process.
  Marvin::ExceptionTracker.log(e)
end

#send_message(name, arguments = {}, &callback) ⇒ Object



22
23
24
25
26
27
28
29
30
31
32
33
# File 'lib/marvin/distributed/protocol.rb', line 22

# Encodes a message as a single JSON line and writes it with #send_data.
#
# The JSON object carries the message name under "message", the arguments
# under "options" and the current time under "sent-at", merged with
# whatever #options_for_callback returns for the given block.
#
# @param name [#to_s] the message name
# @param arguments [Hash] the options to send along with the message
# @param callback [Proc, nil] optional block, passed to #options_for_callback
# @return [Object] the result of #send_data
def send_message(name, arguments = {}, &callback)
  logger.debug "Sending #{name.inspect} to #{host_with_port}"

  envelope = { "message" => name.to_s, "options" => arguments, "sent-at" => Time.now }
  envelope.update(options_for_callback(callback))

  encoded = JSON.dump(envelope)
  logger.debug ">> #{encoded}"
  # One JSON document per line; the newline terminates the message.
  send_data "#{encoded}\n"
end