Module: MessageBus::Implementation
Included in: MessageBus, Instance
Defined in: lib/message_bus.rb
Overview
The main server-side interface to a message bus for the purposes of configuration, publishing and subscribing.
Defined Under Namespace
Classes: Synchronizer
Instance Attribute Summary
- #config ⇒ Hash<Symbol => Object> (readonly)
  Configuration options hash.
Instance Method Summary
- #after_fork ⇒ void
  Performs routines that are necessary after a process fork, typically triggered by a forking webserver.
- #allow_broadcast=(val) ⇒ void
- #allow_broadcast? ⇒ Boolean
  Whether or not broadcasting is allowed.
- #backend ⇒ Symbol
  The name of the backend implementation configured.
- #backend_instance ⇒ MessageBus::Backend::Base
  The configured backend.
- #backend_instance=(backend_instance) ⇒ void
- #backlog(channel = nil, last_id = nil, site_id = nil) ⇒ Array<MessageBus::Message>
  Get messages from a channel backlog since the last ID specified, filtered by site.
- #base_route ⇒ String
  The route that message bus will respond to.
- #base_route=(route) ⇒ void
- #blocking_subscribe(channel = nil) {|message| ... } ⇒ void
  Subscribe to messages.
- #chunked_encoding_enabled=(val) ⇒ void
- #chunked_encoding_enabled? ⇒ Boolean
  Whether or not chunked encoding is enabled.
- #client_message_filters ⇒ Array
  Returns the array of message filters that have been registered.
- #configure(config) ⇒ void
  Overrides existing configuration.
- #destroy ⇒ void
  Stops listening for publications and stops executing scheduled tasks.
- #extra_response_headers_lookup {|env| ... } ⇒ void
- #global_backlog(last_id = nil) ⇒ Array<MessageBus::Message>
  Get messages from the global backlog since the last ID specified.
- #group_ids_lookup {|env| ... } ⇒ void
- #initialize ⇒ Object
- #is_admin_lookup {|env| ... } ⇒ void
- #keepalive_interval ⇒ Integer
  The keepalive interval in seconds.
- #keepalive_interval=(interval) ⇒ Object
- #last_id(channel, site_id = nil) ⇒ Integer
  Get the ID of the last message published on a channel, filtered by site.
- #last_ids(*channels, site_id: nil) ⇒ Hash
  Get the ID of the last message published on multiple channels.
- #last_message(channel) ⇒ MessageBus::Message
  Get the last message published on a channel.
- #listening? ⇒ Boolean
  Whether or not the server is actively listening for publications on the bus.
- #local_subscribe(channel = nil, last_id = -1) {|message| ... } ⇒ Proc
  Subscribe to messages on a particular channel, filtered by the current site (@see #site_id_lookup).
- #local_unsubscribe(channel = nil, &blk) ⇒ void
  Removes a subscription to a particular channel, filtered by the current site (@see #site_id_lookup).
- #logger ⇒ Logger
  The logger used by the bus.
- #logger=(logger) ⇒ void
- #long_polling_enabled=(val) ⇒ void
- #long_polling_enabled? ⇒ Boolean
  Whether or not long polling is enabled.
- #long_polling_interval ⇒ Integer
  The long-polling interval in milliseconds.
- #long_polling_interval=(millisecs) ⇒ void
- #max_active_clients ⇒ Integer
  The number of simultaneous clients we can service; will revert to polling if we are out of slots.
- #max_active_clients=(val) ⇒ void
- #off(disable_publish: true) ⇒ void
  Disables publication to the bus.
- #off? ⇒ Boolean
  Whether the bus is disabled or not.
- #on ⇒ void
  Enables publication to the bus.
- #on_connect(&blk) ⇒ Object
- #on_disconnect(&blk) ⇒ Object
- #on_middleware_error {|env, e| ... } ⇒ void
- #publish(channel, data, opts = nil) ⇒ Integer
  Publishes a message to a channel.
- #rack_hijack_enabled=(val) ⇒ void
- #rack_hijack_enabled? ⇒ Boolean
  Whether or not Rack Hijack is enabled.
- #redis_config ⇒ Object
- #redis_config=(config) ⇒ void
  Overrides existing configuration, explicitly enabling the redis backend.
- #register_client_message_filter(channel_prefix) {|message| ... } ⇒ void
  Registers a client message filter that allows messages to be filtered from the client.
- #reliable_pub_sub ⇒ Object
- #reliable_pub_sub=(pub_sub) ⇒ Object
- #reset! ⇒ Object
- #site_id_lookup {|env| ... } ⇒ void
- #subscribe(channel = nil, last_id = -1) {|message| ... } ⇒ Proc
  Subscribe to messages on a particular channel.
- #timer ⇒ MessageBus::TimerThread
  The timer thread used for triggering scheduled routines at specific times/intervals.
- #transport_codec ⇒ MessageBus::Codec::Base
  Codec used to encode and decode Message payloads.
- #transport_codec=(codec) ⇒ void
- #unsubscribe(channel = nil, &blk) ⇒ void
  Removes a subscription to a particular channel.
- #user_id_lookup {|env| ... } ⇒ void
Instance Attribute Details
#config ⇒ Hash<Symbol => Object> (readonly)
Returns the configuration options hash.
    # File 'lib/message_bus.rb', line 31
    def config
      @config
    end
Instance Method Details
#after_fork ⇒ void
This method returns an undefined value.
Performs routines that are necessary after a process fork, typically
triggered by a forking webserver. Performs whatever the backend requires
and ensures the server is listening for publications and running
scheduled tasks.
    # File 'lib/message_bus.rb', line 560
    def after_fork
      backend_instance.after_fork
      ensure_subscriber_thread
      # will ensure timer is running
      timer.queue {}
    end
#allow_broadcast=(val) ⇒ void
This method returns an undefined value.
    # File 'lib/message_bus.rb', line 261
    def allow_broadcast=(val)
      configure(allow_broadcast: val)
    end
#allow_broadcast? ⇒ Boolean
Returns whether or not broadcasting is allowed. If not explicitly set, defaults to false unless we’re in Rails test or development mode.
    # File 'lib/message_bus.rb', line 267
    def allow_broadcast?
      @config[:allow_broadcast] ||=
        if defined? ::Rails.env
          ::Rails.env.test? || ::Rails.env.development?
        else
          false
        end
    end
#backend ⇒ Symbol
Returns the name of the backend implementation configured. If not explicitly set, defaults to :redis.
    # File 'lib/message_bus.rb', line 322
    def backend
      @config[:backend] || :redis
    end
#backend_instance ⇒ MessageBus::Backend::Base
Returns the configured backend. If not explicitly set, will be loaded based on the configuration provided.
    # File 'lib/message_bus.rb', line 300
    def backend_instance
      @mutex.synchronize do
        return nil if @destroyed

        # Make sure logger is loaded before config is
        # passed to backend.
        logger

        @config[:backend_instance] ||= begin
          @config[:backend_options] ||= {}
          require "message_bus/backends/#{backend}"
          MessageBus::BACKENDS[backend].new @config
        end
      end
    end
#backend_instance=(backend_instance) ⇒ void
This method returns an undefined value.
    # File 'lib/message_bus.rb', line 289
    def backend_instance=(backend_instance)
      configure(backend_instance: backend_instance)
    end
#backlog(channel = nil, last_id = nil, site_id = nil) ⇒ Array<MessageBus::Message>
Get messages from a channel backlog since the last ID specified, filtered by site.
    # File 'lib/message_bus.rb', line 486
    def backlog(channel = nil, last_id = nil, site_id = nil)
      old =
        if channel
          backend_instance.backlog(encode_channel_name(channel, site_id), last_id)
        else
          backend_instance.global_backlog(last_id)
        end

      old.each do |m|
        decode_message!(m)
      end
      old
    end
#base_route ⇒ String
Returns the route that message bus will respond to. If not explicitly set, defaults to “/”. Requests to “#{base_route}message-bus/*” will be handled by the message bus server.
    # File 'lib/message_bus.rb', line 153
    def base_route
      @config[:base_route] || "/"
    end
#base_route=(route) ⇒ void
This method returns an undefined value.
    # File 'lib/message_bus.rb', line 146
    def base_route=(route)
      configure(base_route: route.gsub(Regexp.new('\A(?!/)|(?<!/)\Z|//+'), "/"))
    end
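To make the normalization performed by the setter concrete, the sketch below applies the same regular expression outside the library. `normalize_route` and `ROUTE_FIXUP` are hypothetical names used only for illustration; the pattern itself is copied from the listing above.

```ruby
# Hypothetical helper (not part of MessageBus) wrapping the same gsub
# that base_route= applies before storing the route.
ROUTE_FIXUP = Regexp.new('\A(?!/)|(?<!/)\Z|//+')

def normalize_route(route)
  # Adds a missing leading slash, adds a missing trailing slash,
  # and collapses runs of consecutive slashes into one.
  route.gsub(ROUTE_FIXUP, "/")
end

puts normalize_route("foo")        # /foo/
puts normalize_route("/foo//bar")  # /foo/bar/
puts normalize_route("/")          # /
```

A route that is already well formed, such as "/", passes through unchanged.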
#blocking_subscribe(channel = nil) {|message| ... } ⇒ void
This method returns an undefined value.
Subscribe to messages. Each message will be delivered by yielding to the passed block as soon as it is available. This will block until subscription is terminated.
    # File 'lib/message_bus.rb', line 406
    def blocking_subscribe(channel = nil, &blk)
      if channel
        backend_instance.subscribe(encode_channel_name(channel), &blk)
      else
        backend_instance.global_subscribe(&blk)
      end
    end
#chunked_encoding_enabled=(val) ⇒ void
This method returns an undefined value.
    # File 'lib/message_bus.rb', line 74
    def chunked_encoding_enabled=(val)
      configure(chunked_encoding_enabled: val)
    end
#chunked_encoding_enabled? ⇒ Boolean
Returns whether or not chunked encoding is enabled. If not explicitly set, defaults to true.
    # File 'lib/message_bus.rb', line 68
    def chunked_encoding_enabled?
      @config[:chunked_encoding_enabled] == false ? false : true
    end
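The `== false ? false : true` comparison means that only an explicit `false` disables the feature; an unset (`nil`) value counts as enabled. A minimal stand-alone sketch of that default-on check follows, using a plain hash in place of the bus configuration. The `enabled?` helper is hypothetical and not part of the library.

```ruby
# Hypothetical helper mirroring the default-on check used by
# chunked_encoding_enabled? and long_polling_enabled?.
def enabled?(config, key)
  config[key] == false ? false : true
end

config = {}
p enabled?(config, :chunked_encoding_enabled)  # unset counts as enabled

config[:chunked_encoding_enabled] = false
p enabled?(config, :chunked_encoding_enabled)  # explicit false disables
```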
#client_message_filters ⇒ Array
Returns the array of message filters that have been registered.
    # File 'lib/message_bus.rb', line 620
    def client_message_filters
      configure(client_message_filters: []) if !@config[:client_message_filters]
      @config[:client_message_filters]
    end
#configure(config) ⇒ void
This method returns an undefined value.
Overrides existing configuration.
    # File 'lib/message_bus.rb', line 179
    def configure(config)
      @config.merge!(config)
    end
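Because `configure` delegates to `Hash#merge!`, keys passed in a later call overwrite earlier values, and keys that are not mentioned are left alone. The sketch below shows that behavior on a plain hash standing in for `@config`; the option values are made up for illustration.

```ruby
# Plain hash standing in for the bus's internal @config.
config = { backend: :redis, keepalive_interval: 60 }

# Same operation configure performs on the internal hash.
config.merge!(keepalive_interval: 30, base_route: "/bus/")

p config
```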
#destroy ⇒ void
This method returns an undefined value.
Stops listening for publications and stops executing scheduled tasks. Mostly used in tests to destroy entire bus.
    # File 'lib/message_bus.rb', line 539
    def destroy
      return if @destroyed

      backend_instance.global_unsubscribe
      backend_instance.destroy

      @mutex.synchronize do
        return if @destroyed

        @subscriptions ||= {}
        @destroyed = true
      end
      @subscriber_thread.join if @subscriber_thread
      timer.stop
    end
#extra_response_headers_lookup {|env| ... } ⇒ void
This method returns an undefined value.
    # File 'lib/message_bus.rb', line 244
    def extra_response_headers_lookup(&blk)
      configure(extra_response_headers_lookup: blk) if blk
      @config[:extra_response_headers_lookup]
    end
#global_backlog(last_id = nil) ⇒ Array<MessageBus::Message>
Get messages from the global backlog since the last ID specified.
    # File 'lib/message_bus.rb', line 475
    def global_backlog(last_id = nil)
      backlog(nil, last_id)
    end
#group_ids_lookup {|env| ... } ⇒ void
This method returns an undefined value.
    # File 'lib/message_bus.rb', line 216
    def group_ids_lookup(&blk)
      configure(group_ids_lookup: blk) if blk
      @config[:group_ids_lookup]
    end
#initialize ⇒ Object
    # File 'lib/message_bus.rb', line 38
    def initialize
      @config = {}
      @mutex = Synchronizer.new
      @off = false
      @off_disable_publish = false
      @destroyed = false
      @timer_thread = nil
      @subscriber_thread = nil
    end
#is_admin_lookup {|env| ... } ⇒ void
This method returns an undefined value.
    # File 'lib/message_bus.rb', line 225
    def is_admin_lookup(&blk)
      configure(is_admin_lookup: blk) if blk
      @config[:is_admin_lookup]
    end
#keepalive_interval ⇒ Integer
Returns the keepalive interval in seconds. If not explicitly set, defaults to `60`.
    # File 'lib/message_bus.rb', line 601
    def keepalive_interval
      @config[:keepalive_interval] || 60
    end
#keepalive_interval=(interval) ⇒ Object
    # File 'lib/message_bus.rb', line 595
    def keepalive_interval=(interval)
      configure(keepalive_interval: interval)
    end
#last_id(channel, site_id = nil) ⇒ Integer
Get the ID of the last message published on a channel, filtered by site.
    # File 'lib/message_bus.rb', line 506
    def last_id(channel, site_id = nil)
      backend_instance.last_id(encode_channel_name(channel, site_id))
    end
#last_ids(*channels, site_id: nil) ⇒ Hash
Get the ID of the last message published on multiple channels.
    # File 'lib/message_bus.rb', line 516
    def last_ids(*channels, site_id: nil)
      encoded_channel_names = channels.map { |c| encode_channel_name(c, site_id) }
      ids = backend_instance.last_ids(*encoded_channel_names)
      channels.zip(ids).to_h
    end
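As the listing shows, the result is keyed by the channel names as the caller passed them, not by the encoded names sent to the backend. A minimal sketch of that pairing step follows, with an invented list of IDs standing in for the backend response.

```ruby
channels = ["/orders", "/users"]

# Stand-in for backend_instance.last_ids(*encoded_channel_names);
# the values here are invented for illustration.
ids = [42, 7]

# Pair each requested channel with the ID at the same position.
result = channels.zip(ids).to_h
p result
```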
#last_message(channel) ⇒ MessageBus::Message
Get the last message published on a channel.
    # File 'lib/message_bus.rb', line 527
    def last_message(channel)
      if last_id = last_id(channel)
        messages = backlog(channel, last_id - 1)
        if messages
          messages[0]
        end
      end
    end
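The method fetches the backlog starting one ID before the last one and takes the first entry. The sketch below reproduces that idea against an in-memory stand-in. `FakeChannel` and its `Message` struct are hypothetical, and the sketch assumes that a backlog query returns every message whose ID is greater than the given ID and that an empty channel reports a last ID of 0.

```ruby
# Stand-in message type; the real class is MessageBus::Message.
Message = Struct.new(:id, :data)

# Hypothetical in-memory channel used only to illustrate the lookup.
class FakeChannel
  def initialize(messages)
    @messages = messages
  end

  # Assumption: an empty channel reports 0 as its last ID.
  def last_id
    @messages.empty? ? 0 : @messages.last.id
  end

  # Assumption: returns messages with an ID greater than since_id.
  def backlog(since_id)
    @messages.select { |m| m.id > since_id }
  end

  # Same shape as last_message: ask for everything after (last_id - 1)
  # and take the first result.
  def last_message
    id = last_id
    messages = backlog(id - 1)
    messages[0] if messages
  end
end

channel = FakeChannel.new([Message.new(1, "a"), Message.new(2, "b")])
p channel.last_message.data
p FakeChannel.new([]).last_message
```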
#listening? ⇒ Boolean
Returns whether or not the server is actively listening for publications on the bus.
    # File 'lib/message_bus.rb', line 569
    def listening?
      @subscriber_thread&.alive?
    end
#local_subscribe(channel = nil, last_id = -1) {|message| ... } ⇒ Proc
Subscribe to messages on a particular channel, filtered by the current site (@see #site_id_lookup). Each message since the last ID specified will be delivered by yielding to the passed block as soon as it is available. This will not block; instead, the callbacks will be executed asynchronously in a dedicated subscriber thread.
    # File 'lib/message_bus.rb', line 443
    def local_subscribe(channel = nil, last_id = -1, &blk)
      site_id = site_id_lookup.call if site_id_lookup && !global?(channel)
      subscribe_impl(channel, site_id, last_id, &blk)
    end
#local_unsubscribe(channel = nil, &blk) ⇒ void
This method returns an undefined value.
Removes a subscription to a particular channel, filtered by the current site (@see #site_id_lookup).
    # File 'lib/message_bus.rb', line 465
    def local_unsubscribe(channel = nil, &blk)
      site_id = site_id_lookup.call if site_id_lookup
      unsubscribe_impl(channel, site_id, &blk)
    end
#logger ⇒ Logger
Returns the logger used by the bus. If not explicitly set, is configured to log to STDOUT at INFO level.
    # File 'lib/message_bus.rb', line 56
    def logger
      return @config[:logger] if @config[:logger]

      require 'logger'
      logger = Logger.new(STDOUT)
      logger.level = Logger::INFO
      configure(logger: logger)
      logger
    end
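The getter only builds the default logger when none has been configured, so a logger assigned beforehand is returned as is. The sketch below reproduces that lazy-default pattern in a hypothetical `TinyBus` class reduced to the two logger accessors; it is an illustration, not the library's class.

```ruby
require 'logger'
require 'stringio'

# Hypothetical stand-in for the bus, reduced to the logger accessors.
class TinyBus
  def initialize
    @config = {}
  end

  def logger=(logger)
    @config[:logger] = logger
  end

  def logger
    return @config[:logger] if @config[:logger]

    # Build and remember the default only on first use.
    default = Logger.new(STDOUT)
    default.level = Logger::INFO
    @config[:logger] = default
  end
end

bus = TinyBus.new
custom = Logger.new(StringIO.new)
bus.logger = custom
p bus.logger.equal?(custom)                  # the assigned logger is reused
p TinyBus.new.logger.level == Logger::INFO   # the default logs at INFO
```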
#logger=(logger) ⇒ void
This method returns an undefined value.
    # File 'lib/message_bus.rb', line 50
    def logger=(logger)
      configure(logger: logger)
    end
#long_polling_enabled=(val) ⇒ void
This method returns an undefined value.
    # File 'lib/message_bus.rb', line 86
    def long_polling_enabled=(val)
      configure(long_polling_enabled: val)
    end
#long_polling_enabled? ⇒ Boolean
Returns whether or not long polling is enabled. If not explicitly set, defaults to true.
    # File 'lib/message_bus.rb', line 80
    def long_polling_enabled?
      @config[:long_polling_enabled] == false ? false : true
    end
#long_polling_interval ⇒ Integer
Returns the long-polling interval in milliseconds. If not explicitly set, defaults to 25,000.
    # File 'lib/message_bus.rb', line 140
    def long_polling_interval
      @config[:long_polling_interval] || 25 * 1000
    end
#long_polling_interval=(millisecs) ⇒ void
This method returns an undefined value.
    # File 'lib/message_bus.rb', line 134
    def long_polling_interval=(millisecs)
      configure(long_polling_interval: millisecs)
    end
#max_active_clients ⇒ Integer
Returns the number of simultaneous clients we can service; will revert to polling if we are out of slots. Defaults to 1000 if not explicitly set.
    # File 'lib/message_bus.rb', line 100
    def max_active_clients
      @config[:max_active_clients] || 1000
    end
#max_active_clients=(val) ⇒ void
This method returns an undefined value.
    # File 'lib/message_bus.rb', line 93
    def max_active_clients=(val)
      configure(max_active_clients: val)
    end
#off(disable_publish: true) ⇒ void
This method returns an undefined value.
Disables publication to the bus.
    # File 'lib/message_bus.rb', line 165
    def off(disable_publish: true)
      @off = true
      @off_disable_publish = disable_publish
    end
#off? ⇒ Boolean
Returns whether the bus is disabled or not.
    # File 'lib/message_bus.rb', line 158
    def off?
      @off
    end
#on ⇒ void
This method returns an undefined value.
Enables publication to the bus.
    # File 'lib/message_bus.rb', line 172
    def on
      @destroyed = @off = @off_disable_publish = false
    end
#on_connect(&blk) ⇒ Object
    # File 'lib/message_bus.rb', line 249
    def on_connect(&blk)
      configure(on_connect: blk) if blk
      @config[:on_connect]
    end
#on_disconnect(&blk) ⇒ Object
    # File 'lib/message_bus.rb', line 254
    def on_disconnect(&blk)
      configure(on_disconnect: blk) if blk
      @config[:on_disconnect]
    end
#on_middleware_error {|env, e| ... } ⇒ void
This method returns an undefined value.
    # File 'lib/message_bus.rb', line 235
    def on_middleware_error(&blk)
      configure(on_middleware_error: blk) if blk
      @config[:on_middleware_error]
    end
#publish(channel, data, opts = nil) ⇒ Integer
Publishes a message to a channel.
    # File 'lib/message_bus.rb', line 344
    def publish(channel, data, opts = nil)
      return if @off_disable_publish

      @mutex.synchronize do
        raise ::MessageBus::BusDestroyed if @destroyed
      end

      user_ids = nil
      group_ids = nil
      client_ids = nil

      site_id = nil
      if opts
        user_ids = opts[:user_ids]
        group_ids = opts[:group_ids]
        client_ids = opts[:client_ids]
        site_id = opts[:site_id]
      end

      if (user_ids || group_ids) && global?(channel)
        raise ::MessageBus::InvalidMessage
      end

      if (user_ids == []) || (group_ids == []) || (client_ids == [])
        raise ::MessageBus::InvalidMessageTarget
      end

      encoded_data = transport_codec.encode({
        "data" => data,
        "user_ids" => user_ids,
        "group_ids" => group_ids,
        "client_ids" => client_ids
      })

      channel_opts = {}

      if opts
        if ((age = opts[:max_backlog_age]) || (size = opts[:max_backlog_size]))
          channel_opts[:max_backlog_size] = size
          channel_opts[:max_backlog_age] = age
        end

        if opts.has_key?(:queue_in_memory)
          channel_opts[:queue_in_memory] = opts[:queue_in_memory]
        end
      end

      encoded_channel_name = encode_channel_name(channel, site_id)
      backend_instance.publish(encoded_channel_name, encoded_data, channel_opts)
    end
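Read as written, the backlog options in the listing are gathered with a short-circuiting `||`, so when `:max_backlog_age` is supplied the `:max_backlog_size` assignment does not run and the size is passed on as `nil`. The sketch below isolates just that option handling so the effect can be observed. `channel_opts_for` is a hypothetical helper name, and the sketch mirrors only the listing shown here, not any other part of the library.

```ruby
# Hypothetical helper isolating the channel-option handling from publish.
def channel_opts_for(opts)
  channel_opts = {}

  if opts
    # `||` short-circuits: when :max_backlog_age is truthy, the second
    # assignment never runs and `size` stays nil.
    if (age = opts[:max_backlog_age]) || (size = opts[:max_backlog_size])
      channel_opts[:max_backlog_size] = size
      channel_opts[:max_backlog_age] = age
    end

    # The key is copied whenever it is present, even if its value is false.
    if opts.has_key?(:queue_in_memory)
      channel_opts[:queue_in_memory] = opts[:queue_in_memory]
    end
  end

  channel_opts
end

p channel_opts_for({ max_backlog_size: 100 })
p channel_opts_for({ max_backlog_age: 60, max_backlog_size: 100 })
p channel_opts_for({ queue_in_memory: false })
p channel_opts_for(nil)
```

Under this reading, a caller who needs both limits may want to confirm the resulting behavior against the installed version of the library.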
#rack_hijack_enabled=(val) ⇒ void
This method returns an undefined value.
    # File 'lib/message_bus.rb', line 128
    def rack_hijack_enabled=(val)
      configure(rack_hijack_enabled: val)
    end
#rack_hijack_enabled? ⇒ Boolean
Returns whether or not Rack Hijack is enabled. If not explicitly set, will default to true, unless we’re on Passenger without the ability to set the advertised_concurrency_level to 0.
    # File 'lib/message_bus.rb', line 107
    def rack_hijack_enabled?
      if @config[:rack_hijack_enabled].nil?
        enable = true

        # without this switch passenger will explode
        # it will run out of connections after about 10
        if defined? PhusionPassenger
          enable = false
          if PhusionPassenger.respond_to? :advertised_concurrency_level
            PhusionPassenger.advertised_concurrency_level = 0
            enable = true
          end
        end
        configure(rack_hijack_enabled: enable)
      end

      @config[:rack_hijack_enabled]
    end
#redis_config ⇒ Object
    # File 'lib/message_bus.rb', line 190
    def redis_config
      @config[:redis_config] || {}
    end
#redis_config=(config) ⇒ void
This method returns an undefined value.
Overrides existing configuration, explicitly enabling the redis backend.
    # File 'lib/message_bus.rb', line 186
    def redis_config=(config)
      configure(backend: :redis, redis_config: config)
    end
#register_client_message_filter(channel_prefix) {|message| ... } ⇒ void
This method returns an undefined value.
Registers a client message filter that allows messages to be filtered from the client.
    # File 'lib/message_bus.rb', line 612
    def register_client_message_filter(channel_prefix, &blk)
      if blk
        configure(client_message_filters: []) if !@config[:client_message_filters]
        @config[:client_message_filters] << [channel_prefix, blk]
      end
    end
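Each registration appends a `[channel_prefix, block]` pair to the configured filter list. The sketch below reproduces that storage in a hypothetical `FilterRegistry` class. The `allowed?` method is purely an assumption added for illustration: it supposes that a filter applies when the channel name starts with its prefix and that a falsy block result drops the message, which this page does not confirm.

```ruby
# Hypothetical stand-in reduced to filter registration.
class FilterRegistry
  def initialize
    @config = {}
  end

  def client_message_filters
    @config[:client_message_filters] ||= []
  end

  # Stores the pair only when a block is given, as in the listing above.
  def register_client_message_filter(channel_prefix, &blk)
    client_message_filters << [channel_prefix, blk] if blk
  end

  # Assumption for illustration only: prefix match selects the filter,
  # and a falsy result from the block drops the message.
  def allowed?(channel, message)
    client_message_filters.all? do |prefix, blk|
      !channel.start_with?(prefix) || blk.call(message)
    end
  end
end

registry = FilterRegistry.new
registry.register_client_message_filter("/private") { |msg| msg[:public] }

p registry.client_message_filters.length
p registry.allowed?("/private/1", { public: false })
p registry.allowed?("/chat", { public: false })
```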
#reliable_pub_sub ⇒ Object
    # File 'lib/message_bus.rb', line 316
    def reliable_pub_sub
      logger.warn "MessageBus.reliable_pub_sub is deprecated, use MessageBus.backend_instance instead."
      backend_instance
    end
#reliable_pub_sub=(pub_sub) ⇒ Object
    # File 'lib/message_bus.rb', line 293
    def reliable_pub_sub=(pub_sub)
      logger.warn "MessageBus.reliable_pub_sub= is deprecated, use MessageBus.backend_instance= instead."
      self.backend_instance = pub_sub
    end
#reset! ⇒ Object
    # File 'lib/message_bus.rb', line 574
    def reset!
      backend_instance.reset! if backend_instance
    end
#site_id_lookup {|env| ... } ⇒ void
This method returns an undefined value.
    # File 'lib/message_bus.rb', line 198
    def site_id_lookup(&blk)
      configure(site_id_lookup: blk) if blk
      @config[:site_id_lookup]
    end
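The lookup methods double as setter and getter: called with a block they store it, and in either case they return whatever is currently stored. The sketch below shows that pattern in a hypothetical `LookupHolder` class; the use of the Rack `HTTP_HOST` key as a site identifier is an assumption chosen only for illustration.

```ruby
# Hypothetical stand-in reduced to one lookup accessor.
class LookupHolder
  def initialize
    @config = {}
  end

  # Stores the block when one is given; always returns the stored block.
  def site_id_lookup(&blk)
    @config[:site_id_lookup] = blk if blk
    @config[:site_id_lookup]
  end
end

holder = LookupHolder.new
p holder.site_id_lookup.nil?   # nothing registered yet

# Assumption: derive the site from the request host.
holder.site_id_lookup { |env| env["HTTP_HOST"] }

lookup = holder.site_id_lookup
p lookup.call({ "HTTP_HOST" => "example.com" })
```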
#subscribe(channel = nil, last_id = -1) {|message| ... } ⇒ Proc
Subscribe to messages on a particular channel. Each message since the last ID specified will be delivered by yielding to the passed block as soon as it is available. This will not block; instead, the callbacks will be executed asynchronously in a dedicated subscriber thread.
    # File 'lib/message_bus.rb', line 426
    def subscribe(channel = nil, last_id = -1, &blk)
      subscribe_impl(channel, nil, last_id, &blk)
    end
#timer ⇒ MessageBus::TimerThread
Returns the timer thread used for triggering scheduled routines at specific times/intervals.
    # File 'lib/message_bus.rb', line 580
    def timer
      return @timer_thread if @timer_thread

      @timer_thread ||= begin
        t = MessageBus::TimerThread.new
        t.on_error do |e|
          logger.warn "Failed to process job: #{e} #{e.backtrace}"
        end
        t
      end
    end
#transport_codec ⇒ MessageBus::Codec::Base
Returns the codec used to encode and decode Message payloads. If not explicitly set, defaults to a MessageBus::Codec::Json instance.
    # File 'lib/message_bus.rb', line 283
    def transport_codec
      @config[:transport_codec] ||= MessageBus::Codec::Json.new
    end
#transport_codec=(codec) ⇒ void
This method returns an undefined value.
    # File 'lib/message_bus.rb', line 278
    def transport_codec=(codec)
      configure(transport_codec: codec)
    end
#unsubscribe(channel = nil, &blk) ⇒ void
This method returns an undefined value.
Removes a subscription to a particular channel.
    # File 'lib/message_bus.rb', line 454
    def unsubscribe(channel = nil, &blk)
      unsubscribe_impl(channel, nil, &blk)
    end
#user_id_lookup {|env| ... } ⇒ void
This method returns an undefined value.
    # File 'lib/message_bus.rb', line 207
    def user_id_lookup(&blk)
      configure(user_id_lookup: blk) if blk
      @config[:user_id_lookup]
    end