Class: Msgr::Connection
- Inherits:
-
Object
- Object
- Msgr::Connection
- Includes:
- Logging
- Defined in:
- lib/msgr/connection.rb
Instance Attribute Summary collapse
-
#config ⇒ Object
readonly
Returns the value of attribute config.
-
#uri ⇒ Object
readonly
Returns the value of attribute uri.
Instance Method Summary collapse
- #bind(routes) ⇒ Object
- #bindings ⇒ Object
- #channel(prefetch: 1) ⇒ Object
- #close ⇒ Object
- #connect ⇒ Object
- #connection ⇒ Object
- #delete ⇒ Object
- #exchange ⇒ Object
-
#initialize(uri, config, dispatcher) ⇒ Connection
constructor
A new instance of Connection.
- #publish(message, opts = {}) ⇒ Object
- #purge(**kwargs) ⇒ Object
- #purge_queue(name) ⇒ Object
- #release ⇒ Object
- #running? ⇒ Boolean
Methods included from Logging
Constructor Details
#initialize(uri, config, dispatcher) ⇒ Connection
Returns a new instance of Connection.
11 12 13 14 15 16 |
# File 'lib/msgr/connection.rb', line 11 def initialize(uri, config, dispatcher) @uri = uri @config = config @dispatcher = dispatcher @channels = [] end |
Instance Attribute Details
#config ⇒ Object (readonly)
Returns the value of attribute config.
9 10 11 |
# File 'lib/msgr/connection.rb', line 9 def config @config end |
#uri ⇒ Object (readonly)
Returns the value of attribute uri.
9 10 11 |
# File 'lib/msgr/connection.rb', line 9 def uri @uri end |
Instance Method Details
#bind(routes) ⇒ Object
87 88 89 90 91 92 93 94 95 96 |
# File 'lib/msgr/connection.rb', line 87 def bind(routes) if routes.empty? log(:warn) do "No routes to bound to. Bind will have no effect:\n " \ "#{routes.inspect}" end else bind_all(routes) end end |
#bindings ⇒ Object
83 84 85 |
# File 'lib/msgr/connection.rb', line 83 def bindings @bindings ||= [] end |
#channel(prefetch: 1) ⇒ Object
38 39 40 41 42 43 |
# File 'lib/msgr/connection.rb', line 38 def channel(prefetch: 1) channel = Msgr::Channel.new(config, connection) channel.prefetch(prefetch) @channels << channel channel end |
#close ⇒ Object
98 99 100 101 102 |
# File 'lib/msgr/connection.rb', line 98 def close @channels.each(&:close) connection.close if @connection log(:debug) { 'Closed.' } end |
#connect ⇒ Object
34 35 36 |
# File 'lib/msgr/connection.rb', line 34 def connect connection end |
#connection ⇒ Object
30 31 32 |
# File 'lib/msgr/connection.rb', line 30 def connection @connection ||= ::Bunny.new(config).tap(&:start) end |
#delete ⇒ Object
57 58 59 60 61 62 63 |
# File 'lib/msgr/connection.rb', line 57 def delete return if bindings.empty? log(:debug) { "Delete bindings (#{bindings.size})..." } bindings.each(&:delete) end |
#exchange ⇒ Object
45 46 47 |
# File 'lib/msgr/connection.rb', line 45 def exchange @exchange ||= channel.exchange end |
#publish(message, opts = {}) ⇒ Object
22 23 24 25 26 27 28 |
# File 'lib/msgr/connection.rb', line 22 def publish(message, opts = {}) opts[:routing_key] = opts.delete(:to) if opts[:to] exchange.publish message.to_s, opts.merge(persistent: true) log(:debug) { "Published message to #{opts[:routing_key]}" } end |
#purge(**kwargs) ⇒ Object
65 66 67 68 69 70 71 |
# File 'lib/msgr/connection.rb', line 65 def purge(**kwargs) return if bindings.empty? log(:debug) { "Purge bindings (#{bindings.size})..." } bindings.each {|b| b.purge(**kwargs) } end |
#purge_queue(name) ⇒ Object
73 74 75 76 77 78 79 80 81 |
# File 'lib/msgr/connection.rb', line 73 def purge_queue(name) # Creating the queue in passive mode ensures that queues that do not exist # won't be created just to purge them. # That requires creating a new channel every time, as exceptions (on # missing queues) invalidate the channel. channel.queue(name, passive: true).purge rescue Bunny::NotFound nil end |
#release ⇒ Object
49 50 51 52 53 54 55 |
# File 'lib/msgr/connection.rb', line 49 def release return if bindings.empty? log(:debug) { "Release bindings (#{bindings.size})..." } bindings.each(&:release) end |
#running? ⇒ Boolean
18 19 20 |
# File 'lib/msgr/connection.rb', line 18 def running? bindings.any? end |