Class: MessageBus::Backends::Base Abstract
- Inherits: Object
  - Object
  - MessageBus::Backends::Base
- Defined in: lib/message_bus/backends/base.rb
Overview
Backends provide a consistent API over a variety of options for persisting published messages in backlogs. The API covers publishing messages to those backlogs and reading messages from them, in a manner consistent with message_bus’ philosophy.
The heart of the message bus, a backend acts as two things:
- A channel multiplexer.
- Backlog storage per multiplexed channel.
Backends manage and expose multiple backlogs:
- A backlog for each channel, in which messages that were published to that channel are stored.
- A global backlog, which conceptually stores all published messages, regardless of the channel to which they were published.
Backlog storage mechanisms and schemas are up to each individual backend implementation, and some backends store messages very differently from others. A valid backend need not, for example, store each channel backlog as a separate collection. As long as the API is presented per this documentation, the backend is free to make its own storage and performance optimisations.
The per-channel backlog permits message lookups that are optimised for a subscriber catching up from a message pointer. The global backlog optimises the case where another system subscribes to the firehose of messages, for example a message_bus server receiving all publications for delivery to subscribed clients.
Backends are fully responsible for maintaining their storage, including any pruning or expiration of that storage that is necessary. message_bus offers several options for limiting the required storage capacity, by either backlog size or the TTL of messages in a backlog. Backends take these settings and enforce them either directly or by delegating to their storage mechanism.
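As an illustration of limiting a backlog by both size and message age, here is a minimal sketch. `ToyEntry` and `prune_backlog` are hypothetical names invented for this example and are not part of message_bus; a real backend may delegate this work to its storage mechanism instead.

```ruby
# Hypothetical entry type, for illustration only.
ToyEntry = Struct.new(:id, :data, :published_at)

# Returns the entries that survive both limits:
# - max_age:  maximum age of an entry, in seconds
# - max_size: maximum number of entries kept (the newest win)
def prune_backlog(entries, max_size:, max_age:, now: Time.now)
  fresh = entries.reject { |entry| now - entry.published_at > max_age }
  fresh.last(max_size)
end

now = Time.at(1_000)
entries = [
  ToyEntry.new(1, "a", now - 500), # older than max_age, dropped
  ToyEntry.new(2, "b", now - 30),  # fresh, but pushed out by the size limit
  ToyEntry.new(3, "c", now - 20),
  ToyEntry.new(4, "d", now - 10)
]
kept = prune_backlog(entries, max_size: 2, max_age: 60, now: now)
```

The sketch applies the age limit first and the size limit second; the order in which a concrete backend applies them is not specified here.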
Messages published to message_bus have two IDs: one by which they are known in the channel-specific backlog to which they are published, and another (the “global ID”) which is unique across all channels and by which the message can be found in the global backlog. IDs are all sequential integers starting at 0.
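The relationship between the two IDs can be sketched with a toy in-memory store. `ToyMessage` and `ToyBacklogStore` are hypothetical and only mirror the method names documented on this page. The sketch treats 0 as the position before any message, so the first published message receives ID 1; that reading of "starting at 0" is an assumption of the example.

```ruby
# Hypothetical toy store, not a message_bus backend.
ToyMessage = Struct.new(:global_id, :message_id, :channel, :data)

class ToyBacklogStore
  def initialize
    @global = []                                     # backlog of every message
    @channels = Hash.new { |hash, key| hash[key] = [] } # one backlog per channel
  end

  # Stores the message in both backlogs and returns its channel-specific ID.
  def publish(channel, data)
    message_id = @channels[channel].length + 1
    global_id = @global.length + 1
    message = ToyMessage.new(global_id, message_id, channel, data)
    @channels[channel] << message
    @global << message
    message_id
  end

  # Messages on one channel with a channel-specific ID greater than last_id.
  def backlog(channel, last_id = 0)
    @channels[channel].select { |message| message.message_id > last_id }
  end

  # Messages on all channels with a global ID greater than last_id.
  def global_backlog(last_id = 0)
    @global.select { |message| message.global_id > last_id }
  end
end

store = ToyBacklogStore.new
store.publish("/a", "one")
store.publish("/b", "two")
store.publish("/a", "three")
```

In this sketch the third message is number 2 on channel `/a` but number 3 in the global backlog, which is why a subscriber catching up on one channel and a firehose consumer track different pointers.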
Constant Summary
- ConcreteClassMustImplementError = Class.new(StandardError)
  Raised to indicate that the concrete backend implementation does not implement part of the API.
- UNSUB_MESSAGE = "$$UNSUBSCRIBE"
  A special message published to trigger termination of backend subscriptions.
Instance Attribute Summary
- #clear_every ⇒ Integer
  Typically, backlogs are trimmed whenever we publish to them.
- #max_backlog_age ⇒ Integer
  The longest amount of time a message may live in a backlog before being removed, in seconds.
- #max_backlog_size ⇒ Integer
  The largest permitted size (number of messages) for per-channel backlogs; beyond this capacity, old messages will be dropped.
- #max_global_backlog_size ⇒ Integer
  The largest permitted size (number of messages) for the global backlog; beyond this capacity, old messages will be dropped.
- #max_in_memory_publish_backlog ⇒ Integer
  The largest permitted size (number of messages) to be held in a memory buffer when publication fails, for later re-publication.
- #subscribed ⇒ Boolean (readonly)
  The subscription state of the backend.
Instance Method Summary
- #after_fork ⇒ Object
  Performs routines specific to the backend that are necessary after a process fork, typically triggered by a forking webserver.
- #backlog(channel, last_id = 0) ⇒ Array<MessageBus::Message>
  Get messages from a channel backlog.
- #destroy ⇒ Object
  Closes all open connections to the storage.
- #expire_all_backlogs! ⇒ Object (abstract)
  Deletes all backlogs and their data.
- #get_message(channel, message_id) ⇒ MessageBus::Message?
  Get a specific message from a channel.
- #global_backlog(last_id = 0) ⇒ Array<MessageBus::Message>
  Get messages from the global backlog.
- #global_subscribe(last_id = nil) {|message| ... } ⇒ nil
  Subscribe to messages on all channels.
- #global_unsubscribe ⇒ Object
  Causes all subscribers to the bus to unsubscribe, and terminates the local connection.
- #initialize(config = {}, max_backlog_size = 1000) ⇒ Base (constructor)
  A new instance of Base.
- #last_id(channel) ⇒ Integer
  Get the ID of the last message published on a channel.
- #last_ids(*channels) ⇒ Array<Integer>
  Get the ID of the last message published on multiple channels.
- #publish(channel, data, opts = nil) ⇒ Integer
  Publishes a message to a channel.
- #reset! ⇒ Object
  Deletes all message_bus data from the backend.
- #subscribe(channel, last_id = nil) {|message| ... } ⇒ nil
  Subscribe to messages on a particular channel.
Constructor Details
#initialize(config = {}, max_backlog_size = 1000) ⇒ Base
Returns a new instance of Base.
    # File 'lib/message_bus/backends/base.rb', line 75
    def initialize(config = {}, max_backlog_size = 1000); end
Instance Attribute Details
#clear_every ⇒ Integer
Typically, backlogs are trimmed whenever we publish to them. This setting allows some tolerance in order to improve performance.
    # File 'lib/message_bus/backends/base.rb', line 69
    def clear_every
      @clear_every
    end
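One way to read the tolerance that `clear_every` allows is to trim only on every Nth publication rather than on each one. The following is a minimal sketch under that assumption; `ToyTrimmingBacklog` is hypothetical, and concrete backends may interpret the setting differently.

```ruby
# Hypothetical single-channel backlog that trims on every Nth publish.
class ToyTrimmingBacklog
  attr_reader :entries, :trim_count

  def initialize(max_backlog_size:, clear_every:)
    @max_backlog_size = max_backlog_size
    @clear_every = clear_every
    @entries = []
    @published = 0
    @trim_count = 0
  end

  # Appends the data and returns its sequential ID. Trimming only runs
  # when the publish counter is a multiple of clear_every.
  def publish(data)
    @entries << data
    @published += 1
    if (@published % @clear_every).zero?
      @entries = @entries.last(@max_backlog_size)
      @trim_count += 1
    end
    @published
  end
end

toy = ToyTrimmingBacklog.new(max_backlog_size: 3, clear_every: 2)
(1..5).each { |n| toy.publish(n) }
```

Between trims the backlog may briefly hold more than `max_backlog_size` entries, which is the cost of trimming less often.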
#max_backlog_age ⇒ Integer
Returns the longest amount of time a message may live in a backlog before being removed, in seconds.
    # File 'lib/message_bus/backends/base.rb', line 66
    def max_backlog_age
      @max_backlog_age
    end
#max_backlog_size ⇒ Integer
Returns the largest permitted size (number of messages) for per-channel backlogs; beyond this capacity, old messages will be dropped.
    # File 'lib/message_bus/backends/base.rb', line 62
    def max_backlog_size
      @max_backlog_size
    end
#max_global_backlog_size ⇒ Integer
Returns the largest permitted size (number of messages) for the global backlog; beyond this capacity, old messages will be dropped.
    # File 'lib/message_bus/backends/base.rb', line 64
    def max_global_backlog_size
      @max_global_backlog_size
    end
#max_in_memory_publish_backlog ⇒ Integer
Returns the largest permitted size (number of messages) to be held in a memory buffer when publication fails, for later re-publication.
    # File 'lib/message_bus/backends/base.rb', line 71
    def max_in_memory_publish_backlog
      @max_in_memory_publish_backlog
    end
#subscribed ⇒ Boolean (readonly)
Returns the subscription state of the backend.
    # File 'lib/message_bus/backends/base.rb', line 60
    def subscribed
      @subscribed
    end
Instance Method Details
#after_fork ⇒ Object
Performs routines specific to the backend that are necessary after a process fork, typically triggered by a forking webserver. Typically this re-opens sockets to the backend.
    # File 'lib/message_bus/backends/base.rb', line 78
    def after_fork
      raise ConcreteClassMustImplementError
    end
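A concrete backend might satisfy `after_fork` by discarding the handle inherited from the parent process and opening a fresh one. The sketch below assumes that approach; `ToySocket` and `ToyForkAwareBackend` are hypothetical stand-ins, and no real socket is opened.

```ruby
# Hypothetical stand-in for a socket or client connection.
class ToySocket
  attr_reader :owner_pid

  def initialize
    @owner_pid = Process.pid # remembers which process opened the handle
  end
end

class ToyForkAwareBackend
  attr_reader :socket

  def initialize
    @socket = ToySocket.new
  end

  # Replaces the inherited handle so the child does not share it
  # with the parent process.
  def after_fork
    @socket = ToySocket.new
    nil
  end
end

backend = ToyForkAwareBackend.new
inherited = backend.socket
backend.after_fork
```

After the call, `backend.socket` is a different object from `inherited`. How a real backend disposes of the inherited connection depends on its storage client and is not shown here.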
#backlog(channel, last_id = 0) ⇒ Array<MessageBus::Message>
Get messages from a channel backlog.
    # File 'lib/message_bus/backends/base.rb', line 136
    def backlog(channel, last_id = 0)
      raise ConcreteClassMustImplementError
    end
#destroy ⇒ Object
Closes all open connections to the storage.
    # File 'lib/message_bus/backends/base.rb', line 88
    def destroy
      raise ConcreteClassMustImplementError
    end
#expire_all_backlogs! ⇒ Object
Deletes all backlogs and their data. Does not delete non-backlog data that message_bus may persist, depending on the concrete backend implementation. Use with extreme caution.
    # File 'lib/message_bus/backends/base.rb', line 94
    def expire_all_backlogs!
      raise ConcreteClassMustImplementError
    end
#get_message(channel, message_id) ⇒ MessageBus::Message?
Get a specific message from a channel.
    # File 'lib/message_bus/backends/base.rb', line 155
    def get_message(channel, message_id)
      raise ConcreteClassMustImplementError
    end
#global_backlog(last_id = 0) ⇒ Array<MessageBus::Message>
Get messages from the global backlog.
    # File 'lib/message_bus/backends/base.rb', line 145
    def global_backlog(last_id = 0)
      raise ConcreteClassMustImplementError
    end
#global_subscribe(last_id = nil) {|message| ... } ⇒ nil
Subscribe to messages on all channels. Each message since the last ID specified will be delivered by yielding to the passed block as soon as it is available. This will block until subscription is terminated.
    # File 'lib/message_bus/backends/base.rb', line 189
    def global_subscribe(last_id = nil)
      raise ConcreteClassMustImplementError
    end
#global_unsubscribe ⇒ Object
Causes all subscribers to the bus to unsubscribe, and terminates the local connection. Typically used to reset tests.
    # File 'lib/message_bus/backends/base.rb', line 175
    def global_unsubscribe
      raise ConcreteClassMustImplementError
    end
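The blocking behaviour of `global_subscribe` and its termination by `global_unsubscribe` can be sketched with a thread-safe queue and a sentinel message. `ToyFirehose` is hypothetical; using the `UNSUB_MESSAGE` value as an in-band sentinel in exactly this way is an assumption of the sketch, and the `last_id` argument is omitted for brevity.

```ruby
# Hypothetical firehose built on Ruby's thread-safe Queue.
class ToyFirehose
  # Same value as the UNSUB_MESSAGE constant documented on this page.
  UNSUB_MESSAGE = "$$UNSUBSCRIBE"

  def initialize
    @queue = Queue.new
  end

  def publish(data)
    @queue << data
  end

  # Publishes the sentinel that tells the subscriber to stop.
  def global_unsubscribe
    @queue << UNSUB_MESSAGE
  end

  # Blocks, yielding each message as it arrives, until the sentinel is seen.
  def global_subscribe
    raise ArgumentError, "block required" unless block_given?

    loop do
      message = @queue.pop # blocks while the queue is empty
      break if message == UNSUB_MESSAGE

      yield message
    end
    nil
  end
end

bus = ToyFirehose.new
received = []
subscriber = Thread.new { bus.global_subscribe { |message| received << message } }
bus.publish("a")
bus.publish("b")
bus.global_unsubscribe
subscriber.join
```

Because the subscription blocks, the sketch runs it on its own thread and joins that thread once the sentinel has been published.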
#last_id(channel) ⇒ Integer
Get the ID of the last message published on a channel.
    # File 'lib/message_bus/backends/base.rb', line 117
    def last_id(channel)
      raise ConcreteClassMustImplementError
    end
#last_ids(*channels) ⇒ Array<Integer>
Get the ID of the last message published on multiple channels.
    # File 'lib/message_bus/backends/base.rb', line 126
    def last_ids(*channels)
      raise ConcreteClassMustImplementError
    end
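The following sketch shows one plausible relationship between `publish`, `last_id` and `last_ids`. `ToyIdTracker` is hypothetical. Returning 0 for a channel with no messages, and returning IDs in the same order as the channels passed in, are assumptions of the example rather than documented guarantees.

```ruby
# Hypothetical tracker that only remembers the last ID per channel.
class ToyIdTracker
  def initialize
    @last = Hash.new(0)
  end

  # Increments the channel counter and returns the new message ID.
  def publish(channel, _data)
    @last[channel] += 1
  end

  def last_id(channel)
    @last.fetch(channel, 0) # 0 when nothing was published (assumption)
  end

  # One ID per requested channel, in the order the channels were given.
  def last_ids(*channels)
    channels.map { |channel| last_id(channel) }
  end
end

tracker = ToyIdTracker.new
tracker.publish("/a", "one")
tracker.publish("/a", "two")
tracker.publish("/b", "three")
ids = tracker.last_ids("/b", "/missing", "/a")
```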
#publish(channel, data, opts = nil) ⇒ Integer
Publishes a message to a channel.
    # File 'lib/message_bus/backends/base.rb', line 108
    def publish(channel, data, opts = nil)
      raise ConcreteClassMustImplementError
    end
#reset! ⇒ Object
Deletes all message_bus data from the backend. Use with extreme caution.
    # File 'lib/message_bus/backends/base.rb', line 83
    def reset!
      raise ConcreteClassMustImplementError
    end
#subscribe(channel, last_id = nil) {|message| ... } ⇒ nil
Subscribe to messages on a particular channel. Each message since the last ID specified will be delivered by yielding to the passed block as soon as it is available. This will block until subscription is terminated.
    # File 'lib/message_bus/backends/base.rb', line 170
    def subscribe(channel, last_id = nil)
      raise ConcreteClassMustImplementError
    end
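Every instance method above raises ConcreteClassMustImplementError in the base class, so a subclass that overrides only part of the API fails loudly on the rest. The sketch below reproduces that pattern in a standalone `Sketch` module so that it runs without message_bus; `Sketch::Base` and `Sketch::Partial` are hypothetical and only imitate the documented signatures.

```ruby
module Sketch
  # Standalone imitation of the abstract base, for illustration only.
  class Base
    ConcreteClassMustImplementError = Class.new(StandardError)

    def publish(channel, data, opts = nil)
      raise ConcreteClassMustImplementError
    end

    def last_id(channel)
      raise ConcreteClassMustImplementError
    end
  end

  # Overrides publish but leaves last_id unimplemented.
  class Partial < Base
    def initialize
      @ids = Hash.new(0)
    end

    def publish(channel, data, opts = nil)
      @ids[channel] += 1
    end
  end
end

backend = Sketch::Partial.new
published_id = backend.publish("/chat", "hello")

missing = begin
  backend.last_id("/chat")
  false
rescue Sketch::Base::ConcreteClassMustImplementError
  true # the inherited stub raised, as expected
end
```

Because the error class derives from StandardError, a bare `rescue` would also catch it; rescuing the specific class keeps the intent visible.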