Class: MessageBus::Backends::Postgres
- Defined in:
- lib/message_bus/backends/postgres.rb
Overview
This backend diverges from the standard in Base in the following ways:
-
Does not support in-memory buffering of messages on publication
-
Does not expire backlogs until they are published to
The Postgres backend stores published messages in a single Postgres table with only global IDs, and an index on channel name and ID for fast per-channel lookup. All queries are implemented as prepared statements to reduce the wire-chatter during use. In addition to storage in the table, messages are published using ‘pg_notify`; this is used for actively subscribed message_bus servers to consume published messages in real-time while connected and forward them to subscribers, while catch-up is performed from the backlog table.
Defined Under Namespace
Classes: Client
Constant Summary
Constants inherited from Base
Base::ConcreteClassMustImplementError, Base::UNSUB_MESSAGE
Instance Attribute Summary
Attributes inherited from Base
#clear_every, #max_backlog_age, #max_backlog_size, #max_global_backlog_size, #max_in_memory_publish_backlog, #subscribed
Class Method Summary collapse
Instance Method Summary collapse
-
#after_fork ⇒ Object
Reconnects to Postgres; used after a process fork, typically triggered by a forking webserver.
-
#backlog(channel, last_id = 0) ⇒ Array<MessageBus::Message>
Get messages from a channel backlog.
-
#destroy ⇒ Object
Closes all open connections to the storage.
-
#expire_all_backlogs! ⇒ Object
abstract
Deletes all backlogs and their data.
-
#get_message(channel, message_id) ⇒ MessageBus::Message?
Get a specific message from a channel.
-
#global_backlog(last_id = 0) ⇒ Array<MessageBus::Message>
Get messages from the global backlog.
-
#global_subscribe(last_id = nil) {|message| ... } ⇒ nil
Subscribe to messages on all channels.
-
#global_unsubscribe ⇒ Object
Causes all subscribers to the bus to unsubscribe, and terminates the local connection.
-
#initialize(config = {}, max_backlog_size = 1000) ⇒ Postgres
constructor
A new instance of Postgres.
-
#last_id(channel) ⇒ Integer
Get the ID of the last message published on a channel.
-
#last_ids(*channels) ⇒ Array<Integer>
Get the ID of the last message published on multiple channels.
-
#publish(channel, data, opts = nil) ⇒ Integer
Publishes a message to a channel.
-
#reset! ⇒ Object
Deletes all message_bus data from the backend.
-
#subscribe(channel, last_id = nil) {|message| ... } ⇒ nil
Subscribe to messages on a particular channel.
Constructor Details
#initialize(config = {}, max_backlog_size = 1000) ⇒ Postgres
Returns a new instance of Postgres.
274 275 276 277 278 279 280 281 282 283 |
# File 'lib/message_bus/backends/postgres.rb', line 274 def initialize(config = {}, max_backlog_size = 1000) @config = config @max_backlog_size = max_backlog_size @max_global_backlog_size = 2000 # after 7 days inactive backlogs will be removed @max_backlog_age = 604800 @clear_every = config[:clear_every] || 1 @mutex = Mutex.new @client = nil end |
Class Method Details
.reset!(config) ⇒ Object
265 266 267 |
# File 'lib/message_bus/backends/postgres.rb', line 265 def self.reset!(config) MessageBus::Postgres::Client.new(config).reset! end |
Instance Method Details
#after_fork ⇒ Object
Reconnects to Postgres; used after a process fork, typically triggered by a forking webserver
287 288 289 |
# File 'lib/message_bus/backends/postgres.rb', line 287 def after_fork client.after_fork end |
#backlog(channel, last_id = 0) ⇒ Array<MessageBus::Message>
Get messages from a channel backlog
338 339 340 341 342 343 344 |
# File 'lib/message_bus/backends/postgres.rb', line 338 def backlog(channel, last_id = 0) items = client.backlog channel, last_id.to_i items.map! do |id, data| MessageBus::Message.new id, id, channel, data end end |
#destroy ⇒ Object
Closes all open connections to the storage.
297 298 299 |
# File 'lib/message_bus/backends/postgres.rb', line 297 def destroy client.destroy end |
#expire_all_backlogs! ⇒ Object
Deletes all backlogs and their data. Does not delete non-backlog data that message_bus may persist, depending on the concrete backend implementation. Use with extreme caution.
302 303 304 |
# File 'lib/message_bus/backends/postgres.rb', line 302 def expire_all_backlogs! client.expire_all_backlogs! end |
#get_message(channel, message_id) ⇒ MessageBus::Message?
Get a specific message from a channel
356 357 358 359 360 361 362 |
# File 'lib/message_bus/backends/postgres.rb', line 356 def (channel, ) if data = client.get_value(channel, ) MessageBus::Message.new , , channel, data else nil end end |
#global_backlog(last_id = 0) ⇒ Array<MessageBus::Message>
Get messages from the global backlog
347 348 349 350 351 352 353 |
# File 'lib/message_bus/backends/postgres.rb', line 347 def global_backlog(last_id = 0) items = client.global_backlog last_id.to_i items.map! do |id, channel, data| MessageBus::Message.new id, id, channel, data end end |
#global_subscribe(last_id = nil) {|message| ... } ⇒ nil
Subscribe to messages on all channels. Each message since the last ID specified will be delivered by yielding to the passed block as soon as it is available. This will block until subscription is terminated.
382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 |
# File 'lib/message_bus/backends/postgres.rb', line 382 def global_subscribe(last_id = nil) raise ArgumentError unless block_given? highest_id = last_id begin client.subscribe(postgresql_channel_name) do |on| h = {} on.subscribe do if highest_id process_global_backlog(highest_id) do |m| h[m.global_id] = true yield m end end h = nil if h.empty? @subscribed = true end on.unsubscribe do @subscribed = false end on. do |_c, m| if m == UNSUB_MESSAGE @subscribed = false return end m = MessageBus::Message.decode m # we have 3 options # # 1. message came in the correct order GREAT, just deal with it # 2. message came in the incorrect order COMPLICATED, wait a tiny bit and clear backlog # 3. message came in the incorrect order and is lowest than current highest id, reset if h # If already yielded during the clear backlog when subscribing, # don't yield a duplicate copy. unless h.delete(m.global_id) h = nil if h.empty? yield m end else yield m end end end rescue => error @config[:logger].warn "#{error} subscribe failed, reconnecting in 1 second. Call stack\n#{error.backtrace.join("\n")}" sleep 1 retry end end |
#global_unsubscribe ⇒ Object
Causes all subscribers to the bus to unsubscribe, and terminates the local connection. Typically used to reset tests.
376 377 378 379 |
# File 'lib/message_bus/backends/postgres.rb', line 376 def global_unsubscribe client.publish(postgresql_channel_name, UNSUB_MESSAGE) @subscribed = false end |
#last_id(channel) ⇒ Integer
Get the ID of the last message published on a channel
328 329 330 |
# File 'lib/message_bus/backends/postgres.rb', line 328 def last_id(channel) client.max_id(channel) end |
#last_ids(*channels) ⇒ Array<Integer>
Get the ID of the last message published on multiple channels
333 334 335 |
# File 'lib/message_bus/backends/postgres.rb', line 333 def last_ids(*channels) client.max_ids(*channels) end |
#publish(channel, data, opts = nil) ⇒ Integer
:queue_in_memory NOT SUPPORTED
Publishes a message to a channel
308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 |
# File 'lib/message_bus/backends/postgres.rb', line 308 def publish(channel, data, opts = nil) # TODO in memory queue? c = client backlog_id = c.add(channel, data) msg = MessageBus::Message.new backlog_id, backlog_id, channel, data payload = msg.encode c.publish postgresql_channel_name, payload if backlog_id && backlog_id % clear_every == 0 max_backlog_size = (opts && opts[:max_backlog_size]) || self.max_backlog_size max_backlog_age = (opts && opts[:max_backlog_age]) || self.max_backlog_age c.clear_global_backlog(backlog_id, @max_global_backlog_size) c.expire(max_backlog_age) c.clear_channel_backlog(channel, backlog_id, max_backlog_size) end backlog_id end |
#reset! ⇒ Object
Deletes all message_bus data from the backend. Use with extreme caution.
292 293 294 |
# File 'lib/message_bus/backends/postgres.rb', line 292 def reset! client.reset! end |
#subscribe(channel, last_id = nil) {|message| ... } ⇒ nil
Subscribe to messages on a particular channel. Each message since the last ID specified will be delivered by yielding to the passed block as soon as it is available. This will block until subscription is terminated.
365 366 367 368 369 370 371 372 373 |
# File 'lib/message_bus/backends/postgres.rb', line 365 def subscribe(channel, last_id = nil) # trivial implementation for now, # can cut down on connections if we only have one global subscriber raise ArgumentError unless block_given? global_subscribe(last_id) do |m| yield m if m.channel == channel end end |