Class: GorgonBunny::Exchange

Inherits:
Object
  • Object
show all
Includes:
Compatibility
Defined in:
lib/gorgon_bunny/lib/gorgon_bunny/exchange.rb

Overview

Represents AMQP 0.9.1 exchanges.

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Compatibility

#channel_from

Constructor Details

#initialize(channel_or_connection, type, name, opts = {}) ⇒ Exchange

Returns a new instance of Exchange.

Parameters:

  • channel_or_connection (GorgonBunny::Channel)

    Channel this exchange will use. Session instances are supported only for backwards compatibility with 0.8.

  • type (Symbol, String)

    Exchange type

  • name (String)

    Exchange name

  • opts (Hash) (defaults to: {})

    Exchange properties

Options Hash (opts):

  • :durable (Boolean) — default: false

    Should this exchange be durable?

  • :auto_delete (Boolean) — default: false

    Should this exchange be automatically deleted when it is no longer used?

  • :arguments (Boolean) — default: {}

    Additional optional arguments (typically used by RabbitMQ extensions and plugins)

See Also:



78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/gorgon_bunny/lib/gorgon_bunny/exchange.rb', line 78

def initialize(channel_or_connection, type, name, opts = {})
  # old GorgonBunny versions pass a connection here. In that case,
  # we just use default channel from it. MK.
  @channel          = channel_from(channel_or_connection)
  @name             = name
  @type             = type
  @options          = self.class.add_default_options(name, opts)

  @durable          = @options[:durable]
  @auto_delete      = @options[:auto_delete]
  @arguments        = @options[:arguments]

  declare! unless opts[:no_declare] || predeclared? || (@name == GorgonAMQ::Protocol::EMPTY_STRING)

  @channel.register_exchange(self)
end

Instance Attribute Details

#channelGorgonBunny::Channel (readonly)



18
19
20
# File 'lib/gorgon_bunny/lib/gorgon_bunny/exchange.rb', line 18

def channel
  @channel
end

#nameString (readonly)

Returns:

  • (String)


21
22
23
# File 'lib/gorgon_bunny/lib/gorgon_bunny/exchange.rb', line 21

def name
  @name
end

#optsHash

Options hash this exchange instance was instantiated with

Returns:



33
34
35
# File 'lib/gorgon_bunny/lib/gorgon_bunny/exchange.rb', line 33

def opts
  @opts
end

#statusSymbol (readonly)

Returns:

  • (Symbol)


29
30
31
# File 'lib/gorgon_bunny/lib/gorgon_bunny/exchange.rb', line 29

def status
  @status
end

#typeSymbol (readonly)

Type of this exchange (one of: :direct, :fanout, :topic, :headers).

Returns:

  • (Symbol)


25
26
27
# File 'lib/gorgon_bunny/lib/gorgon_bunny/exchange.rb', line 25

def type
  @type
end

Class Method Details

.default(channel_or_connection) ⇒ Exchange

Note:

Do not confuse default exchange with amq.direct: amq.direct is a pre-defined direct exchange that doesn’t have any special routing semantics.

The default exchange. Default exchange is a direct exchange that is predefined. It cannot be removed. Every queue is bind to this (direct) exchange by default with the following routing semantics: messages will be routed to the queue withe same same name as message’s routing key. In other words, if a message is published with a routing key of “weather.usa.ca.sandiego” and there is a queue Q with this name, that message will be routed to Q.

Examples:

Publishing a messages to the tasks queue

channel     = GorgonBunny::Channel.new(connection)
tasks_queue = channel.queue("tasks")
GorgonBunny::Exchange.default(channel).publish("make clean", routing_key => "tasks")

Parameters:

Returns:

  • (Exchange)

    An instance that corresponds to the default exchange (of type direct).

See Also:



57
58
59
# File 'lib/gorgon_bunny/lib/gorgon_bunny/exchange.rb', line 57

def self.default(channel_or_connection)
  self.new(channel_from(channel_or_connection), :direct, GorgonAMQ::Protocol::EMPTY_STRING, :no_declare => true)
end

Instance Method Details

#argumentsHash

Returns Additional optional arguments (typically used by RabbitMQ extensions and plugins).

Returns:

  • (Hash)

    Additional optional arguments (typically used by RabbitMQ extensions and plugins)



109
110
111
# File 'lib/gorgon_bunny/lib/gorgon_bunny/exchange.rb', line 109

def arguments
  @arguments
end

#auto_delete?Boolean

Returns true if this exchange was declared as automatically deleted (deleted as soon as last consumer unbinds).

Returns:

  • (Boolean)

    true if this exchange was declared as automatically deleted (deleted as soon as last consumer unbinds).



103
104
105
# File 'lib/gorgon_bunny/lib/gorgon_bunny/exchange.rb', line 103

def auto_delete?
  @auto_delete
end

#bind(source, opts = {}) ⇒ GorgonBunny::Exchange

Binds an exchange to another (source) exchange using exchange.bind AMQP 0.9.1 extension that RabbitMQ provides.

Parameters:

  • source (String)

    Source exchange name

  • opts (Hash) (defaults to: {})

    Options

Options Hash (opts):

  • routing_key (String) — default: nil

    Routing key used for binding

  • arguments (Hash) — default: {}

    Optional arguments

Returns:

See Also:



171
172
173
174
175
# File 'lib/gorgon_bunny/lib/gorgon_bunny/exchange.rb', line 171

def bind(source, opts = {})
  @channel.exchange_bind(source, self, opts)

  self
end

#delete(opts = {}) ⇒ Object

Deletes the exchange unless it is predeclared

Parameters:

  • opts (Hash) (defaults to: {})

    Options

Options Hash (opts):

  • if_unused (Boolean) — default: false

    Should this exchange be deleted only if it is no longer used

See Also:



152
153
154
155
# File 'lib/gorgon_bunny/lib/gorgon_bunny/exchange.rb', line 152

def delete(opts = {})
  @channel.deregister_exchange(self)
  @channel.exchange_delete(@name, opts) unless predeclared?
end

#durable?Boolean

Returns true if this exchange was declared as durable (will survive broker restart).

Returns:

  • (Boolean)

    true if this exchange was declared as durable (will survive broker restart).



97
98
99
# File 'lib/gorgon_bunny/lib/gorgon_bunny/exchange.rb', line 97

def durable?
  @durable
end

#handle_return(basic_return, properties, content) ⇒ Object



228
229
230
231
232
233
234
# File 'lib/gorgon_bunny/lib/gorgon_bunny/exchange.rb', line 228

def handle_return(basic_return, properties, content)
  if @on_return
    @on_return.call(basic_return, properties, content)
  else
    # TODO: log a warning
  end
end

#on_return(&block) ⇒ Object

Defines a block that will handle returned messages



200
201
202
203
204
# File 'lib/gorgon_bunny/lib/gorgon_bunny/exchange.rb', line 200

def on_return(&block)
  @on_return = block

  self
end

#predefined?Boolean Also known as: predeclared?

Returns true if this exchange is a pre-defined one (amq.direct, amq.fanout, amq.match and so on).

Returns:

  • (Boolean)

    true if this exchange is a pre-defined one (amq.direct, amq.fanout, amq.match and so on)



237
238
239
# File 'lib/gorgon_bunny/lib/gorgon_bunny/exchange.rb', line 237

def predefined?
  (@name == GorgonAMQ::Protocol::EMPTY_STRING) || !!(@name =~ /^amq\.(direct|fanout|topic|headers|match)/i)
end

#publish(payload, opts = {}) ⇒ GorgonBunny::Exchange

Publishes a message

Parameters:

  • payload (String)

    Message payload. It will never be modified by GorgonBunny or RabbitMQ in any way.

  • opts (Hash) (defaults to: {})

    Message properties (metadata) and delivery settings

Options Hash (opts):

  • :routing_key (String)

    Routing key

  • :persistent (Boolean)

    Should the message be persisted to disk?

  • :mandatory (Boolean)

    Should the message be returned if it cannot be routed to any queue?

  • :timestamp (Integer)

    A timestamp associated with this message

  • :expiration (Integer)

    Expiration time after which the message will be deleted

  • :type (String)

    Message type, e.g. what type of event or command this message represents. Can be any string

  • :reply_to (String)

    Queue name other apps should send the response to

  • :content_type (String)

    Message content type (e.g. application/json)

  • :content_encoding (String)

    Message content encoding (e.g. gzip)

  • :correlation_id (String)

    Message correlated to this one, e.g. what request this message is a reply for

  • :priority (Integer)

    Message priority, 0 to 9. Not used by RabbitMQ, only applications

  • :message_id (String)

    Any message identifier

  • :user_id (String)

    Optional user ID. Verified by RabbitMQ against the actual connection username

  • :app_id (String)

    Optional application ID

Returns:

See Also:



137
138
139
140
141
# File 'lib/gorgon_bunny/lib/gorgon_bunny/exchange.rb', line 137

def publish(payload, opts = {})
  @channel.basic_publish(payload, self.name, (opts.delete(:routing_key) || opts.delete(:key)), opts)

  self
end

#recover_from_network_failureObject



217
218
219
220
# File 'lib/gorgon_bunny/lib/gorgon_bunny/exchange.rb', line 217

def recover_from_network_failure
  # puts "Recovering exchange #{@name} from network failure"
  declare! unless predefined?
end

#unbind(source, opts = {}) ⇒ GorgonBunny::Exchange

Unbinds an exchange from another (source) exchange using exchange.unbind AMQP 0.9.1 extension that RabbitMQ provides.

Parameters:

  • source (String)

    Source exchange name

  • opts (Hash) (defaults to: {})

    Options

Options Hash (opts):

  • routing_key (String) — default: nil

    Routing key used for binding

  • arguments (Hash) — default: {}

    Optional arguments

Returns:

See Also:



191
192
193
194
195
# File 'lib/gorgon_bunny/lib/gorgon_bunny/exchange.rb', line 191

def unbind(source, opts = {})
  @channel.exchange_unbind(source, self, opts)

  self
end

#wait_for_confirmsObject

Waits until all outstanding publisher confirms on the channel arrive.

This is a convenience method that delegates to Channel#wait_for_confirms



212
213
214
# File 'lib/gorgon_bunny/lib/gorgon_bunny/exchange.rb', line 212

def wait_for_confirms
  @channel.wait_for_confirms
end