Class: Reacter::SocketAdapter
Defined Under Namespace
Classes: SocketHandler
Instance Attribute Summary
Attributes inherited from Adapter
#config, #enable, #type
Instance Method Summary
collapse
Methods inherited from Adapter
create, #disable, #enabled?, #initialize
Instance Method Details
#connect(args = {}) ⇒ Object
23
24
25
26
27
28
29
30
31
32
|
# File 'lib/reacter/adapters/socket.rb', line 23
def connect(args={})
@addr = @config.get(:address, '0.0.0.0')
@port = @config.get(:port, 54546).to_i
@buffer = []
if args[:mode] == 'write'
@writer = TCPSocket.new(@addr, @port)
end
end
|
#poll(&block) ⇒ Object
56
57
58
59
60
61
|
# File 'lib/reacter/adapters/socket.rb', line 56
def poll(&block)
@server = EM.start_server(@addr, @port, SocketHandler, proc{|data|
messages = Message.parse(data.chomp)
block.call(messages) unless messages.empty?
})
end
|
#send(message, format = nil) ⇒ Object
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
|
# File 'lib/reacter/adapters/socket.rb', line 34
def send(message, format=nil)
if defined?(@writer)
if (sz = @config.get(:buffersize))
if @buffer.length == sz
payload = @buffer.join("\n")
@buffer = []
else
@buffer << Message.dump(message, format)
end
else
payload = Message.dump(message, format)
end
@writer.puts(payload)
@writer.flush()
return true
else
return false
end
end
|