Class: Spacebunny::LiveStream::Amqp
- Defined in: lib/spacebunny/live_stream/amqp.rb
Constant Summary
- DEFAULT_QUEUE_OPTIONS = { passive: true }
- DEFAULT_EXCHANGE_OPTIONS = { passive: true }
- ACK_TYPES = [:manual, :auto]
Instance Attribute Summary
- #client ⇒ Object (readonly)
  Returns the value of attribute client.
Attributes inherited from Base
#api_endpoint, #auto_configs, #auto_connection_configs, #connection_configs, #custom_connection_configs, #host, #live_streams, #log_level, #log_to, #logger, #raise_on_error, #secret, #tls, #tls_ca_certificates, #tls_cert, #tls_key, #verify_peer, #vhost
Instance Method Summary
- #connect ⇒ Object
- #disconnect ⇒ Object
- #initialize(*args) ⇒ Amqp (constructor)
  A new instance of Amqp.
- #message_from(name, options = {}, &block) ⇒ Object
  Subscribes to messages coming from the Live Stream named ‘name’. Each subscriber receives a copy of every message flowing through the Live Stream.
- #message_from_cache(name, options = {}, &block) ⇒ Object
  Subscribes to messages coming from the cache of the Live Stream named ‘name’. The Live Stream dispatches each message to the first ready subscriber, in round-robin fashion.
Methods inherited from Base
#auto_configure?, #connection_options=, #on_receive
Constructor Details
#initialize(*args) ⇒ Amqp
Returns a new instance of Amqp.
# File 'lib/spacebunny/live_stream/amqp.rb', line 12

def initialize(*args)
  super(:amqp, *args)
end
Instance Attribute Details
#client ⇒ Object (readonly)
Returns the value of attribute client.
# File 'lib/spacebunny/live_stream/amqp.rb', line 10

def client
  @client
end
Instance Method Details
#connect ⇒ Object
# File 'lib/spacebunny/live_stream/amqp.rb', line 16

def connect
  # 'Fix' attributes: start from common connection configs and adjust attributes to match what Bunny
  # wants as connection args
  connection_params = connection_configs.dup
  connection_params[:user] = connection_params.delete :client
  connection_params[:password] = connection_params.delete :secret
  # Default on a tls connection
  unless connection_params[:tls] == false
    connection_params[:port] = connection_params.delete(:tls_port)
  end
  connection_params[:log_level] = connection_params.delete(:log_level) || ::Logger::ERROR
  # Re-create client every time connect is called
  @client = Bunny.new(connection_params)
  @client.start
  logger.info 'Connected to SpaceBunny'
end
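The key remapping performed by #connect can be tried in isolation. The following is a minimal sketch, not library code: `bunny_connection_params` is a hypothetical helper name, and the sample host, credentials and port numbers are made-up values. It only mirrors the hash manipulation shown in the source above and never opens a connection.

```ruby
require 'logger'

# Hypothetical helper (not part of Spacebunny) that mirrors the hash
# manipulation done inside #connect before the params reach Bunny.new.
def bunny_connection_params(connection_configs)
  params = connection_configs.dup
  # Bunny expects :user and :password instead of :client and :secret
  params[:user] = params.delete(:client)
  params[:password] = params.delete(:secret)
  # TLS is the default: switch to the TLS port unless tls is explicitly false
  params[:port] = params.delete(:tls_port) unless params[:tls] == false
  # Fall back to ERROR when no log level is configured
  params[:log_level] = params.delete(:log_level) || ::Logger::ERROR
  params
end

# Made-up sample configuration, for illustration only
configs = { client: 'my-client', secret: 's3cret', host: 'example.invalid',
            port: 5672, tls_port: 5671 }

tls_params   = bunny_connection_params(configs)
plain_params = bunny_connection_params(configs.merge(tls: false))
```

Because the helper works on a `dup` of the hash, the original `configs` is left untouched, which matches the way #connect leaves `connection_configs` intact so that it can be called again.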
#disconnect ⇒ Object
# File 'lib/spacebunny/live_stream/amqp.rb', line 34

def disconnect
  super
  client.stop if client
  logger.info 'Disconnected from SpaceBunny'
end
#message_from(name, options = {}, &block) ⇒ Object
Subscribes to messages coming from the Live Stream named ‘name’. Each subscriber receives a copy of every message flowing through the Live Stream.
# File 'lib/spacebunny/live_stream/amqp.rb', line 42

def message_from(name, options = {}, &block)
  … name, options, &block
end
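The "each subscriber receives a copy" behaviour described for #message_from can be illustrated with a toy in-memory model. This is only a sketch of the delivery semantics: `FanoutStream` and its methods are hypothetical names, and nothing here uses Spacebunny, Bunny or a broker.

```ruby
# Toy model of copy-to-every-subscriber delivery.
# Hypothetical class for illustration; not Spacebunny or Bunny code.
class FanoutStream
  def initialize
    @subscribers = []
  end

  # Register a block that will be called with each message
  def subscribe(&block)
    @subscribers << block
  end

  # Hand the same message to every registered subscriber
  def publish(message)
    @subscribers.each { |subscriber| subscriber.call(message) }
  end
end

stream = FanoutStream.new
received_a = []
received_b = []
stream.subscribe { |message| received_a << message }
stream.subscribe { |message| received_b << message }

%w[m1 m2 m3].each { |message| stream.publish(message) }
```

After the three publishes, both `received_a` and `received_b` hold all three messages, which is the behaviour the description above attributes to Live Stream subscribers.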
#message_from_cache(name, options = {}, &block) ⇒ Object
Subscribes to messages coming from the cache of the Live Stream named ‘name’. The Live Stream dispatches each message to the first ready subscriber, in round-robin fashion.
# File 'lib/spacebunny/live_stream/amqp.rb', line 48

def message_from_cache(name, options = {}, &block)
  options[:from_cache] = true
  … name, options, &block
end
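The round-robin dispatch described for #message_from_cache can be sketched in the same toy style. `RoundRobinStream` is a hypothetical class, and the model makes a simplifying assumption: every subscriber is always ready, so messages simply alternate between them. It does not use Spacebunny, Bunny or a broker.

```ruby
# Toy model of round-robin delivery: each message goes to exactly one subscriber.
# Hypothetical class for illustration; assumes all subscribers are always ready.
class RoundRobinStream
  def initialize
    @subscribers = []
    @next_index = 0
  end

  # Register a block that will be called with the messages assigned to it
  def subscribe(&block)
    @subscribers << block
  end

  # Deliver the message to the next subscriber in turn, then advance the cursor
  def publish(message)
    return if @subscribers.empty?

    @subscribers[@next_index].call(message)
    @next_index = (@next_index + 1) % @subscribers.size
  end
end

cache = RoundRobinStream.new
worker_1 = []
worker_2 = []
cache.subscribe { |message| worker_1 << message }
cache.subscribe { |message| worker_2 << message }

%w[m1 m2 m3 m4].each { |message| cache.publish(message) }
```

Here `worker_1` ends up with `m1` and `m3` while `worker_2` gets `m2` and `m4`: the messages are shared out rather than copied, which is the contrast with #message_from.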