19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
|
# File 'lib/tinker/reactor.rb', line 19
# Starts the reactor and stays inside EventMachine.run until that call
# returns.
#
# Two worker threads are spawned first: one pops events off @event_queue and
# dispatches each to its environment's context; the other pops outgoing
# messages off @message_queue and sends each body to its socket. The
# WebSocket server is then started on config.host / config.port, and every
# connection is wrapped in a Tinker::Client registered in `clients`.
#
# Calling run while @event_queue is already set is a no-op. The queue is not
# cleared on shutdown, so a later call also returns immediately.
#
# @return [Thread, nil] the terminated message thread, or nil when the
#   reactor had already been started
def run
  return if @event_queue

  trap_sigint
  @event_queue = Queue.new
  @message_queue = Queue.new

  @event_thread = Thread.new do
    Thread.current.abort_on_exception = true
    loop do
      event = @event_queue.pop
      event.env.context.dispatch event
    end
  end

  @message_thread = Thread.new do
    Thread.current.abort_on_exception = true
    loop do
      message = @message_queue.pop
      message.socket.send(message.body)
    end
  end

  begin
    EventMachine.run do
      Backro::Application.instance.send(:initialize)
      puts "Starting EventMachine on port #{config.port}"
      @websocket_server = EventMachine.start_server(config.host, config.port, EventMachine::WebSocket::Connection, {}) do |ws|
        websocket = Tinker::WebSocket.new(ws)
        client = Tinker::Client.new(websocket)
        clients[ws] = client
        port, ip = Socket.unpack_sockaddr_in(ws.get_peername)

        ws.onopen do
          puts "WebSocket Connection open (#{ip}:#{port})"
          env = Tinker::Event::Environment.new(client, Tinker.application)
          @event_queue.push(Tinker::Event.new("meta.client.join", env))
        end

        ws.onmessage do |message|
          json = begin
            JSON(message)
          rescue JSON::ParserError
            nil
          end

          # Only a JSON object can carry 'context', 'action' and 'params'.
          # Arrays, numbers, null etc. are valid JSON but would raise on the
          # key lookups below, so they are rejected like unparseable input.
          if json.is_a?(Hash)
            context = Tinker::Context.contexts[json['context']] || Tinker.application
            env = Tinker::Event::Environment.new(client, context)
            @event_queue.push(Tinker::Event.new("client.message.#{json['action']}", env, json['params']))
          else
            ws.send({:type => :error, :action => :message, :errors => ["Invalid message format"]}.to_json)
          end
        end

        ws.onclose do
          puts "WebSocket Connection closed (#{ip}:#{port})"
          env = Tinker::Event::Environment.new(client, Tinker.application)
          @event_queue.push(Tinker::Event.new("meta.client.leave", env))
          clients.delete ws
        end
      end
    end
  ensure
    # Stop the workers even when EventMachine.run raises; otherwise both
    # threads would stay blocked on their queues.
    @event_thread.terminate
    @message_thread.terminate
  end

  # Keep the value the method has always returned on normal shutdown.
  @message_thread
end
|