Class: MessageBus::Backends::Memory
- Defined in: lib/message_bus/backends/memory.rb
Overview
This backend diverges from the standard in Base in the following ways:
- Does not support forking
- Does not support in-memory buffering of messages on publication (redundant)
The memory backend stores published messages in a simple array per channel, and does not store a separate global backlog.
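As a rough illustration of the storage model described above, the sketch below keeps one array of [id, data] pairs per channel. ToyChannelStore and its methods are hypothetical names invented for this example. They are not the gem's Client class, and the single shared ID counter is an assumption.

```ruby
# Hypothetical toy store; NOT MessageBus::Backends::Memory::Client.
class ToyChannelStore
  def initialize
    @next_id = 0
    # One array of [id, data] pairs per channel, created lazily.
    @channels = Hash.new { |hash, key| hash[key] = [] }
  end

  # Appends a message to a channel and returns its ID.
  def add(channel, data)
    @next_id += 1
    @channels[channel] << [@next_id, data]
    @next_id
  end

  # Returns the [id, data] pairs on a channel whose ID is above last_id.
  def backlog(channel, last_id = 0)
    return [] unless @channels.key?(channel)

    @channels[channel].select { |id, _data| id > last_id }
  end
end

store = ToyChannelStore.new
store.add("/chat", "hello")
store.add("/news", "headline")
store.add("/chat", "world")
p store.backlog("/chat", 1) # prints [[3, "world"]]
```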
Defined Under Namespace
Classes: Client
Constant Summary
Constants inherited from Base
Base::ConcreteClassMustImplementError, Base::UNSUB_MESSAGE
Instance Attribute Summary
Attributes inherited from Base
#clear_every, #max_backlog_age, #max_backlog_size, #max_global_backlog_size, #max_in_memory_publish_backlog, #subscribed
Instance Method Summary
- #after_fork ⇒ Object
  No-op; this backend doesn’t support forking.
- #backlog(channel, last_id = 0) ⇒ Array<MessageBus::Message>
  Get messages from a channel backlog.
- #destroy ⇒ Object
  No-op; this backend doesn’t maintain any storage connections.
- #expire_all_backlogs! ⇒ Object (abstract)
  Deletes all backlogs and their data.
- #get_message(channel, message_id) ⇒ MessageBus::Message?
  Get a specific message from a channel.
- #global_backlog(last_id = 0) ⇒ Array<MessageBus::Message>
  Get messages from the global backlog.
- #global_subscribe(last_id = nil) {|message| ... } ⇒ nil
  Subscribe to messages on all channels.
- #global_unsubscribe ⇒ Object
  Causes all subscribers to the bus to unsubscribe, and terminates the local connection.
- #initialize(config = {}, max_backlog_size = 1000) ⇒ Memory (constructor)
  A new instance of Memory.
- #last_id(channel) ⇒ Integer
  Get the ID of the last message published on a channel.
- #last_ids(*channels) ⇒ Array<Integer>
  Get the ID of the last message published on multiple channels.
- #max_backlog_age=(value) ⇒ Object
- #publish(channel, data, opts = nil) ⇒ Integer
  Publishes a message to a channel.
- #reset! ⇒ Object
  Deletes all message_bus data from the backend.
- #subscribe(channel, last_id = nil) {|message| ... } ⇒ nil
  Subscribe to messages on a particular channel.
Constructor Details
#initialize(config = {}, max_backlog_size = 1000) ⇒ Memory
Returns a new instance of Memory.
    # File 'lib/message_bus/backends/memory.rb', line 191
    def initialize(config = {}, max_backlog_size = 1000)
      @config = config
      @max_backlog_size = max_backlog_size
      @max_global_backlog_size = 2000
      # after 7 days inactive backlogs will be removed
      self.max_backlog_age = 604800
      @clear_every = config[:clear_every] || 1
    end
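The two defaults in the constructor can be checked in isolation. This is a minimal sketch: SEVEN_DAYS_IN_SECONDS and resolve_clear_every are hypothetical names introduced here, not part of the gem.

```ruby
# Hypothetical helpers that mirror the defaults in the constructor listing.

# 604800 in the listing is seven days expressed in seconds.
SEVEN_DAYS_IN_SECONDS = 7 * 24 * 60 * 60

# Falls back to 1 when :clear_every is absent (or nil/false).
def resolve_clear_every(config)
  config[:clear_every] || 1
end

p SEVEN_DAYS_IN_SECONDS                  # prints 604800
p resolve_clear_every({})                # prints 1
p resolve_clear_every(clear_every: 50)   # prints 50
```

With the default of 1, the trimming step in #publish runs on every published message; a larger value trades memory for less frequent trimming.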
Instance Method Details
#after_fork ⇒ Object
No-op; this backend doesn’t support forking.
    # File 'lib/message_bus/backends/memory.rb', line 206
    def after_fork
      nil
    end
#backlog(channel, last_id = 0) ⇒ Array<MessageBus::Message>
Get messages from a channel backlog.

    # File 'lib/message_bus/backends/memory.rb', line 255
    def backlog(channel, last_id = 0)
      items = client.backlog channel, last_id.to_i

      items.map! do |id, data|
        MessageBus::Message.new id, id, channel, data
      end
    end
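The listing coerces last_id with to_i and passes the same ID twice when building each message. The sketch below imitates both steps on a plain array. ToyBacklogMessage, its field names, and toy_backlog are assumptions made for illustration, and the filtering by ID is a guess at what the client's backlog call returns.

```ruby
# Hypothetical message type; field names are assumed, not taken from the gem.
ToyBacklogMessage = Struct.new(:global_id, :message_id, :channel, :data)

# pairs is an array of [id, data]; last_id may be an Integer, a String, or nil.
def toy_backlog(pairs, channel, last_id = 0)
  threshold = last_id.to_i # nil.to_i is 0, "1".to_i is 1
  pairs
    .select { |id, _data| id > threshold }
    .map { |id, data| ToyBacklogMessage.new(id, id, channel, data) }
end

pairs = [[1, "a"], [2, "b"]]
p toy_backlog(pairs, "/chat", "1").map(&:data) # prints ["b"]
p toy_backlog(pairs, "/chat", nil).map(&:data) # prints ["a", "b"]
```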
#destroy ⇒ Object
No-op; this backend doesn’t maintain any storage connections. (see Base#destroy)
    # File 'lib/message_bus/backends/memory.rb', line 217
    def destroy
      nil
    end
#expire_all_backlogs! ⇒ Object
Deletes all backlogs and their data. Does not delete non-backlog data that message_bus may persist, depending on the concrete backend implementation. Use with extreme caution.
    # File 'lib/message_bus/backends/memory.rb', line 222
    def expire_all_backlogs!
      client.expire_all_backlogs!
    end
#get_message(channel, message_id) ⇒ MessageBus::Message?
Get a specific message from a channel.

    # File 'lib/message_bus/backends/memory.rb', line 273
    def get_message(channel, message_id)
      if data = client.get_value(channel, message_id)
        MessageBus::Message.new message_id, message_id, channel, data
      else
        nil
      end
    end
#global_backlog(last_id = 0) ⇒ Array<MessageBus::Message>
Get messages from the global backlog.

    # File 'lib/message_bus/backends/memory.rb', line 264
    def global_backlog(last_id = 0)
      items = client.global_backlog last_id.to_i

      items.map! do |id, channel, data|
        MessageBus::Message.new id, id, channel, data
      end
    end
#global_subscribe(last_id = nil) {|message| ... } ⇒ nil
Subscribe to messages on all channels. Each message since the last ID specified will be delivered by yielding to the passed block as soon as it is available. This will block until subscription is terminated.
    # File 'lib/message_bus/backends/memory.rb', line 299
    def global_subscribe(last_id = nil)
      raise ArgumentError unless block_given?

      highest_id = last_id

      begin
        client.subscribe do |on|
          h = {}

          on.subscribe do
            if highest_id
              process_global_backlog(highest_id) do |m|
                h[m.global_id] = true
                yield m
              end
            end
            @subscribed = true
          end

          on.unsubscribe do
            @subscribed = false
          end

          on.message do |_c, m|
            m = MessageBus::Message.decode m

            # we have 3 options
            #
            # 1. message came in the correct order GREAT, just deal with it
            # 2. message came in the incorrect order COMPLICATED, wait a tiny bit and clear backlog
            # 3. message came in the incorrect order and is lowest than current highest id, reset

            if h
              # If already yielded during the clear backlog when subscribing,
              # don't yield a duplicate copy.
              unless h.delete(m.global_id)
                h = nil if h.empty?
                yield m
              end
            else
              yield m
            end
          end
        end
      rescue => error
        @config[:logger].warn "#{error} subscribe failed, reconnecting in 1 second. Call stack\n#{error.backtrace.join("\n")}"
        sleep 1
        retry
      end
    end
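The hash h in the listing prevents a message from being yielded twice when it appears both in the replayed backlog and in the live stream. The sketch below isolates that bookkeeping on plain integer IDs. deliver_without_duplicates is a hypothetical helper written for this example; it does no threading, reconnecting, or decoding.

```ruby
# Hypothetical stand-in for the duplicate suppression in global_subscribe.
# replayed_ids were already yielded from the backlog; live_ids arrive afterwards.
def deliver_without_duplicates(replayed_ids, live_ids)
  delivered = replayed_ids.dup
  seen = {}
  replayed_ids.each { |id| seen[id] = true }

  live_ids.each do |id|
    if seen
      # Hash#delete returns the stored value (true) for a replayed ID,
      # so a live copy of an already delivered message is skipped.
      unless seen.delete(id)
        # Once every replayed ID has been matched, stop tracking.
        seen = nil if seen.empty?
        delivered << id
      end
    else
      delivered << id
    end
  end

  delivered
end

p deliver_without_duplicates([1, 2], [2, 1, 3, 4]) # prints [1, 2, 3, 4]
```

Setting the hash to nil once it is empty means later messages take the cheaper else branch, which matches the structure of the listing.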
#global_unsubscribe ⇒ Object
Causes all subscribers to the bus to unsubscribe, and terminates the local connection. Typically used to reset tests.
    # File 'lib/message_bus/backends/memory.rb', line 293
    def global_unsubscribe
      client.unsubscribe
      @subscribed = false
    end
#last_id(channel) ⇒ Integer
Get the ID of the last message published on a channel.

    # File 'lib/message_bus/backends/memory.rb', line 243
    def last_id(channel)
      client.max_id(channel)
    end
#last_ids(*channels) ⇒ Array<Integer>
Get the ID of the last message published on multiple channels.

    # File 'lib/message_bus/backends/memory.rb', line 248
    def last_ids(*channels)
      channels.map do |c|
        last_id(c)
      end
    end
#max_backlog_age=(value) ⇒ Object
    # File 'lib/message_bus/backends/memory.rb', line 200
    def max_backlog_age=(value)
      client.max_backlog_age = value
    end
#publish(channel, data, opts = nil) ⇒ Integer
Publishes a message to a channel. The :queue_in_memory option is not supported by this backend.

    # File 'lib/message_bus/backends/memory.rb', line 228
    def publish(channel, data, opts = nil)
      c = client
      max_backlog_age = opts && opts[:max_backlog_age]
      backlog_id = c.add(channel, data, max_backlog_age: max_backlog_age)

      if backlog_id % clear_every == 0
        max_backlog_size = (opts && opts[:max_backlog_size]) || self.max_backlog_size
        c.clear_global_backlog(backlog_id, @max_global_backlog_size)
        c.clear_channel_backlog(channel, backlog_id, max_backlog_size)
      end

      backlog_id
    end
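The modulo check in the listing means backlogs are trimmed only on every clear_every-th message, so a backlog can briefly grow past its size limit between trims. The sketch below shows that cadence with a hypothetical ToyPublisher. Only the `id % clear_every == 0` condition comes from the listing; keeping the newest entries is an assumption about what the clearing calls do.

```ruby
# Hypothetical toy publisher; NOT the gem's implementation.
class ToyPublisher
  attr_reader :backlog

  def initialize(clear_every:, max_backlog_size:)
    @clear_every = clear_every
    @max_backlog_size = max_backlog_size
    @backlog = []
    @next_id = 0
  end

  # Appends a message and returns its ID.
  def publish(data)
    @next_id += 1
    @backlog << [@next_id, data]
    # Trim only on every clear_every-th message (assumed: keep the newest).
    if @next_id % @clear_every == 0
      @backlog = @backlog.last(@max_backlog_size)
    end
    @next_id
  end
end

publisher = ToyPublisher.new(clear_every: 3, max_backlog_size: 2)
sizes = (1..6).map do |n|
  publisher.publish("m#{n}")
  publisher.backlog.size
end
p sizes # prints [1, 2, 2, 3, 4, 2]
```

In this toy model the backlog exceeds the limit of 2 after the fourth and fifth messages and is cut back on the sixth.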
#reset! ⇒ Object
Deletes all message_bus data from the backend. Use with extreme caution.
    # File 'lib/message_bus/backends/memory.rb', line 211
    def reset!
      client.reset!
    end
#subscribe(channel, last_id = nil) {|message| ... } ⇒ nil
Subscribe to messages on a particular channel. Each message since the last ID specified will be delivered by yielding to the passed block as soon as it is available. This will block until subscription is terminated.
    # File 'lib/message_bus/backends/memory.rb', line 282
    def subscribe(channel, last_id = nil)
      # trivial implementation for now,
      #   can cut down on connections if we only have one global subscriber
      raise ArgumentError unless block_given?

      global_subscribe(last_id) do |m|
        yield m if m.channel == channel
      end
    end
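As the listing shows, a channel subscription is a global subscription with a channel filter applied inside the block. The sketch below reproduces that shape over a finite array instead of a blocking stream. ToyMessage, toy_global_subscribe, and toy_subscribe are hypothetical names for this example only.

```ruby
# Hypothetical message type and feed; the real backend yields MessageBus::Message objects.
ToyMessage = Struct.new(:channel, :data)

# Yields every message in the feed, regardless of channel.
def toy_global_subscribe(feed)
  raise ArgumentError unless block_given?

  feed.each { |message| yield message }
  nil
end

# Yields only the messages whose channel matches.
def toy_subscribe(feed, channel)
  raise ArgumentError unless block_given?

  toy_global_subscribe(feed) do |message|
    yield message if message.channel == channel
  end
end

feed = [
  ToyMessage.new("/chat", "hello"),
  ToyMessage.new("/news", "headline"),
  ToyMessage.new("/chat", "world")
]
toy_subscribe(feed, "/chat") { |message| puts message.data }
# prints "hello" then "world"
```

One consequence of this design is that every channel subscriber still receives and discards traffic for all other channels, which is what the comment in the listing about a single global subscriber alludes to.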