Class: MessageBus::Backends::Redis
- Defined in:
- lib/message_bus/backends/redis.rb
Overview
This backend diverges from the standard in Base in the following ways:
-
‘max_backlog_age` options in this backend differ from the behaviour of other backends, in that either no messages are removed (when publications happen more regularly than this time-frame) or all messages are removed (when no publication happens during this time-frame).
The Redis backend stores published messages in Redis sorted sets (using ZADD, where the score is the message ID), one for each channel (where the full message is stored), and also in a global backlog as a simple pointer to the respective channel and channel-specific ID. In addition, publication publishes full messages to a Redis PubSub channel; this is used for actively subscribed message_bus servers to consume published messages in real-time while connected and forward them to subscribers, while catch-up is performed from the backlog sorted sets.
Message lookup is performed using the Redis ZRANGEBYSCORE command, and backlog trimming uses ZREMRANGEBYSCORE. The last used channel-specific and global IDs are stored as integers in simple Redis keys and incremented on publication.
Publication is implemented using a Lua script to ensure that it is atomic and messages are not corrupted by parallel publication.
Defined Under Namespace
Classes: BackLogOutOfOrder
Constant Summary collapse
- LUA_PUBLISH =
Note, the script takes care of all expiry of keys, however we do not expire the global backlog key cause we have no simple way to determine what it should be on publish we do not provide a mechanism to set a global max backlog age, only a per-channel which we can override on publish
<<LUA local start_payload = ARGV[1] local max_backlog_age = ARGV[2] local max_backlog_size = tonumber(ARGV[3]) local max_global_backlog_size = tonumber(ARGV[4]) local channel = ARGV[5] local clear_every = ARGV[6] local global_id_key = KEYS[1] local backlog_id_key = KEYS[2] local backlog_key = KEYS[3] local global_backlog_key = KEYS[4] local redis_channel_name = KEYS[5] local global_id = redis.call("INCR", global_id_key) local backlog_id = redis.call("INCR", backlog_id_key) local payload = table.concat({ global_id, backlog_id, start_payload }, "|") local global_backlog_message = table.concat({ backlog_id, channel }, "|") redis.call("ZADD", backlog_key, backlog_id, payload) redis.call("EXPIRE", backlog_key, max_backlog_age) redis.call("ZADD", global_backlog_key, global_id, global_backlog_message) redis.call("EXPIRE", global_backlog_key, max_backlog_age) redis.call("PUBLISH", redis_channel_name, payload) redis.call("EXPIRE", backlog_id_key, max_backlog_age) if backlog_id > max_backlog_size and backlog_id % clear_every == 0 then redis.call("ZREMRANGEBYSCORE", backlog_key, 1, backlog_id - max_backlog_size) end if global_id > max_global_backlog_size and global_id % clear_every == 0 then redis.call("ZREMRANGEBYSCORE", global_backlog_key, 1, global_id - max_global_backlog_size) end return backlog_id LUA
- LUA_PUBLISH_SHA1 =
Digest::SHA1.hexdigest(LUA_PUBLISH)
Constants inherited from Base
Base::ConcreteClassMustImplementError, Base::UNSUB_MESSAGE
Instance Attribute Summary
Attributes inherited from Base
#clear_every, #max_backlog_age, #max_backlog_size, #max_global_backlog_size, #max_in_memory_publish_backlog, #subscribed
Instance Method Summary collapse
-
#after_fork ⇒ Object
Reconnects to Redis; used after a process fork, typically triggered by a forking webserver.
-
#backlog(channel, last_id = 0) ⇒ Array<MessageBus::Message>
Get messages from a channel backlog.
-
#destroy ⇒ Object
Closes all open connections to the storage.
-
#expire_all_backlogs! ⇒ Object
Deletes all backlogs and their data.
-
#get_message(channel, message_id) ⇒ MessageBus::Message?
Get a specific message from a channel.
-
#global_backlog(last_id = 0) ⇒ Array<MessageBus::Message>
Get messages from the global backlog.
-
#global_subscribe(last_id = nil) {|message| ... } ⇒ nil
Subscribe to messages on all channels.
-
#global_unsubscribe ⇒ Object
Causes all subscribers to the bus to unsubscribe, and terminates the local connection.
-
#initialize(redis_config = {}, max_backlog_size = 1000) ⇒ Redis
constructor
A new instance of Redis.
-
#last_id(channel) ⇒ Integer
Get the ID of the last message published on a channel.
-
#last_ids(*channels) ⇒ Array<Integer>
Get the ID of the last message published on multiple channels.
-
#publish(channel, data, opts = nil) ⇒ Integer
Publishes a message to a channel.
-
#reset! ⇒ Object
Deletes all message_bus data from the backend.
-
#subscribe(channel, last_id = nil) {|message| ... } ⇒ nil
Subscribe to messages on a particular channel.
Constructor Details
#initialize(redis_config = {}, max_backlog_size = 1000) ⇒ Redis
Returns a new instance of Redis.
48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 |
# File 'lib/message_bus/backends/redis.rb', line 48 def initialize(redis_config = {}, max_backlog_size = 1000) @redis_config = redis_config.dup @clear_every = redis_config.delete(:clear_every) || 1 @logger = @redis_config[:logger] @redis_config[:logger] = nil unless @redis_config[:enable_redis_logger] @max_backlog_size = max_backlog_size @max_global_backlog_size = 2000 @max_in_memory_publish_backlog = 1000 @in_memory_backlog = [] @lock = Mutex.new @flush_backlog_thread = nil @pub_redis = nil @subscribed = false # after 7 days inactive backlogs will be removed @max_backlog_age = 604_800 end |
Instance Method Details
#after_fork ⇒ Object
Reconnects to Redis; used after a process fork, typically triggered by a forking webserver
67 68 69 |
# File 'lib/message_bus/backends/redis.rb', line 67 def after_fork @pub_redis&.disconnect! end |
#backlog(channel, last_id = 0) ⇒ Array<MessageBus::Message>
Get messages from a channel backlog
203 204 205 206 207 208 209 |
# File 'lib/message_bus/backends/redis.rb', line 203 def backlog(channel, last_id = 0) redis = pub_redis backlog_key = backlog_key(channel) items = redis.zrangebyscore backlog_key, last_id.to_i + 1, "+inf" items.map { |i| MessageBus::Message.decode(i) } end |
#destroy ⇒ Object
Closes all open connections to the storage.
77 78 79 |
# File 'lib/message_bus/backends/redis.rb', line 77 def destroy @pub_redis&.disconnect! end |
#expire_all_backlogs! ⇒ Object
Deletes all backlogs and their data. Does not delete ID pointers, so new publications will get IDs that continue from the last publication before the expiry. Use with extreme caution.
83 84 85 |
# File 'lib/message_bus/backends/redis.rb', line 83 def expire_all_backlogs! pub_redis.keys("__mb_*backlog_n").each { |k| pub_redis.del k } end |
#get_message(channel, message_id) ⇒ MessageBus::Message?
Get a specific message from a channel
228 229 230 231 232 233 234 235 236 237 238 |
# File 'lib/message_bus/backends/redis.rb', line 228 def (channel, ) redis = pub_redis backlog_key = backlog_key(channel) items = redis.zrangebyscore backlog_key, , if items && items[0] MessageBus::Message.decode(items[0]) else nil end end |
#global_backlog(last_id = 0) ⇒ Array<MessageBus::Message>
Get messages from the global backlog
212 213 214 215 216 217 218 219 220 221 222 223 224 225 |
# File 'lib/message_bus/backends/redis.rb', line 212 def global_backlog(last_id = 0) items = pub_redis.zrangebyscore global_backlog_key, last_id.to_i + 1, "+inf" items.map! do |i| pipe = i.index "|" = i[0..pipe].to_i channel = i[pipe + 1..-1] m = (channel, ) m end items.compact! items end |
#global_subscribe(last_id = nil) {|message| ... } ⇒ nil
Subscribe to messages on all channels. Each message since the last ID specified will be delivered by yielding to the passed block as soon as it is available. This will block until subscription is terminated.
268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 |
# File 'lib/message_bus/backends/redis.rb', line 268 def global_subscribe(last_id = nil, &blk) raise ArgumentError unless block_given? highest_id = last_id clear_backlog = lambda do retries = 4 begin highest_id = process_global_backlog(highest_id, retries > 0, &blk) rescue BackLogOutOfOrder => e highest_id = e.highest_id retries -= 1 sleep(rand(50) / 1000.0) retry end end begin global_redis = new_redis_connection clear_backlog.call(&blk) if highest_id global_redis.subscribe(redis_channel_name) do |on| on.subscribe do clear_backlog.call(&blk) if highest_id @subscribed = true end on.unsubscribe { @subscribed = false } on. do |_c, m| if m == UNSUB_MESSAGE @subscribed = false global_redis.unsubscribe return end m = MessageBus::Message.decode m # we have 3 options # # 1. message came in the correct order GREAT, just deal with it # 2. message came in the incorrect order COMPLICATED, wait a tiny bit and clear backlog # 3. message came in the incorrect order and is lowest than current highest id, reset if highest_id.nil? || m.global_id == highest_id + 1 highest_id = m.global_id yield m else clear_backlog.call(&blk) end end end rescue => error @logger.warn "#{error} subscribe failed, reconnecting in 1 second. Call stack #{error.backtrace.join("\n")}" sleep 1 global_redis&.disconnect! retry ensure global_redis&.disconnect! end end |
#global_unsubscribe ⇒ Object
Causes all subscribers to the bus to unsubscribe, and terminates the local connection. Typically used to reset tests.
257 258 259 260 261 262 263 264 265 |
# File 'lib/message_bus/backends/redis.rb', line 257 def global_unsubscribe begin new_redis = new_redis_connection new_redis.publish(redis_channel_name, UNSUB_MESSAGE) ensure new_redis&.disconnect! @subscribed = false end end |
#last_id(channel) ⇒ Integer
Get the ID of the last message published on a channel
190 191 192 193 |
# File 'lib/message_bus/backends/redis.rb', line 190 def last_id(channel) backlog_id_key = backlog_id_key(channel) pub_redis.get(backlog_id_key).to_i end |
#last_ids(*channels) ⇒ Array<Integer>
Get the ID of the last message published on multiple channels
196 197 198 199 200 |
# File 'lib/message_bus/backends/redis.rb', line 196 def last_ids(*channels) return [] if channels.size == 0 backlog_id_keys = channels.map { |c| backlog_id_key(c) } pub_redis.mget(*backlog_id_keys).map(&:to_i) end |
#publish(channel, data, opts = nil) ⇒ Integer
Publishes a message to a channel
132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 |
# File 'lib/message_bus/backends/redis.rb', line 132 def publish(channel, data, opts = nil) queue_in_memory = (opts && opts[:queue_in_memory]) != false max_backlog_age = (opts && opts[:max_backlog_age]) || self.max_backlog_age max_backlog_size = (opts && opts[:max_backlog_size]) || self.max_backlog_size redis = pub_redis backlog_id_key = backlog_id_key(channel) backlog_key = backlog_key(channel) msg = MessageBus::Message.new nil, nil, channel, data cached_eval( redis, LUA_PUBLISH, LUA_PUBLISH_SHA1, argv: [ msg.encode_without_ids, max_backlog_age, max_backlog_size, max_global_backlog_size, channel, clear_every, ], keys: [ global_id_key, backlog_id_key, backlog_key, global_backlog_key, redis_channel_name, ], ) rescue ::Redis::CommandError => e if queue_in_memory && e. =~ /READONLY/ @lock.synchronize do @in_memory_backlog << [channel, data] if @in_memory_backlog.length > @max_in_memory_publish_backlog @in_memory_backlog.delete_at(0) @logger.warn( "Dropping old message cause max_in_memory_publish_backlog is full: #{e.}\n#{e.backtrace.join('\n')}", ) end end if @flush_backlog_thread == nil @lock.synchronize do if @flush_backlog_thread == nil @flush_backlog_thread = Thread.new { ensure_backlog_flushed } end end end nil else raise end end |
#reset! ⇒ Object
Deletes all message_bus data from the backend. Use with extreme caution.
72 73 74 |
# File 'lib/message_bus/backends/redis.rb', line 72 def reset! pub_redis.keys("__mb_*").each { |k| pub_redis.del k } end |
#subscribe(channel, last_id = nil) {|message| ... } ⇒ nil
Subscribe to messages on a particular channel. Each message since the last ID specified will be delivered by yielding to the passed block as soon as it is available. This will block until subscription is terminated.
241 242 243 244 245 246 247 248 249 250 251 252 253 254 |
# File 'lib/message_bus/backends/redis.rb', line 241 def subscribe(channel, last_id = nil) # trivial implementation for now, # can cut down on connections if we only have one global subscriber raise ArgumentError unless block_given? if last_id # we need to translate this to a global id, at least give it a shot # we are subscribing on global and global is always going to be bigger than local # so worst case is a replay of a few messages = (channel, last_id) last_id = .global_id if end global_subscribe(last_id) { |m| yield m if m.channel == channel } end |