Class: Ki::Middleware::Realtime
- Inherits:
- Object
- Inheritance chain: Object → Ki::Middleware::Realtime
- Includes:
- BaseMiddleware
- Defined in:
- lib/ki/middleware/realtime.rb
Instance Method Summary
- #call(env) ⇒ Object
- #channel_manager_action(json, ws, action) ⇒ Object
- #handle_websocket(env) ⇒ Object
- #on_message(ws, socket, data) ⇒ Object
- #show_stats ⇒ Object
- #ws_send(ws, hash) ⇒ Object
Methods included from BaseMiddleware
Instance Method Details
#call(env) ⇒ Object
6 7 8 9 10 11 12 13 14 15 |
# File 'lib/ki/middleware/realtime.rb', line 6

# Rack entry point for the realtime middleware.
#
# Routes '/realtime/info' to the stats endpoint, routes '/realtime' to the
# WebSocket handler when the request is a WebSocket handshake, and passes
# every other request on to the wrapped application.
#
# @param env [Hash] the Rack environment
# @return [Object] whatever the selected handler returns
def call(env)
  path = BaseRequest.new(env).path.to_s

  return show_stats if path == '/realtime/info'
  return handle_websocket(env) if path == '/realtime' && Faye::WebSocket.websocket?(env)

  @app.call(env)
end
#channel_manager_action(json, ws, action) ⇒ Object
71 72 73 74 75 76 77 78 |
# File 'lib/ki/middleware/realtime.rb', line 71

# Forwards a channel request to Ki::ChannelManager and sends the result back
# over the socket. Requests without a 'channel_name' are answered with an
# error message instead of being forwarded.
#
# @param json [Hash] the decoded client message
# @param ws [Object] the websocket the reply is written to
# @param action [String, Symbol] name of the ChannelManager method to invoke
# @return [Object] the result of #ws_send
def channel_manager_action(json, ws, action)
  unless json['channel_name']
    return ws_send(ws, { message: 'Please specify a channel_name' })
  end

  # `action` is dispatched dynamically; callers in this file pass fixed names.
  ws_send(ws, ::Ki::ChannelManager.send(action, json))
end
#handle_websocket(env) ⇒ Object
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
# File 'lib/ki/middleware/realtime.rb', line 30

# Upgrades the request to a WebSocket connection and wires it to
# Ki::ChannelManager.
#
# On connect, the socket record returned by ChannelManager.connect is sent to
# the client. Incoming messages are handed to #on_message. A periodic timer
# (interval 1) asks ChannelManager.tick for pending messages for this socket
# and pushes them to the client when there are any. On close, the timer is
# cancelled and the socket is disconnected from the ChannelManager.
#
# @param env [Hash] the Rack environment of the handshake request
# @return [Object] the websocket's Rack response (ws.rack_response)
def handle_websocket(env)
  ws = Faye::WebSocket.new(env)
  socket = ::Ki::ChannelManager.connect
  ws_send(ws, socket)

  ws.on :message do |event|
    on_message(ws, socket, event.data)
  end

  timer = EventMachine::PeriodicTimer.new(1) do
    msgs = ::Ki::ChannelManager.tick(socket_id: socket['id'])
    ws_send(ws, { messages: msgs }) if msgs.count > 0
  end

  ws.on :close do # |event|
    # Stop polling before dropping the connection record.
    timer.cancel
    ::Ki::ChannelManager.disconnect socket
    # Clear the reference captured by the handler blocks.
    ws = nil
  end

  ws.rack_response
end
#on_message(ws, socket, data) ⇒ Object
54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 |
# File 'lib/ki/middleware/realtime.rb', line 54

# Handles one raw message received from a client.
#
# The payload must be a JSON object. Its 'type' selects the operation:
# 'subscribe' and 'publish' go through #channel_manager_action (which
# requires a 'channel_name'), 'unsubscribe' is passed straight to
# Ki::ChannelManager.unsubscribe, and anything else is answered with an
# error message. The socket's id is added to the payload as 'socket_id'
# before dispatch.
#
# @param ws [Object] the websocket replies are written to
# @param socket [Hash] the socket record; its 'id' identifies the connection
# @param data [String] the raw message body
# @return [Object] the result of the reply that was sent
def on_message(ws, socket, data)
  json = JSON.parse(data)

  # Valid JSON that is not an object (array, number, string, ...) cannot carry
  # a 'type'; reject it here instead of failing on the key assignment below.
  return ws_send(ws, { message: 'Please send a valid json string' }) unless json.is_a?(Hash)

  json['socket_id'] = socket['id']

  case json['type']
  when 'subscribe'
    channel_manager_action(json, ws, 'subscribe')
  when 'unsubscribe'
    ws_send(ws, ::Ki::ChannelManager.unsubscribe(json))
  when 'publish'
    channel_manager_action(json, ws, 'publish')
  else
    ws_send(ws, { message: 'Please specify a valid type' })
  end
rescue JSON::ParserError
  ws_send(ws, { message: 'Please send a valid json string' })
end
#show_stats ⇒ Object
21 22 23 24 25 26 27 28 |
# File 'lib/ki/middleware/realtime.rb', line 21

# Builds the response for the stats endpoint: a 200 JSON document whose
# 'sockets' key holds the value of Ki::ChannelManager.sockets.
#
# @return [Object] the result of Rack::Response#finish
def show_stats
  body = { sockets: ::Ki::ChannelManager.sockets }.to_json

  response = Rack::Response.new(body, 200)
  response['Content-Type'] = 'application/json'
  response.finish
end
#ws_send(ws, hash) ⇒ Object
17 18 19 |
# File 'lib/ki/middleware/realtime.rb', line 17

# Serializes +hash+ to JSON and writes it to the websocket.
#
# @param ws [Object] any object whose #send accepts the serialized payload
# @param hash [Object] the payload; must respond to #to_json
# @return [Object] whatever ws.send returns
def ws_send(ws, hash)
  # to_json already yields a String, so no further conversion is needed.
  ws.send(hash.to_json)
end