Class: Marvin::Distributed::Protocol
- Inherits:
-
EventMachine::Protocols::LineAndTextProtocol
- Object
- EventMachine::Protocols::LineAndTextProtocol
- Marvin::Distributed::Protocol
show all
- Defined in:
- lib/marvin/distributed/protocol.rb
Instance Attribute Summary collapse
Instance Method Summary
collapse
Instance Attribute Details
#callbacks ⇒ Object
Returns the value of attribute callbacks.
9
10
11
|
# File 'lib/marvin/distributed/protocol.rb', line 9
def callbacks
@callbacks
end
|
Instance Method Details
#handle_response(response) ⇒ Object
35
36
37
38
39
40
|
# File 'lib/marvin/distributed/protocol.rb', line 35
def handle_response(response)
logger.debug "Handling response in distributed protocol (response => #{response.inspect})"
return unless response.is_a?(Hash) && response.has_key?("message")
options = response["options"] || {}
process_response_message(response["message"], options)
end
|
#host_with_port ⇒ Object
42
43
44
45
46
47
|
# File 'lib/marvin/distributed/protocol.rb', line 42
def host_with_port
@host_with_port ||= begin
port, ip = Socket.unpack_sockaddr_in(get_peername)
"#{ip}:#{port}"
end
end
|
#receive_line(line) ⇒ Object
11
12
13
14
15
16
17
18
19
20
|
# File 'lib/marvin/distributed/protocol.rb', line 11
def receive_line(line)
line.strip!
logger.debug "<< #{line}"
response = JSON.parse(line)
handle_response(response)
rescue JSON::ParserError
logger.debug "JSON parsing error for #{line.inspect}"
rescue Exception => e
Marvin::ExceptionTracker.log(e)
end
|
#send_message(name, arguments = {}, &callback) ⇒ Object
22
23
24
25
26
27
28
29
30
31
32
33
|
# File 'lib/marvin/distributed/protocol.rb', line 22
def send_message(name, arguments = {}, &callback)
logger.debug "Sending #{name.inspect} to #{self.host_with_port}"
payload = {
"message" => name.to_s,
"options" => arguments,
"sent-at" => Time.now
}
payload.merge!(options_for_callback(callback))
payload = JSON.dump(payload)
logger.debug ">> #{payload}"
send_data "#{payload}\n"
end
|