Class: MessageBus::ConnectionManager
- Inherits:
-
Object
- Object
- MessageBus::ConnectionManager
- Includes:
- MonitorMixin
- Defined in:
- lib/message_bus/connection_manager.rb
Overview
Manages a set of subscribers with active connections to the server, such that messages which are published during the connection may be dispatched.
Instance Method Summary
-
#add_client(client) ⇒ void
Keeps track of a client with an active connection.
-
#client_count ⇒ Integer
The number of tracked clients.
-
#initialize(bus = nil) ⇒ ConnectionManager
constructor
A new instance of ConnectionManager.
-
#lookup_client(client_id) ⇒ MessageBus::Client
Finds a client by ID.
-
#notify_clients(msg) ⇒ void
Dispatches a message to any connected clients which are permitted to receive it.
-
#remove_client(c) ⇒ void
Removes a client.
Constructor Details
#initialize(bus = nil) ⇒ ConnectionManager
Returns a new instance of ConnectionManager.
12 13 14 15 16 17 |
# File 'lib/message_bus/connection_manager.rb', line 12
# Builds an empty connection manager.
#
# @param bus [Object, nil] the bus this manager reports to; when nil (or
#   false) the MessageBus constant itself is used
# @return [ConnectionManager] a new instance of ConnectionManager
def initialize(bus = nil)
  @bus = bus || MessageBus
  # Tracked clients, keyed by client ID.
  @clients = {}
  # Per-site subscription tables, keyed by site ID.
  @subscriptions = {}
  # Sets up the monitor that backs every `synchronize` call on this object.
  mon_initialize
end
Instance Method Details
#add_client(client) ⇒ void
This method returns an undefined value.
Keeps track of a client with an active connection.
53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 |
# File 'lib/message_bus/connection_manager.rb', line 53
# Keeps track of a client with an active connection.
#
# If a client with the same ID is already tracked and carries a higher
# sequence number, the incoming client is closed and nothing else changes.
# Otherwise any previously tracked client with that ID is removed and closed,
# and the incoming client takes its place and is subscribed to each of its
# channels.
#
# @param client [MessageBus::Client] the client to start tracking
# @return [void]
def add_client(client)
  synchronize do
    previous = @clients[client.client_id]

    # The tracked connection is newer than the incoming one: reject the
    # incoming connection and leave the registry untouched.
    next client.close if previous && previous.seq > client.seq

    if previous
      remove_client(previous)
      previous.close
    end

    @clients[client.client_id] = client
    # Make sure the site has a subscription table before subscribing.
    @subscriptions[client.site_id] ||= {}
    client.subscriptions.each do |channel, _|
      subscribe_client(client, channel)
    end
  end
end
#client_count ⇒ Integer
Returns the number of tracked clients.
99 100 101 102 103 |
# File 'lib/message_bus/connection_manager.rb', line 99
# Reports how many clients are currently tracked.
#
# @return [Integer] the number of tracked clients
def client_count
  # Read under the monitor so the count is not taken mid-update.
  synchronize { @clients.size }
end
#lookup_client(client_id) ⇒ MessageBus::Client
Finds a client by ID.
92 93 94 95 96 |
# File 'lib/message_bus/connection_manager.rb', line 92
# Finds a tracked client by its ID.
#
# @param client_id [Object] the ID the client was registered under
# @return [MessageBus::Client, nil] the tracked client, or nil when no
#   client is tracked under that ID
def lookup_client(client_id)
  synchronize { @clients[client_id] }
end
#notify_clients(msg) ⇒ void
This method returns an undefined value.
Dispatches a message to any connected clients which are permitted to receive it.
22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
# File 'lib/message_bus/connection_manager.rb', line 22
# Dispatches a message to any connected clients which are permitted to
# receive it.
#
# Looks up the subscribers for the message's site and channel, pushes the
# message to every tracked client whose `allowed?` check passes, and drops
# clients that report themselves closed afterwards. Any error escaping the
# dispatch loop is logged through the bus logger rather than raised.
#
# @param msg [Object] the message to deliver; must respond to `site_id`
#   and `channel`
# @return [void]
def notify_clients(msg)
  synchronize do
    begin
      channels = @subscriptions[msg.site_id]
      subscribers = channels ? channels[msg.channel] : nil
      return unless subscribers

      subscribers.each do |id|
        recipient = @clients[id]
        next unless recipient && recipient.allowed?(msg)

        begin
          recipient.synchronize { recipient << msg }
        rescue
          # pipe may be broken, move on
        end

        # remove_client deletes this ID from the collection being iterated;
        # per the original author's note, deleting while iterating is fine.
        remove_client(recipient) if recipient.closed?
      end
    rescue => e
      @bus.logger.error "notify clients crash #{e} : #{e.backtrace}"
    end
  end
end
#remove_client(c) ⇒ void
This method returns an undefined value.
Removes a client.
76 77 78 79 80 81 82 83 84 85 86 87 |
# File 'lib/message_bus/connection_manager.rb', line 76
# Removes a client.
#
# Drops the client from the registry, deletes its ID from every channel
# subscription of its site, and cancels its cleanup timer if it has one.
# Removing a client whose site has no subscription table is a no-op for the
# subscription step instead of raising NoMethodError.
#
# @param c [MessageBus::Client] the client to stop tracking
# @return [void]
def remove_client(c)
  synchronize do
    @clients.delete(c.client_id)

    # The site may never have had a subscription table created for it
    # (e.g. the client was never added), so guard against nil.
    site_subscriptions = @subscriptions[c.site_id]
    if site_subscriptions
      site_subscriptions.each_value { |ids| ids.delete(c.client_id) }
    end

    # Read the timer once so the nil check and the cancel see the same object.
    timer = c.cleanup_timer
    if timer
      begin
        timer.cancel
      rescue StandardError
        # concurrency may cause this to fail; cancellation is best-effort
        nil
      end
    end
  end
end