Class: MarchHare::Exchange
- Inherits: Object (Object > MarchHare::Exchange)
- Defined in: lib/march_hare/exchange.rb
Overview
Represents AMQP 0.9.1 exchanges.
Instance Attribute Summary
- #channel ⇒ MarchHare::Channel (readonly)
  Channel this exchange object uses.
- #name ⇒ String (readonly)
  Exchange name.
- #type ⇒ Symbol (readonly)
  Type of this exchange (one of: :direct, :fanout, :topic, :headers).
Instance Method Summary
- #auto_delete? ⇒ Boolean
  True if this exchange was declared as automatically deleted (deleted as soon as the last queue is unbound from it).
- #bind(exchange, options = {}) ⇒ MarchHare::Exchange
  Binds an exchange to another (source) exchange using the exchange.bind AMQP 0.9.1 extension that RabbitMQ provides.
- #declare! ⇒ Object
- #delete(options = {}) ⇒ Object
  Deletes the exchange unless it is predefined.
- #durable? ⇒ Boolean
  True if this exchange was declared as durable (will survive broker restart).
- #initialize(channel, name, options = {}) ⇒ Exchange (constructor)
  Instantiates a new exchange.
- #internal? ⇒ Boolean
  True if this exchange is internal (used solely for exchange-to-exchange bindings and cannot be published to by clients).
- #predefined? ⇒ Boolean
  True if this exchange is a pre-defined one (amq.direct, amq.fanout, amq.match and so on).
- #publish(body, opts = {}) ⇒ MarchHare::Exchange
  Publishes a message.
- #recover_from_network_failure ⇒ Object
- #unbind(exchange, opts = {}) ⇒ MarchHare::Exchange
  Unbinds an exchange from another (source) exchange using the exchange.unbind AMQP 0.9.1 extension that RabbitMQ provides.
- #wait_for_confirms ⇒ Object
  Waits until all outstanding publisher confirms on the channel arrive.
Constructor Details
#initialize(channel, name, options = {}) ⇒ Exchange
Instantiates a new exchange.
# File 'lib/march_hare/exchange.rb', line 39
def initialize(channel, name, options = {})
  raise ArgumentError, "exchange channel cannot be nil" if channel.nil?
  raise ArgumentError, "exchange name cannot be nil" if name.nil?
  raise ArgumentError, "exchange :type must be specified as an option" if options[:type].nil?

  @channel = channel
  @name = name
  @type = options[:type]
  @options = {:type => :fanout, :durable => false, :auto_delete => false, :internal => false, :passive => false}.merge(options)
end
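The validation and default merging in the constructor can be tried without a broker. The following is a minimal sketch, not March Hare itself: `build_exchange_options` and `DEFAULT_EXCHANGE_OPTIONS` are hypothetical names that mirror the checks and the defaults hash in the listing, and `:fake_channel` is a placeholder for a real channel.

```ruby
# Hypothetical helper (not part of March Hare) mirroring the constructor's
# argument checks and default merging.
DEFAULT_EXCHANGE_OPTIONS = {
  :type        => :fanout,
  :durable     => false,
  :auto_delete => false,
  :internal    => false,
  :passive     => false
}.freeze

def build_exchange_options(channel, name, options = {})
  raise ArgumentError, "exchange channel cannot be nil" if channel.nil?
  raise ArgumentError, "exchange name cannot be nil" if name.nil?
  raise ArgumentError, "exchange :type must be specified as an option" if options[:type].nil?

  # Caller-supplied keys win over the defaults.
  DEFAULT_EXCHANGE_OPTIONS.merge(options)
end

merged = build_exchange_options(:fake_channel, "events", :type => :topic, :durable => true)
puts merged.inspect

begin
  build_exchange_options(:fake_channel, "events") # no :type given
rescue ArgumentError => e
  puts e.message
end
```

Because a missing `:type` raises before the merge, the `:fanout` entry in the defaults hash is always overridden by the caller's value.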
Instance Attribute Details
#channel ⇒ MarchHare::Channel (readonly)
Returns the channel this exchange object uses.
# File 'lib/march_hare/exchange.rb', line 15
def channel
  @channel
end
#name ⇒ String (readonly)
Returns the exchange name.
# File 'lib/march_hare/exchange.rb', line 13
def name
  @name
end
#type ⇒ Symbol (readonly)
Type of this exchange (one of: :direct, :fanout, :topic, :headers).
# File 'lib/march_hare/exchange.rb', line 19
def type
  @type
end
Instance Method Details
#auto_delete? ⇒ Boolean
Returns true if this exchange was declared as automatically deleted (deleted as soon as the last queue is unbound from it).
# File 'lib/march_hare/exchange.rb', line 148
def auto_delete?
  !!@options[:auto_delete]
end
#bind(exchange, options = {}) ⇒ MarchHare::Exchange
Binds an exchange to another (source) exchange using the exchange.bind AMQP 0.9.1 extension that RabbitMQ provides.
# File 'lib/march_hare/exchange.rb', line 111
def bind(exchange, options={})
  exchange_name = if exchange.respond_to?(:name) then exchange.name else exchange.to_s end
  @channel.exchange_bind(@name, exchange_name, options.fetch(:routing_key, ''))
end
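The source exchange may be passed either as an object or as a plain name, because #bind only asks whether the argument responds to #name. A minimal sketch of that resolution step follows; `FakeExchange` and `resolve_exchange_name` are hypothetical names introduced for illustration and are not part of March Hare.

```ruby
# Hypothetical stand-in for an exchange object; only #name matters here.
FakeExchange = Struct.new(:name)

# Mirrors the name resolution used by #bind: objects that respond to #name
# contribute that name, anything else is converted with #to_s.
def resolve_exchange_name(exchange)
  if exchange.respond_to?(:name) then exchange.name else exchange.to_s end
end

puts resolve_exchange_name(FakeExchange.new("amq.fanout"))
puts resolve_exchange_name("logs")
puts resolve_exchange_name(:logs)
```

The same resolution is used by #unbind, so both methods accept the same kinds of argument.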
#declare! ⇒ Object
# File 'lib/march_hare/exchange.rb', line 174
def declare!
  unless predefined?
    if @options[:passive]
      @channel.exchange_declare_passive(@name)
    else
      @channel.exchange_declare(@name, @options[:type].to_s, @options[:durable],
                                @options[:auto_delete], @options[:internal], @options[:arguments])
    end
  end
end
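The three outcomes of #declare! (skip, passive declare, full declare) can be observed with a stub in place of the channel. This is a sketch under stated assumptions: `RecordingChannel` and `declare_sketch` are hypothetical, and the stub's method signatures simply copy the argument order used in the listing rather than the real channel API.

```ruby
# Hypothetical recording stub standing in for MarchHare::Channel.
class RecordingChannel
  attr_reader :calls

  def initialize
    @calls = []
  end

  def exchange_declare_passive(name)
    @calls << [:exchange_declare_passive, name]
  end

  def exchange_declare(name, type, durable, auto_delete, internal, arguments)
    @calls << [:exchange_declare, name, type, durable, auto_delete, internal, arguments]
  end
end

# Mirrors the branching in #declare! for a given name and options hash.
def declare_sketch(channel, name, options)
  return if name.empty? || name.start_with?("amq.") # predefined: nothing to declare

  if options[:passive]
    channel.exchange_declare_passive(name)
  else
    channel.exchange_declare(name, options[:type].to_s, options[:durable],
                             options[:auto_delete], options[:internal], options[:arguments])
  end
end

channel = RecordingChannel.new
declare_sketch(channel, "events", :type => :topic, :durable => true)
declare_sketch(channel, "events", :type => :topic, :passive => true)
declare_sketch(channel, "amq.topic", :type => :topic)
channel.calls.each { |call| puts call.inspect }
```

Note that the type is sent as a string (`:topic` becomes `"topic"`) and that a passive declare passes only the name.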
#delete(options = {}) ⇒ Object
Deletes the exchange unless it is predefined.
# File 'lib/march_hare/exchange.rb', line 92
def delete(options={})
  @channel.deregister_exchange(self)
  @channel.exchange_delete(@name, options.fetch(:if_unused, false)) unless predefined?
end
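One detail of #delete is easy to miss: the exchange is always deregistered from the channel, while the delete request is sent only for exchanges that are not predefined. The sketch below illustrates this with a stub; `DeleteRecorder` and `delete_sketch` are hypothetical, and the sketch passes the exchange name where the real method passes the exchange object.

```ruby
# Hypothetical recording stub standing in for MarchHare::Channel.
class DeleteRecorder
  attr_reader :calls

  def initialize
    @calls = []
  end

  def deregister_exchange(exchange)
    @calls << [:deregister_exchange, exchange]
  end

  def exchange_delete(name, if_unused)
    @calls << [:exchange_delete, name, if_unused]
  end
end

# Mirrors #delete, taking the exchange name directly instead of an object.
def delete_sketch(channel, name, options = {})
  predefined = name.empty? || name.start_with?("amq.")
  channel.deregister_exchange(name)
  channel.exchange_delete(name, options.fetch(:if_unused, false)) unless predefined
end

recorder = DeleteRecorder.new
delete_sketch(recorder, "events", :if_unused => true)
delete_sketch(recorder, "amq.direct")
recorder.calls.each { |call| puts call.inspect }
```

When `:if_unused` is omitted it defaults to `false`.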
#durable? ⇒ Boolean
Returns true if this exchange was declared as durable (will survive broker restart).
# File 'lib/march_hare/exchange.rb', line 142
def durable?
  !!@options[:durable]
end
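#durable?, #auto_delete? and #internal? share one pattern: read a key from the options hash and coerce it to a strict boolean with double negation. A small sketch with an illustrative options hash (the values are made up for the example):

```ruby
# Illustrative options hash; :internal is deliberately absent.
options = {:durable => true, :auto_delete => nil}

durable     = !!options[:durable]      # true stays true
auto_delete = !!options[:auto_delete]  # nil becomes false
internal    = !!options[:internal]     # a missing key reads as nil, so false

puts [durable, auto_delete, internal].inspect # prints [true, false, false]
```

The predicates therefore never return nil, even when an option was not supplied.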
#internal? ⇒ Boolean
Returns true if this exchange is internal (used solely for exchange-to-exchange bindings and cannot be published to by clients).
# File 'lib/march_hare/exchange.rb', line 154
def internal?
  !!@options[:internal]
end
#predefined? ⇒ Boolean
Returns true if this exchange is a pre-defined one (amq.direct, amq.fanout, amq.match and so on).
# File 'lib/march_hare/exchange.rb', line 136
def predefined?
  @name.empty? || @name.start_with?("amq.")
end
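The check is purely name-based, which a standalone sketch makes easy to see. `predefined_name?` is a hypothetical helper that repeats the expression from the listing, and the sample names are illustrative.

```ruby
# Mirrors the check in #predefined?: the default exchange has an empty name,
# and broker-provided exchanges use the reserved "amq." prefix.
def predefined_name?(name)
  name.empty? || name.start_with?("amq.")
end

["", "amq.direct", "amq.match", "events", "amqp.events"].each do |name|
  puts "#{name.inspect} => #{predefined_name?(name)}"
end
```

Any name beginning with `amq.` counts as predefined here, whether or not the broker actually ships an exchange with that name.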
#publish(body, opts = {}) ⇒ MarchHare::Exchange
Publishes a message.
# File 'lib/march_hare/exchange.rb', line 75
def publish(body, opts = {})
  options = {:routing_key => '', :mandatory => false}.merge(opts)
  @channel.basic_publish(@name,
                         options.delete(:routing_key),
                         options.delete(:mandatory),
                         options.fetch(:properties, options),
                         body.to_java_bytes)
end
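How #publish splits its options can be sketched in isolation. This sketch rests on an assumption: that the fallback for `:properties` is the remaining options hash once `:routing_key` and `:mandatory` have been removed. `split_publish_options` is a hypothetical helper, and the property keys used below are illustrative only.

```ruby
# Hypothetical helper mirroring how #publish splits its options, assuming the
# :properties fallback is whatever remains after the two deletes.
def split_publish_options(opts = {})
  options     = {:routing_key => '', :mandatory => false}.merge(opts)
  routing_key = options.delete(:routing_key)
  mandatory   = options.delete(:mandatory)
  properties  = options.fetch(:properties, options)
  [routing_key, mandatory, properties]
end

caller_opts = {:routing_key => "a.b", :content_type => "text/plain"}
puts split_publish_options(caller_opts).inspect
puts split_publish_options(:properties => {:content_type => "text/plain"}).inspect
puts split_publish_options.inspect
puts caller_opts.inspect # unchanged: merge returns a new hash
```

Because `merge` returns a new hash, the `delete` calls do not modify the hash passed in by the caller.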
#recover_from_network_failure ⇒ Object
# File 'lib/march_hare/exchange.rb', line 188
def recover_from_network_failure
  # puts "Recovering exchange #{@name} from network failure"
  unless predefined?
    begin
      declare!

      @channel.register_exchange(self)
    rescue Exception => e
      # TODO: use a logger
      puts "Caught #{e.inspect} while redeclaring and registering exchange #{@name}!"
    end
  end
end
#unbind(exchange, opts = {}) ⇒ MarchHare::Exchange
Unbinds an exchange from another (source) exchange using the exchange.unbind AMQP 0.9.1 extension that RabbitMQ provides.
# File 'lib/march_hare/exchange.rb', line 130
def unbind(exchange, opts = {})
  exchange_name = if exchange.respond_to?(:name) then exchange.name else exchange.to_s end
  @channel.exchange_unbind(@name, exchange_name, opts.fetch(:routing_key, ''), opts[:arguments])
end
#wait_for_confirms ⇒ Object
Waits until all outstanding publisher confirms on the channel arrive.
This is a convenience method that delegates to Channel#wait_for_confirms.
# File 'lib/march_hare/exchange.rb', line 164
def wait_for_confirms
  @channel.wait_for_confirms
end