Class: MessageBus::Client
- Inherits:
-
Object
- Object
- MessageBus::Client
- Defined in:
- lib/message_bus/client.rb
Overview
Represents a connected subscriber and delivers published messages over its connected socket.
Instance Attribute Summary collapse
- #async_response ⇒ Thin::AsyncResponse?
-
#cleanup_timer ⇒ MessageBus::TimerThread::Cancelable
A timer job that is used to auto-disconnect the client at the configured long-polling interval.
-
#client_id ⇒ String
The unique ID provided by the client.
-
#connect_time ⇒ Time
The time at which the client connected.
-
#group_ids ⇒ Array<String,Integer>
The group IDs the authenticated client is a member of.
-
#headers ⇒ Hash<String => String>
Custom headers to include in HTTP responses.
-
#io ⇒ IO
The HTTP socket the client is connected on.
-
#seq ⇒ Integer
The connection sequence number the client provided when connecting.
-
#site_id ⇒ String
The site ID the client was authenticated for; used for hosting multiple.
-
#use_chunked ⇒ Boolean
Whether or not the client should use chunked encoding.
-
#user_id ⇒ String, Integer
The user ID the client was authenticated for.
Instance Method Summary collapse
-
#<<(msg) ⇒ void
Delivers a message to the client, even if it’s empty.
-
#allowed?(msg) ⇒ Boolean
Whether or not the client has permission to receive the passed message.
-
#backlog ⇒ Array<MessageBus::Message>
The set of messages the client is due to receive, based on its subscriptions and permissions.
-
#close ⇒ Object
Closes the client connection.
-
#closed? ⇒ Boolean
Whether the connection is closed or not.
-
#deliver_backlog(backlog) ⇒ void
Delivers a backlog of messages to the client, if there is anything in it.
-
#ensure_first_chunk_sent ⇒ Object
If no data has yet been sent to the client, sends an empty chunk; prevents clients from entering a timeout state if nothing is delivered initially.
-
#initialize(opts) ⇒ Client
constructor
A new instance of Client.
-
#subscribe(channel, last_seen_id) ⇒ void
Subscribes the client to messages on a channel, optionally from a defined starting point.
-
#subscriptions ⇒ Hash<String => Integer>
The active subscriptions, mapping channel names to last seen message IDs.
- #synchronize { ... } ⇒ void
Constructor Details
#initialize(opts) ⇒ Client
Returns a new instance of Client.
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
# File 'lib/message_bus/client.rb', line 38 def initialize(opts) self.client_id = opts[:client_id] self.user_id = opts[:user_id] self.group_ids = opts[:group_ids] || [] self.site_id = opts[:site_id] self.seq = opts[:seq].to_i self.connect_time = Time.now @lock = Mutex.new @bus = opts[:message_bus] || MessageBus @subscriptions = {} @chunks_sent = 0 @async_response = nil @io = nil @wrote_headers = false end |
Instance Attribute Details
#async_response ⇒ Thin::AsyncResponse?
20 21 22 |
# File 'lib/message_bus/client.rb', line 20 def async_response @async_response end |
#cleanup_timer ⇒ MessageBus::TimerThread::Cancelable
Returns a timer job that is used to auto-disconnect the client at the configured long-polling interval.
18 19 20 |
# File 'lib/message_bus/client.rb', line 18 def cleanup_timer @cleanup_timer end |
#client_id ⇒ String
Returns the unique ID provided by the client.
7 8 9 |
# File 'lib/message_bus/client.rb', line 7 def client_id @client_id end |
#connect_time ⇒ Time
Returns the time at which the client connected.
13 14 15 |
# File 'lib/message_bus/client.rb', line 13 def connect_time @connect_time end |
#group_ids ⇒ Array<String,Integer>
Returns the group IDs the authenticated client is a member of.
11 12 13 |
# File 'lib/message_bus/client.rb', line 11 def group_ids @group_ids end |
#headers ⇒ Hash<String => String>
Returns custom headers to include in HTTP responses.
24 25 26 |
# File 'lib/message_bus/client.rb', line 24 def headers @headers end |
#io ⇒ IO
Returns the HTTP socket the client is connected on.
22 23 24 |
# File 'lib/message_bus/client.rb', line 22 def io @io end |
#seq ⇒ Integer
Returns the connection sequence number the client provided when connecting.
26 27 28 |
# File 'lib/message_bus/client.rb', line 26 def seq @seq end |
#site_id ⇒ String
Returns the site ID the client was authenticated for; used for hosting multiple.
15 16 17 |
# File 'lib/message_bus/client.rb', line 15 def site_id @site_id end |
#use_chunked ⇒ Boolean
Returns whether or not the client should use chunked encoding.
28 29 30 |
# File 'lib/message_bus/client.rb', line 28 def use_chunked @use_chunked end |
#user_id ⇒ String, Integer
Returns the user ID the client was authenticated for.
9 10 11 |
# File 'lib/message_bus/client.rb', line 9 def user_id @user_id end |
Instance Method Details
#<<(msg) ⇒ void
This method returns an undefined value.
Delivers a message to the client, even if it’s empty
121 122 123 124 125 126 127 128 |
# File 'lib/message_bus/client.rb', line 121 def <<(msg) json = ([msg]) if use_chunked write_chunk json else write_and_close json end end |
#allowed?(msg) ⇒ Boolean
Returns whether or not the client has permission to receive the passed message.
133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 |
# File 'lib/message_bus/client.rb', line 133 def allowed?(msg) client_allowed = !msg.client_ids || msg.client_ids.length == 0 || msg.client_ids.include?(self.client_id) user_allowed = false group_allowed = false has_users = msg.user_ids && msg.user_ids.length > 0 has_groups = msg.group_ids && msg.group_ids.length > 0 if has_users user_allowed = msg.user_ids.include?(self.user_id) end if has_groups group_allowed = ( msg.group_ids - (self.group_ids || []) ).length < msg.group_ids.length end = client_allowed && (user_allowed || group_allowed || (!has_users && !has_groups)) return if ! filters_allowed = true len = @bus..length while len > 0 len -= 1 channel_prefix, blk = @bus.[len] if msg.channel.start_with?(channel_prefix) filters_allowed = blk.call(msg) break if !filters_allowed end end filters_allowed end |
#backlog ⇒ Array<MessageBus::Message>
Returns the set of messages the client is due to receive, based on its subscriptions and permissions. Includes status message if any channels have no messages available and the client requested a message newer than the newest on the channel, or when there are messages available that the client doesn’t have permission for.
177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 |
# File 'lib/message_bus/client.rb', line 177 def backlog r = [] = nil last_bus_ids = @bus.last_ids(*@subscriptions.keys, site_id: site_id) @subscriptions.each do |k, v| last_client_id = v.to_i last_bus_id = last_bus_ids[k] if last_client_id < -1 # Client requesting backlog relative to bus position last_client_id = last_bus_id + last_client_id + 1 last_client_id = 0 if last_client_id < 0 elsif last_client_id == -1 # Client not requesting backlog next elsif last_client_id == last_bus_id # Client already up-to-date next elsif last_client_id > last_bus_id # Client ahead of the bus @subscriptions[k] = -1 next end = @bus.backlog(k, last_client_id, site_id) .each do |msg| if allowed?(msg) r << msg else ||= {} [k] = msg. end end end # stats message for all newly subscribed = nil @subscriptions.each do |k, v| if v.to_i == -1 || ( && [k]) ||= {} @subscriptions[k] = [k] = last_bus_ids[k] end end r << MessageBus::Message.new(-1, -1, '/__status', ) if r || [] end |
#close ⇒ Object
Closes the client connection
61 62 63 64 65 66 67 68 |
# File 'lib/message_bus/client.rb', line 61 def close if cleanup_timer # concurrency may nil cleanup timer cleanup_timer.cancel rescue nil self.cleanup_timer = nil end ensure_closed! end |
#closed? ⇒ Boolean
Returns whether the connection is closed or not.
95 96 97 |
# File 'lib/message_bus/client.rb', line 95 def closed? !@async_response && !@io end |
#deliver_backlog(backlog) ⇒ void
This method returns an undefined value.
Delivers a backlog of messages to the client, if there is anything in it. If chunked encoding/streaming is in use, will keep the connection open; if not, will close it.
76 77 78 79 80 81 82 83 84 |
# File 'lib/message_bus/client.rb', line 76 def deliver_backlog(backlog) if backlog.length > 0 if use_chunked write_chunk((backlog)) else write_and_close (backlog) end end end |
#ensure_first_chunk_sent ⇒ Object
If no data has yet been sent to the client, sends an empty chunk; prevents clients from entering a timeout state if nothing is delivered initially.
88 89 90 91 92 |
# File 'lib/message_bus/client.rb', line 88 def ensure_first_chunk_sent if use_chunked && @chunks_sent == 0 write_chunk("[]") end end |
#subscribe(channel, last_seen_id) ⇒ void
This method returns an undefined value.
Subscribes the client to messages on a channel, optionally from a defined starting point.
106 107 108 109 110 |
# File 'lib/message_bus/client.rb', line 106 def subscribe(channel, last_seen_id) last_seen_id = nil if last_seen_id == "" last_seen_id ||= @bus.last_id(channel) @subscriptions[channel] = last_seen_id.to_i end |
#subscriptions ⇒ Hash<String => Integer>
Returns the active subscriptions, mapping channel names to last seen message IDs.
114 115 116 |
# File 'lib/message_bus/client.rb', line 114 def subscriptions @subscriptions end |
#synchronize { ... } ⇒ void
This method returns an undefined value.
56 57 58 |
# File 'lib/message_bus/client.rb', line 56 def synchronize @lock.synchronize { yield } end |