Class: Bunny::Exchange09
- Inherits: Object
  - Object
  - Bunny::Exchange09
- Defined in: lib/bunny/exchange09.rb
Overview
Exchanges are the routing and distribution hub of AMQP. All messages that Bunny sends to an AMQP broker/server have to pass through an exchange in order to be routed to a destination queue. The AMQP specification defines the types of exchange that you can create.
At the time of writing there are four (4) types of exchange defined:
:direct
:fanout
:topic
:headers
AMQP-compliant brokers/servers are required to provide default exchanges for the direct and fanout exchange types. All default exchanges are prefixed with 'amq.', for example:
amq.direct
amq.fanout
amq.topic
amq.match or amq.headers
If you want more information about exchanges, please consult the documentation for your target broker/server or visit the AMQP website to find the version of the specification that applies to your target broker/server.
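The constructor documented further down this page infers the exchange type from an 'amq.' prefix in the name. The following is a minimal standalone sketch of that name-to-type rule, mirroring the logic in #initialize. The helper name exchange_type_for is hypothetical and is not part of Bunny.

```ruby
# Hypothetical helper (not part of Bunny): mirrors how Exchange09#initialize
# derives the exchange type from the exchange name and options.
def exchange_type_for(name, opts = {})
  if name =~ /^amq\.(.+)$/
    suffix = $1
    # 'amq.match' is the default headers exchange
    suffix = 'headers' if suffix == 'match'
    suffix.to_sym
  else
    # other names use the :type option, falling back to :direct
    opts[:type] || :direct
  end
end

exchange_type_for('amq.fanout')             # => :fanout
exchange_type_for('amq.match')              # => :headers
exchange_type_for('logs', :type => :topic)  # => :topic
exchange_type_for('orders')                 # => :direct
```

Note that for an 'amq.'-prefixed name, any :type option is ignored in favour of the type implied by the name.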
Instance Attribute Summary
- #client ⇒ Object (readonly): Returns the value of attribute client.
- #key ⇒ Object (readonly): Returns the value of attribute key.
- #name ⇒ Object (readonly): Returns the value of attribute name.
- #opts ⇒ Object (readonly): Returns the value of attribute opts.
- #type ⇒ Object (readonly): Returns the value of attribute type.
Instance Method Summary
- #delete(opts = {}) ⇒ Symbol: Requests that the exchange be deleted from the broker/server.
- #initialize(client, name, opts = {}) ⇒ Exchange09 (constructor): A new instance of Exchange09.
- #publish(data, opts = {}) ⇒ NilClass: Publishes a message to a specific exchange.
Constructor Details
#initialize(client, name, opts = {}) ⇒ Exchange09
Returns a new instance of Exchange09.
    # File 'lib/bunny/exchange09.rb', line 31

    def initialize(client, name, opts = {})
      # check connection to server
      raise Bunny::ConnectionError, 'Not connected to server' if client.status == :not_connected

      @client, @name, @opts = client, name, opts

      # set up the exchange type catering for default names
      if name =~ /^amq\.(.+)$/
        predeclared = true
        new_type = $1
        # handle 'amq.match' default
        new_type = 'headers' if new_type == 'match'
        @type = new_type.to_sym
      else
        @type = opts[:type] || :direct
      end

      @key = opts[:key]
      @client.exchanges[@name] ||= self

      # ignore the :nowait option if passed, otherwise program will hang waiting for a
      # response that will not be sent by the server
      opts.delete(:nowait)

      unless predeclared or name == ''
        opts = {
          :exchange => name, :type => type, :nowait => false,
          :deprecated_ticket => 0, :deprecated_auto_delete => false, :deprecated_internal => false
        }.merge(opts)

        client.send_frame(Qrack::Protocol09::Exchange::Declare.new(opts))

        method = client.next_method

        client.check_response(method, Qrack::Protocol09::Exchange::DeclareOk, "Error declaring exchange #{name}: type = #{type}")
      end
    end
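To make the option handling in the constructor easier to follow, here is a minimal standalone sketch of how the Exchange::Declare arguments are assembled: :nowait is always dropped, then the caller's options are merged over a set of defaults. The helper name declare_options is hypothetical and is not part of Bunny. Unlike the constructor, which deletes :nowait from the hash it was given, this sketch works on a copy.

```ruby
# Hypothetical helper (not part of Bunny): reproduces the option handling
# that Exchange09#initialize applies before sending Exchange::Declare.
def declare_options(name, type, opts = {})
  opts = opts.dup       # work on a copy; the real constructor mutates the caller's hash
  opts.delete(:nowait)  # always dropped, so the client can wait for DeclareOk
  {
    :exchange => name, :type => type, :nowait => false,
    :deprecated_ticket => 0, :deprecated_auto_delete => false,
    :deprecated_internal => false
  }.merge(opts)         # remaining caller options override the defaults
end

declared = declare_options('logs', :topic, :durable => true, :nowait => true)
declared[:nowait]   # => false
declared[:durable]  # => true
```

In the real constructor this step is skipped entirely for 'amq.'-prefixed names and for the empty-string name, since those exchanges already exist on the broker.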
Instance Attribute Details
#client ⇒ Object (readonly)
Returns the value of attribute client.
    # File 'lib/bunny/exchange09.rb', line 29

    def client
      @client
    end
#key ⇒ Object (readonly)
Returns the value of attribute key.
    # File 'lib/bunny/exchange09.rb', line 29

    def key
      @key
    end
#name ⇒ Object (readonly)
Returns the value of attribute name.
    # File 'lib/bunny/exchange09.rb', line 29

    def name
      @name
    end
#opts ⇒ Object (readonly)
Returns the value of attribute opts.
    # File 'lib/bunny/exchange09.rb', line 29

    def opts
      @opts
    end
#type ⇒ Object (readonly)
Returns the value of attribute type.
    # File 'lib/bunny/exchange09.rb', line 29

    def type
      @type
    end
Instance Method Details
#delete(opts = {}) ⇒ Symbol
Requests that the exchange be deleted from the broker/server. If the request succeeds, removes the reference from the client's exchanges and returns :delete_ok. If an error occurs, raises ProtocolError.
    # File 'lib/bunny/exchange09.rb', line 79

    def delete(opts = {})
      # ignore the :nowait option if passed, otherwise program will hang waiting for a
      # response that will not be sent by the server
      opts.delete(:nowait)

      opts = { :exchange => name, :nowait => false, :deprecated_ticket => 0 }.merge(opts)

      client.send_frame(Qrack::Protocol09::Exchange::Delete.new(opts))

      method = client.next_method

      client.check_response(method, Qrack::Protocol09::Exchange::DeleteOk, "Error deleting exchange #{name}")

      client.exchanges.delete(name)

      # return confirmation
      :delete_ok
    end
#publish(data, opts = {}) ⇒ NilClass
Publishes a message to a specific exchange. The message will be routed to queues as defined by the exchange configuration and distributed to any active consumers when the transaction, if any, is committed.
    # File 'lib/bunny/exchange09.rb', line 127

    def publish(data, opts = {})
      opts = opts.dup
      out = []

      # Set up options
      routing_key = opts.delete(:key) || key
      mandatory = opts.delete(:mandatory)
      immediate = opts.delete(:immediate)
      delivery_mode = opts.delete(:persistent) ? 2 : 1
      content_type = opts.delete(:content_type) || 'application/octet-stream'

      out << Qrack::Protocol09::Basic::Publish.new({
        :exchange => name,
        :routing_key => routing_key,
        :mandatory => mandatory,
        :immediate => immediate,
        :deprecated_ticket => 0
      })
      data = data.to_s
      out << Qrack::Protocol09::Header.new(
        Qrack::Protocol09::Basic,
        data.bytesize,
        { :content_type => content_type, :delivery_mode => delivery_mode, :priority => 0 }.merge(opts)
      )
      out << Qrack::Transport09::Body.new(data)

      client.send_frame(*out)
    end
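The method above splits its options into two groups: arguments for Basic::Publish and properties for the content header. The following minimal standalone sketch mirrors that split, including the defaults applied when an option is absent. The helper name publish_settings and its default_key parameter (standing in for the exchange's key attribute) are hypothetical and are not part of Bunny.

```ruby
# Hypothetical helper (not part of Bunny): mirrors how Exchange09#publish
# splits its options into Basic::Publish arguments and header properties.
def publish_settings(opts = {}, default_key = nil)
  opts = opts.dup  # never modify the caller's hash
  publish_args = {
    :routing_key => opts.delete(:key) || default_key,
    :mandatory   => opts.delete(:mandatory),
    :immediate   => opts.delete(:immediate)
  }
  delivery_mode = opts.delete(:persistent) ? 2 : 1  # 2 = persistent, 1 = non-persistent
  content_type  = opts.delete(:content_type) || 'application/octet-stream'
  # any options left over are merged into the header properties
  properties = {
    :content_type => content_type, :delivery_mode => delivery_mode, :priority => 0
  }.merge(opts)
  [publish_args, properties]
end

args, props = publish_settings({ :key => 'orders.new', :persistent => true, :priority => 5 }, 'fallback')
args[:routing_key]     # => 'orders.new'
props[:delivery_mode]  # => 2
props[:priority]       # => 5
```

Because leftover options are merged last, a caller-supplied property such as :priority overrides the default of 0.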