Class: AnyCable::Rack::Hub
- Inherits:
-
Object
- Object
- AnyCable::Rack::Hub
- Defined in:
- lib/anycable/rack/hub.rb
Overview
Constant Summary collapse
- INTERNAL_STREAM =
:__internal__
Instance Attribute Summary collapse
-
#sockets ⇒ Object
readonly
Returns the value of attribute sockets.
-
#streams ⇒ Object
readonly
Returns the value of attribute streams.
Instance Method Summary collapse
- #add_socket(socket, identifier) ⇒ Object
- #add_subscriber(stream, socket, channel) ⇒ Object
- #broadcast(stream, message, coder) ⇒ Object
- #broadcast_all(message) ⇒ Object
- #close_all ⇒ Object
- #disconnect(identifier, reconnect, coder) ⇒ Object
-
#initialize ⇒ Hub
constructor
A new instance of Hub.
- #remove_channel(socket, channel) ⇒ Object
- #remove_socket(socket) ⇒ Object
- #remove_subscriber(stream, socket, channel) ⇒ Object
Constructor Details
#initialize ⇒ Hub
Returns a new instance of Hub.
13 14 15 16 17 18 19 |
# File 'lib/anycable/rack/hub.rb', line 13 def initialize @streams = Hash.new do |streams, stream_id| streams[stream_id] = Hash.new { |channels, channel_id| channels[channel_id] = Set.new } end @sockets = Hash.new { |h, k| h[k] = Set.new } @sync = Mutex.new end |
Instance Attribute Details
#sockets ⇒ Object (readonly)
Returns the value of attribute sockets.
11 12 13 |
# File 'lib/anycable/rack/hub.rb', line 11 def sockets @sockets end |
#streams ⇒ Object (readonly)
Returns the value of attribute streams.
11 12 13 |
# File 'lib/anycable/rack/hub.rb', line 11 def streams @streams end |
Instance Method Details
#add_socket(socket, identifier) ⇒ Object
21 22 23 24 25 |
# File 'lib/anycable/rack/hub.rb', line 21 def add_socket(socket, identifier) @sync.synchronize do @streams[INTERNAL_STREAM][identifier] << socket end end |
#add_subscriber(stream, socket, channel) ⇒ Object
27 28 29 30 31 32 |
# File 'lib/anycable/rack/hub.rb', line 27 def add_subscriber(stream, socket, channel) @sync.synchronize do @streams[stream][channel] << socket @sockets[socket] << [channel, stream] end end |
#broadcast(stream, message, coder) ⇒ Object
66 67 68 69 70 71 72 73 74 75 76 77 78 |
# File 'lib/anycable/rack/hub.rb', line 66 def broadcast(stream, , coder) list = @sync.synchronize do return unless @streams.key?(stream) @streams[stream].to_a end list.each do |(channel_id, sockets)| decoded = JSON.parse() = (channel_id, decoded, coder) sockets.each { |socket| socket.transmit() } end end |
#broadcast_all(message) ⇒ Object
80 81 82 |
# File 'lib/anycable/rack/hub.rb', line 80 def broadcast_all() sockets.each_key { |socket| socket.transmit() } end |
#close_all ⇒ Object
99 100 101 102 103 104 |
# File 'lib/anycable/rack/hub.rb', line 99 def close_all hub.sockets.dup.each do |socket| hub.remove_socket(socket) socket.close end end |
#disconnect(identifier, reconnect, coder) ⇒ Object
84 85 86 87 88 89 90 91 92 93 94 95 96 97 |
# File 'lib/anycable/rack/hub.rb', line 84 def disconnect(identifier, reconnect, coder) sockets = @sync.synchronize do return unless @streams[INTERNAL_STREAM].key?(identifier) @streams[INTERNAL_STREAM][identifier].to_a end msg = ("remote", reconnect, coder) sockets.each do |socket| socket.transmit(msg) socket.close end end |
#remove_channel(socket, channel) ⇒ Object
42 43 44 45 46 47 48 49 50 51 52 |
# File 'lib/anycable/rack/hub.rb', line 42 def remove_channel(socket, channel) list = @sync.synchronize do return unless @sockets.key?(socket) @sockets[socket].dup end list.each do |(channel_id, stream)| remove_subscriber(stream, socket, channel) if channel == channel_id end end |
#remove_socket(socket) ⇒ Object
54 55 56 57 58 59 60 61 62 63 64 |
# File 'lib/anycable/rack/hub.rb', line 54 def remove_socket(socket) list = @sync.synchronize do return unless @sockets.key?(socket) @sockets[socket].dup end list.each do |(channel_id, stream)| remove_subscriber(stream, socket, channel_id) end end |
#remove_subscriber(stream, socket, channel) ⇒ Object
34 35 36 37 38 39 40 |
# File 'lib/anycable/rack/hub.rb', line 34 def remove_subscriber(stream, socket, channel) @sync.synchronize do @streams[stream][channel].delete(socket) @sockets[socket].delete([channel, stream]) cleanup stream, socket, channel end end |