Class: Spacebunny::LiveStream::Amqp

Inherits:
Base
  • Object
show all
Defined in:
lib/spacebunny/live_stream/amqp.rb

Constant Summary collapse

DEFAULT_QUEUE_OPTIONS =
{ passive: true }
DEFAULT_EXCHANGE_OPTIONS =
{ passive: true }
ACK_TYPES =
[:manual, :auto]

Instance Attribute Summary collapse

Attributes inherited from Base

#api_endpoint, #auto_configs, #auto_connection_configs, #connection_configs, #custom_connection_configs, #host, #live_streams, #log_level, #log_to, #logger, #raise_on_error, #secret, #tls, #tls_ca_certificates, #tls_cert, #tls_key, #verify_peer, #vhost

Instance Method Summary collapse

Methods inherited from Base

#auto_configure?, #connection_options=, #on_receive

Constructor Details

#initialize(*args) ⇒ Amqp

Returns a new instance of Amqp.



12
13
14
# File 'lib/spacebunny/live_stream/amqp.rb', line 12

# Builds a new Amqp live-stream client.
#
# Prepends the +:amqp+ symbol to the caller's arguments and hands everything
# to the parent class initializer (presumably +:amqp+ is the protocol
# identifier the parent expects -- confirm against the superclass).
#
# @param args [Array] forwarded to the superclass initializer after +:amqp+
def initialize(*args)
  super(:amqp, *args)
end

Instance Attribute Details

#client ⇒ Object (readonly)

Returns the value of attribute client.



10
11
12
# File 'lib/spacebunny/live_stream/amqp.rb', line 10

# Read-only accessor for the +@client+ instance variable.
#
# @return [Object, nil] the current value of +@client+; nil when nothing has
#   assigned it yet
def client
  @client
end

Instance Method Details

#connect ⇒ Object



16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# File 'lib/spacebunny/live_stream/amqp.rb', line 16

# Opens a new Bunny connection to SpaceBunny and stores it in +@client+.
#
# Starts from a copy of +connection_configs+ and renames its keys to the ones
# Bunny wants as connection args:
#
# * +:client+   becomes +:user+
# * +:secret+   becomes +:password+
# * +:tls_port+ becomes +:port+, unless TLS was explicitly disabled with
#   +tls: false+ or no +:tls_port+ is configured
# * +:log_level+ falls back to +::Logger::ERROR+ when missing or nil
#
# A fresh client is built on every call. A previous client that is still open
# is stopped first so its connection is not left dangling.
#
# @return [Object] whatever +logger.info+ returns
def connect
  # Work on a copy so the object returned by connection_configs is not modified
  connection_params = connection_configs.dup
  connection_params[:user] = connection_params.delete(:client)
  connection_params[:password] = connection_params.delete(:secret)

  # Default on a tls connection: only an explicit `tls: false` opts out (nil
  # counts as enabled), hence the comparison against false instead of a
  # plain truthiness check.
  unless connection_params[:tls] == false
    tls_port = connection_params.delete(:tls_port)
    # Without a TLS port, keep whatever :port is configured instead of
    # overwriting it with nil.
    connection_params[:port] = tls_port if tls_port
  end
  connection_params[:log_level] = connection_params.delete(:log_level) || ::Logger::ERROR

  # Re-create client every time connect is called, closing the one being
  # replaced so the old connection is not leaked.
  @client.stop if @client && @client.open?
  @client = Bunny.new(connection_params)
  @client.start
  logger.info 'Connected to SpaceBunny'
end

#disconnect ⇒ Object



34
35
36
37
38
# File 'lib/spacebunny/live_stream/amqp.rb', line 34

# Shuts the live stream down.
#
# Runs the parent class' disconnect logic first, then stops the current
# client (if there is one) and logs that the connection is gone.
#
# @return [Object] whatever +logger.info+ returns
def disconnect
  super
  session = client
  session.stop if session
  logger.info('Disconnected from SpaceBunny')
end

#message_from(name, options = {}, &block) ⇒ Object

Subscribe to messages coming from the Live Stream with name ‘name’. Each subscriber will receive a copy of the messages flowing through the Live Stream.



42
43
44
# File 'lib/spacebunny/live_stream/amqp.rb', line 42

# Subscribes to messages coming from the Live Stream called +name+.
# Each subscriber will receive a copy of messages flowing through the Live Stream.
#
# Thin wrapper: all arguments are forwarded unchanged to +receive_message_from+.
#
# @param name [Object] Live Stream name (presumably a String -- confirm
#   against +receive_message_from+)
# @param options [Hash] passed through as-is
# @param block forwarded to +receive_message_from+
# @return [Object] whatever +receive_message_from+ returns
def message_from(name, options = {}, &block)
  receive_message_from name, options, &block
end

#message_from_cache(name, options = {}, &block) ⇒ Object

Subscribe to messages coming from the cache of the Live Stream with name ‘name’. The Live Stream will dispatch each message to the first ready subscriber in a round-robin fashion.



48
49
50
51
# File 'lib/spacebunny/live_stream/amqp.rb', line 48

# Subscribes to messages coming from the cache of the Live Stream called +name+.
# The Live Stream will dispatch a message to the first ready subscriber in a
# round-robin fashion.
#
# Forwards to +receive_message_from+ with +from_cache: true+ forced on. The
# flag is set on a copy of +options+, so the caller's Hash is never modified
# and may even be frozen.
#
# @param name [Object] Live Stream name (presumably a String -- confirm
#   against +receive_message_from+)
# @param options [Hash] extra options; any +:from_cache+ entry is overridden
# @param block forwarded to +receive_message_from+
# @return [Object] whatever +receive_message_from+ returns
def message_from_cache(name, options = {}, &block)
  receive_message_from name, options.merge(from_cache: true), &block
end