Module: MessageBus::Implementation

Included in:
MessageBus, Instance
Defined in:
lib/message_bus.rb

Overview

The main server-side interface to a message bus for the purposes of configuration, publishing and subscribing

Defined Under Namespace

Classes: Synchronizer

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#configHash<Symbol => Object> (readonly)

Returns Configuration options hash.

Returns:

  • (Hash<Symbol => Object>)

    Configuration options hash


31
32
33
# File 'lib/message_bus.rb', line 31

def config
  @config
end

Instance Method Details

#after_forkvoid

This method returns an undefined value.

Performs routines that are necessary after a process fork, typically

triggered by a forking webserver. Performs whatever the backend requires
and ensures the server is listening for publications and running
scheduled tasks.
[View source]

560
561
562
563
564
565
# File 'lib/message_bus.rb', line 560

def after_fork
  backend_instance.after_fork
  ensure_subscriber_thread
  # will ensure timer is running
  timer.queue {}
end

#allow_broadcast=(val) ⇒ void

This method returns an undefined value.

Parameters:

  • val (Boolean)

    whether or not to allow broadcasting (debugging)

[View source]

261
262
263
# File 'lib/message_bus.rb', line 261

def allow_broadcast=(val)
  configure(allow_broadcast: val)
end

#allow_broadcast?Boolean

Returns whether or not broadcasting is allowed. If not explicitly set, defaults to false unless we’re in Rails test or development mode.

Returns:

  • (Boolean)

    whether or not broadcasting is allowed. If not explicitly set, defaults to false unless we’re in Rails test or development mode.

[View source]

267
268
269
270
271
272
273
274
# File 'lib/message_bus.rb', line 267

def allow_broadcast?
  @config[:allow_broadcast] ||=
    if defined? ::Rails.env
      ::Rails.env.test? || ::Rails.env.development?
    else
      false
    end
end

#backendSymbol

Returns the name of the backend implementation configured.

Returns:

  • (Symbol)

    the name of the backend implementation configured

[View source]

322
323
324
# File 'lib/message_bus.rb', line 322

def backend
  @config[:backend] || :redis
end

#backend_instanceMessageBus::Backend::Base

Returns the configured backend. If not explicitly set, will be loaded based on the configuration provided.

Returns:

  • (MessageBus::Backend::Base)

    the configured backend. If not explicitly set, will be loaded based on the configuration provided.

[View source]

300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
# File 'lib/message_bus.rb', line 300

def backend_instance
  @mutex.synchronize do
    return nil if @destroyed

    # Make sure logger is loaded before config is
    # passed to backend.
    logger

    @config[:backend_instance] ||= begin
      @config[:backend_options] ||= {}
      require "message_bus/backends/#{backend}"
      MessageBus::BACKENDS[backend].new @config
    end
  end
end

#backend_instance=(backend_instance) ⇒ void

This method returns an undefined value.

Parameters:

  • backend_instance (MessageBus::Backend::Base)

    A configured backend

[View source]

289
290
291
# File 'lib/message_bus.rb', line 289

def backend_instance=(backend_instance)
  configure(backend_instance: backend_instance)
end

#backlog(channel = nil, last_id = nil, site_id = nil) ⇒ Array<MessageBus::Message>

Get messages from a channel backlog since the last ID specified, filtered by site

Parameters:

  • channel (String) (defaults to: nil)

    the name of the channel in question

  • last_id (#to_i) (defaults to: nil)

    the channel-specific ID of the last message that the caller received on the specified channel

  • site_id (String) (defaults to: nil)

    the ID of the site by which to filter

Returns:

  • (Array<MessageBus::Message>)

    all messages published to the specified channel since the specified last ID

[View source]

486
487
488
489
490
491
492
493
494
495
496
497
498
# File 'lib/message_bus.rb', line 486

def backlog(channel = nil, last_id = nil, site_id = nil)
  old =
    if channel
      backend_instance.backlog(encode_channel_name(channel, site_id), last_id)
    else
      backend_instance.global_backlog(last_id)
    end

  old.each do |m|
    decode_message!(m)
  end
  old
end

#base_routeString

Returns the route that message bus will respond to. If not explicitly set, defaults to “/”. Requests to “##base_routemessage-bus/*” will be handled by the message bus server.

Returns:

  • (String)

    the route that message bus will respond to. If not explicitly set, defaults to “/”. Requests to “##base_routemessage-bus/*” will be handled by the message bus server.

[View source]

153
154
155
# File 'lib/message_bus.rb', line 153

def base_route
  @config[:base_route] || "/"
end

#base_route=(route) ⇒ void

This method returns an undefined value.

Parameters:

  • route (String)

    Message bus will listen to requests on this route.

[View source]

146
147
148
# File 'lib/message_bus.rb', line 146

def base_route=(route)
  configure(base_route: route.gsub(Regexp.new('\A(?!/)|(?<!/)\Z|//+'), "/"))
end

#blocking_subscribe(channel = nil) {|message| ... } ⇒ void

This method returns an undefined value.

Subscribe to messages. Each message will be delivered by yielding to the passed block as soon as it is available. This will block until subscription is terminated.

Parameters:

  • channel (String, nil) (defaults to: nil)

    the name of the channel to which we should subscribe. If ‘nil`, messages on every channel will be provided.

Yields:

  • (message)

    a message-handler block

Yield Parameters:

[View source]

406
407
408
409
410
411
412
# File 'lib/message_bus.rb', line 406

def blocking_subscribe(channel = nil, &blk)
  if channel
    backend_instance.subscribe(encode_channel_name(channel), &blk)
  else
    backend_instance.global_subscribe(&blk)
  end
end

#chunked_encoding_enabled=(val) ⇒ void

This method returns an undefined value.

Parameters:

  • val (Boolean)

    whether or not to enable chunked encoding

[View source]

74
75
76
# File 'lib/message_bus.rb', line 74

def chunked_encoding_enabled=(val)
  configure(chunked_encoding_enabled: val)
end

#chunked_encoding_enabled?Boolean

Returns whether or not chunked encoding is enabled. If not explicitly set, defaults to true.

Returns:

  • (Boolean)

    whether or not chunked encoding is enabled. If not explicitly set, defaults to true.

[View source]

68
69
70
# File 'lib/message_bus.rb', line 68

def chunked_encoding_enabled?
  @config[:chunked_encoding_enabled] == false ? false : true
end

#client_message_filtersArray

Returns a hash of message filters that have been registered

Returns:

  • (Array)

    returns a hash of message filters that have been registered

[View source]

620
621
622
623
# File 'lib/message_bus.rb', line 620

def client_message_filters
  configure(client_message_filters: []) if !@config[:client_message_filters]
  @config[:client_message_filters]
end

#configure(config) ⇒ void

This method returns an undefined value.

Overrides existing configuration

Parameters:

  • config (Hash<Symbol => Object>)

    values to merge into existing config

[View source]

179
180
181
# File 'lib/message_bus.rb', line 179

def configure(config)
  @config.merge!(config)
end

#destroyvoid

This method returns an undefined value.

Stops listening for publications and stops executing scheduled tasks. Mostly used in tests to destroy entire bus.

[View source]

539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
# File 'lib/message_bus.rb', line 539

def destroy
  return if @destroyed

  backend_instance.global_unsubscribe
  backend_instance.destroy

  @mutex.synchronize do
    return if @destroyed

    @subscriptions ||= {}
    @destroyed = true
  end
  @subscriber_thread.join if @subscriber_thread
  timer.stop
end

#extra_response_headers_lookup {|env| ... } ⇒ void

This method returns an undefined value.

Yields:

  • (env)

    a routine to determine extra headers to be set on a subscriber response

Yield Parameters:

  • env (Rack::Request::Env)

    the subscriber request environment

Yield Returns:

  • (Hash<String => String>)

    the extra headers to set on the response

[View source]

244
245
246
247
# File 'lib/message_bus.rb', line 244

def extra_response_headers_lookup(&blk)
  configure(extra_response_headers_lookup: blk) if blk
  @config[:extra_response_headers_lookup]
end

#global_backlog(last_id = nil) ⇒ Array<MessageBus::Message>

Get messages from the global backlog since the last ID specified

Parameters:

  • last_id (#to_i) (defaults to: nil)

    the global ID of the last message that the caller received

Returns:

  • (Array<MessageBus::Message>)

    all messages published on any channel since the specified last ID

[View source]

475
476
477
# File 'lib/message_bus.rb', line 475

def global_backlog(last_id = nil)
  backlog(nil, last_id)
end

#group_ids_lookup {|env| ... } ⇒ void

This method returns an undefined value.

Yields:

  • (env)

    a routine to determine the group IDs for a subscriber

Yield Parameters:

  • env (optional, Rack::Request::Env)

    the subscriber request environment

Yield Returns:

  • (optional, Array<String,Integer>)

    the group IDs for the subscriber

[View source]

216
217
218
219
# File 'lib/message_bus.rb', line 216

def group_ids_lookup(&blk)
  configure(group_ids_lookup: blk) if blk
  @config[:group_ids_lookup]
end

#initializeObject

[View source]

38
39
40
41
42
43
44
45
46
# File 'lib/message_bus.rb', line 38

def initialize
  @config = {}
  @mutex = Synchronizer.new
  @off = false
  @off_disable_publish = false
  @destroyed = false
  @timer_thread = nil
  @subscriber_thread = nil
end

#is_admin_lookup {|env| ... } ⇒ void

This method returns an undefined value.

Yields:

  • (env)

    a routine to determine if a request comes from an admin user

Yield Parameters:

  • env (Rack::Request::Env)

    the subscriber request environment

Yield Returns:

  • (Boolean)

    whether or not the request is from an admin user

[View source]

225
226
227
228
# File 'lib/message_bus.rb', line 225

def is_admin_lookup(&blk)
  configure(is_admin_lookup: blk) if blk
  @config[:is_admin_lookup]
end

#keepalive_intervalInteger

Returns the keepalive interval in seconds. If not explicitly set, defaults to ‘60`.

Returns:

  • (Integer)

    the keepalive interval in seconds. If not explicitly set, defaults to ‘60`.

[View source]

601
602
603
# File 'lib/message_bus.rb', line 601

def keepalive_interval
  @config[:keepalive_interval] || 60
end

#keepalive_interval=(interval) ⇒ Object

Parameters:

  • interval (Integer)

    the keepalive interval in seconds. Set to 0 to disable; anything higher and a keepalive will run every N seconds. If it fails, the process is killed.

[View source]

595
596
597
# File 'lib/message_bus.rb', line 595

def keepalive_interval=(interval)
  configure(keepalive_interval: interval)
end

#last_id(channel, site_id = nil) ⇒ Integer

Get the ID of the last message published on a channel, filtered by site

Parameters:

  • channel (String)

    the name of the channel in question

  • site_id (String) (defaults to: nil)

    the ID of the site by which to filter

Returns:

  • (Integer)

    the channel-specific ID of the last message published to the given channel

[View source]

506
507
508
# File 'lib/message_bus.rb', line 506

def last_id(channel, site_id = nil)
  backend_instance.last_id(encode_channel_name(channel, site_id))
end

#last_ids(*channels, site_id: nil) ⇒ Hash

Get the ID of the last message published on multiple channels

Parameters:

  • channels (Array<String>)
    • array of channels to fetch

  • site_id (String) (defaults to: nil)
    • the ID of the site by which to filter

Returns:

  • (Hash)

    the channel-specific IDs of the last message published to each requested channel

[View source]

516
517
518
519
520
# File 'lib/message_bus.rb', line 516

def last_ids(*channels, site_id: nil)
  encoded_channel_names = channels.map { |c| encode_channel_name(c, site_id) }
  ids = backend_instance.last_ids(*encoded_channel_names)
  channels.zip(ids).to_h
end

#last_message(channel) ⇒ MessageBus::Message

Get the last message published on a channel

Parameters:

  • channel (String)

    the name of the channel in question

Returns:

[View source]

527
528
529
530
531
532
533
534
# File 'lib/message_bus.rb', line 527

def last_message(channel)
  if last_id = last_id(channel)
    messages = backlog(channel, last_id - 1)
    if messages
      messages[0]
    end
  end
end

#listening?Boolean

Returns whether or not the server is actively listening for publications on the bus.

Returns:

  • (Boolean)

    whether or not the server is actively listening for publications on the bus

[View source]

569
570
571
# File 'lib/message_bus.rb', line 569

def listening?
  @subscriber_thread&.alive?
end

#local_subscribe(channel = nil, last_id = -1,) {|message| ... } ⇒ Proc

Subscribe to messages on a particular channel, filtered by the current site (@see #site_id_lookup). Each message since the last ID specified will be delivered by yielding to the passed block as soon as it is available. This will not block, but instead the callbacks will be executed asynchronously in a dedicated subscriber thread.

Parameters:

  • channel (String) (defaults to: nil)

    the name of the channel to which we should subscribe

  • last_id (#to_i) (defaults to: -1,)

    the channel-specific ID of the last message that the caller received on the specified channel

Yields:

  • (message)

    a message-handler block

Yield Parameters:

Returns:

  • (Proc)

    the callback block that will be executed

[View source]

443
444
445
446
# File 'lib/message_bus.rb', line 443

def local_subscribe(channel = nil, last_id = -1, &blk)
  site_id = site_id_lookup.call if site_id_lookup && !global?(channel)
  subscribe_impl(channel, site_id, last_id, &blk)
end

#local_unsubscribe(channel = nil, &blk) ⇒ void

This method returns an undefined value.

Removes a subscription to a particular channel, filtered by the current site (@see #site_id_lookup).

Parameters:

  • channel (String) (defaults to: nil)

    the name of the channel from which we should unsubscribe

  • blk (Proc, nil)

    the callback which should be removed. If ‘nil`, removes all.

[View source]

465
466
467
468
# File 'lib/message_bus.rb', line 465

def local_unsubscribe(channel = nil, &blk)
  site_id = site_id_lookup.call if site_id_lookup
  unsubscribe_impl(channel, site_id, &blk)
end

#loggerLogger

Returns the logger used by the bus. If not explicitly set, is configured to log to STDOUT at INFO level.

Returns:

  • (Logger)

    the logger used by the bus. If not explicitly set, is configured to log to STDOUT at INFO level.

[View source]

56
57
58
59
60
61
62
63
64
# File 'lib/message_bus.rb', line 56

def logger
  return @config[:logger] if @config[:logger]

  require 'logger'
  logger = Logger.new(STDOUT)
  logger.level = Logger::INFO
  configure(logger: logger)
  logger
end

#logger=(logger) ⇒ void

This method returns an undefined value.

Parameters:

  • logger (Logger)

    a logger object to be used by the bus

[View source]

50
51
52
# File 'lib/message_bus.rb', line 50

def logger=(logger)
  configure(logger: logger)
end

#long_polling_enabled=(val) ⇒ void

This method returns an undefined value.

Parameters:

  • val (Boolean)

    whether or not to enable long polling

[View source]

86
87
88
# File 'lib/message_bus.rb', line 86

def long_polling_enabled=(val)
  configure(long_polling_enabled: val)
end

#long_polling_enabled?Boolean

Returns whether or not long polling is enabled. If not explicitly set, defaults to true.

Returns:

  • (Boolean)

    whether or not long polling is enabled. If not explicitly set, defaults to true.

[View source]

80
81
82
# File 'lib/message_bus.rb', line 80

def long_polling_enabled?
  @config[:long_polling_enabled] == false ? false : true
end

#long_polling_intervalInteger

Returns the long-polling interval in milliseconds. If not explicitly set, defaults to 25,000.

Returns:

  • (Integer)

    the long-polling interval in milliseconds. If not explicitly set, defaults to 25,000.

[View source]

140
141
142
# File 'lib/message_bus.rb', line 140

def long_polling_interval
  @config[:long_polling_interval] || 25 * 1000
end

#long_polling_interval=(millisecs) ⇒ void

This method returns an undefined value.

Parameters:

  • millisecs (Integer)

    the long-polling interval in milliseconds

[View source]

134
135
136
# File 'lib/message_bus.rb', line 134

def long_polling_interval=(millisecs)
  configure(long_polling_interval: millisecs)
end

#max_active_clientsInteger

Returns The number of simultaneous clients we can service; will revert to polling if we are out of slots. Defaults to 1000 if not explicitly set.

Returns:

  • (Integer)

    The number of simultaneous clients we can service; will revert to polling if we are out of slots. Defaults to 1000 if not explicitly set.

[View source]

100
101
102
# File 'lib/message_bus.rb', line 100

def max_active_clients
  @config[:max_active_clients] || 1000
end

#max_active_clients=(val) ⇒ void

This method returns an undefined value.

Parameters:

  • val (Integer)

    The number of simultaneous clients we can service; will revert to polling if we are out of slots

[View source]

93
94
95
# File 'lib/message_bus.rb', line 93

def max_active_clients=(val)
  configure(max_active_clients: val)
end

#off(disable_publish: true) ⇒ void

This method returns an undefined value.

Disables publication to the bus

Parameters:

  • disable_publish (Boolean) (defaults to: true)

    Whether or not to disable publishing

[View source]

165
166
167
168
# File 'lib/message_bus.rb', line 165

def off(disable_publish: true)
  @off = true
  @off_disable_publish = disable_publish
end

#off?Boolean

Returns whether the bus is disabled or not.

Returns:

  • (Boolean)

    whether the bus is disabled or not

[View source]

158
159
160
# File 'lib/message_bus.rb', line 158

def off?
  @off
end

#onvoid

This method returns an undefined value.

Enables publication to the bus

[View source]

172
173
174
# File 'lib/message_bus.rb', line 172

def on
  @destroyed = @off = @off_disable_publish = false
end

#on_connect(&blk) ⇒ Object

[View source]

249
250
251
252
# File 'lib/message_bus.rb', line 249

def on_connect(&blk)
  configure(on_connect: blk) if blk
  @config[:on_connect]
end

#on_disconnect(&blk) ⇒ Object

[View source]

254
255
256
257
# File 'lib/message_bus.rb', line 254

def on_disconnect(&blk)
  configure(on_disconnect: blk) if blk
  @config[:on_disconnect]
end

#on_middleware_error {|env, e| ... } ⇒ void

This method returns an undefined value.

Yields:

  • (env, e)

    a routine to handle exceptions raised when handling a subscriber request

Yield Parameters:

  • env (Rack::Request::Env)

    the subscriber request environment

  • e (Exception)

    the exception that was raised

Yield Returns:

  • (optional, Array<(Integer,Hash,Array)>)

    a Rack response to be delivered

[View source]

235
236
237
238
# File 'lib/message_bus.rb', line 235

def on_middleware_error(&blk)
  configure(on_middleware_error: blk) if blk
  @config[:on_middleware_error]
end

#publish(channel, data, opts = nil) ⇒ Integer

Publishes a message to a channel

Parameters:

  • channel (String)

    the name of the channel to which the message should be published

  • data (JSON)

    some data to publish to the channel. Must be an object that can be encoded as JSON

  • opts (Hash) (defaults to: nil)

Options Hash (opts):

  • :client_ids (Array<String>) — default: `nil`

    the unique client IDs to which the message should be available. If nil, available to all.

  • :user_ids (Array<String,Integer>) — default: `nil`

    the user IDs to which the message should be available. If nil, available to all.

  • :group_ids (Array<String,Integer>) — default: `nil`

    the group IDs to which the message should be available. If nil, available to all.

  • :site_id (String) — default: `nil`

    the site ID to scope the message to; used for hosting multiple applications or instances of an application against a single message_bus

  • :max_backlog_age (nil, Integer)

    the longest amount of time a message may live in a backlog before being removed, in seconds

  • :max_backlog_size (nil, Integer)

    the largest permitted size (number of messages) for the channel backlog; beyond this capacity, old messages will be dropped

Returns:

  • (Integer)

    the channel-specific ID the message was given

Raises:

[View source]

344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
# File 'lib/message_bus.rb', line 344

def publish(channel, data, opts = nil)
  return if @off_disable_publish

  @mutex.synchronize do
    raise ::MessageBus::BusDestroyed if @destroyed
  end

  user_ids = nil
  group_ids = nil
  client_ids = nil

  site_id = nil
  if opts
    user_ids = opts[:user_ids]
    group_ids = opts[:group_ids]
    client_ids = opts[:client_ids]
    site_id = opts[:site_id]
  end

  if (user_ids || group_ids) && global?(channel)
    raise ::MessageBus::InvalidMessage
  end

  if (user_ids == []) || (group_ids == []) || (client_ids == [])
    raise ::MessageBus::InvalidMessageTarget
  end

  encoded_data = transport_codec.encode({
    "data" => data,
    "user_ids" => user_ids,
    "group_ids" => group_ids,
    "client_ids" => client_ids
  })

  channel_opts = {}

  if opts
    if ((age = opts[:max_backlog_age]) || (size = opts[:max_backlog_size]))
      channel_opts[:max_backlog_size] = size
      channel_opts[:max_backlog_age] = age
    end

    if opts.has_key?(:queue_in_memory)
      channel_opts[:queue_in_memory] = opts[:queue_in_memory]
    end
  end

  encoded_channel_name = encode_channel_name(channel, site_id)
  backend_instance.publish(encoded_channel_name, encoded_data, channel_opts)
end

#rack_hijack_enabled=(val) ⇒ void

This method returns an undefined value.

Parameters:

  • val (Boolean)

    whether or not to enable Rack Hijack

[View source]

128
129
130
# File 'lib/message_bus.rb', line 128

def rack_hijack_enabled=(val)
  configure(rack_hijack_enabled: val)
end

#rack_hijack_enabled?Boolean

Returns whether or not Rack Hijack is enabled. If not explicitly set, will default to true, unless we’re on Passenger without the ability to set the advertised_concurrency_level to 0.

Returns:

  • (Boolean)

    whether or not Rack Hijack is enabled. If not explicitly set, will default to true, unless we’re on Passenger without the ability to set the advertised_concurrency_level to 0.

[View source]

107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# File 'lib/message_bus.rb', line 107

def rack_hijack_enabled?
  if @config[:rack_hijack_enabled].nil?
    enable = true

    # without this switch passenger will explode
    # it will run out of connections after about 10
    if defined? PhusionPassenger
      enable = false
      if PhusionPassenger.respond_to? :advertised_concurrency_level
        PhusionPassenger.advertised_concurrency_level = 0
        enable = true
      end
    end
    configure(rack_hijack_enabled: enable)
  end

  @config[:rack_hijack_enabled]
end

#redis_configObject

[View source]

190
191
192
# File 'lib/message_bus.rb', line 190

def redis_config
  @config[:redis_config] || {}
end

#redis_config=(config) ⇒ void

This method returns an undefined value.

Overrides existing configuration, explicitly enabling the redis backend

Parameters:

  • config (Hash<Symbol => Object>)

    values to merge into existing config

[View source]

186
187
188
# File 'lib/message_bus.rb', line 186

def redis_config=(config)
  configure(backend: :redis, redis_config: config)
end

#register_client_message_filter(channel_prefix) {|message| ... } ⇒ void

This method returns an undefined value.

Registers a client message filter that allows messages to be filtered from the client.

Parameters:

  • channel_prefix (String, Regexp)

    channel prefix to match against a message’s channel

Yield Parameters:

Yield Returns:

  • (Boolean)

    whether the message should be published to the client

[View source]

612
613
614
615
616
617
# File 'lib/message_bus.rb', line 612

def register_client_message_filter(channel_prefix, &blk)
  if blk
    configure(client_message_filters: []) if !@config[:client_message_filters]
    @config[:client_message_filters] << [channel_prefix, blk]
  end
end

#reliable_pub_subObject

[View source]

316
317
318
319
# File 'lib/message_bus.rb', line 316

def reliable_pub_sub
  logger.warn "MessageBus.reliable_pub_sub is deprecated, use MessageBus.backend_instance instead."
  backend_instance
end

#reliable_pub_sub=(pub_sub) ⇒ Object

[View source]

293
294
295
296
# File 'lib/message_bus.rb', line 293

def reliable_pub_sub=(pub_sub)
  logger.warn "MessageBus.reliable_pub_sub= is deprecated, use MessageBus.backend_instance= instead."
  self.backend_instance = pub_sub
end

#reset!Object

[View source]

574
575
576
# File 'lib/message_bus.rb', line 574

def reset!
  backend_instance.reset! if backend_instance
end

#site_id_lookup {|env| ... } ⇒ void

This method returns an undefined value.

Yields:

  • (env)

    a routine to determine the site ID for a subscriber

Yield Parameters:

  • env (optional, Rack::Request::Env)

    the subscriber request environment

Yield Returns:

  • (optional, String)

    the site ID for the subscriber

[View source]

198
199
200
201
# File 'lib/message_bus.rb', line 198

def site_id_lookup(&blk)
  configure(site_id_lookup: blk) if blk
  @config[:site_id_lookup]
end

#subscribe(channel = nil, last_id = -1,) {|message| ... } ⇒ Proc

Subscribe to messages on a particular channel. Each message since the last ID specified will be delivered by yielding to the passed block as soon as it is available. This will not block, but instead the callbacks will be executed asynchronously in a dedicated subscriber thread.

Parameters:

  • channel (String) (defaults to: nil)

    the name of the channel to which we should subscribe

  • last_id (#to_i) (defaults to: -1,)

    the channel-specific ID of the last message that the caller received on the specified channel

Yields:

  • (message)

    a message-handler block

Yield Parameters:

Returns:

  • (Proc)

    the callback block that will be executed

[View source]

426
427
428
# File 'lib/message_bus.rb', line 426

def subscribe(channel = nil, last_id = -1, &blk)
  subscribe_impl(channel, nil, last_id, &blk)
end

#timerMessageBus::TimerThread

Returns the timer thread used for triggering scheduled routines at specific times/intervals.

Returns:

  • (MessageBus::TimerThread)

    the timer thread used for triggering scheduled routines at specific times/intervals.

[View source]

580
581
582
583
584
585
586
587
588
589
590
# File 'lib/message_bus.rb', line 580

def timer
  return @timer_thread if @timer_thread

  @timer_thread ||= begin
    t = MessageBus::TimerThread.new
    t.on_error do |e|
      logger.warn "Failed to process job: #{e} #{e.backtrace}"
    end
    t
  end
end

#transport_codecMessageBus::Codec::Base

Returns codec used to encode and decode Message payloads.

Returns:

[View source]

283
284
285
# File 'lib/message_bus.rb', line 283

def transport_codec
  @config[:transport_codec] ||= MessageBus::Codec::Json.new
end

#transport_codec=(codec) ⇒ void

This method returns an undefined value.

Parameters:

[View source]

278
279
280
# File 'lib/message_bus.rb', line 278

def transport_codec=(codec)
  configure(transport_codec: codec)
end

#unsubscribe(channel = nil, &blk) ⇒ void

This method returns an undefined value.

Removes a subscription to a particular channel.

Parameters:

  • channel (String) (defaults to: nil)

    the name of the channel from which we should unsubscribe

  • blk (Proc, nil)

    the callback which should be removed. If ‘nil`, removes all.

[View source]

454
455
456
# File 'lib/message_bus.rb', line 454

def unsubscribe(channel = nil, &blk)
  unsubscribe_impl(channel, nil, &blk)
end

#user_id_lookup {|env| ... } ⇒ void

This method returns an undefined value.

Yields:

  • (env)

    a routine to determine the user ID for a subscriber (authenticate)

Yield Parameters:

  • env (optional, Rack::Request::Env)

    the subscriber request environment

Yield Returns:

  • (optional, String, Integer)

    the user ID for the subscriber

[View source]

207
208
209
210
# File 'lib/message_bus.rb', line 207

def user_id_lookup(&blk)
  configure(user_id_lookup: blk) if blk
  @config[:user_id_lookup]
end