Class: Bunny::Exchange

Inherits:
Object
Defined in:
lib/bunny/exchange.rb

Overview

Represents AMQP 0.9.1 exchanges.

Constructor Details

#initialize(channel, type, name, opts = {}) ⇒ Exchange

Returns a new instance of Exchange.

Parameters:

  • channel (Bunny::Channel)

    Channel this exchange will use.

  • type (Symbol, String)

    Exchange type

  • name (String)

    Exchange name

  • opts (Hash) (defaults to: {})

    Exchange properties

Options Hash (opts):

  • :durable (Boolean) — default: false

    Should this exchange be durable?

  • :auto_delete (Boolean) — default: false

    Should this exchange be automatically deleted when it is no longer used?

  • :arguments (Hash) — default: {}

    Additional optional arguments (typically used by RabbitMQ extensions and plugins)

# File 'lib/bunny/exchange.rb', line 74

def initialize(channel, type, name, opts = {})
  @channel          = channel
  @name             = name
  @type             = type
  @options          = self.class.add_default_options(name, opts)

  @durable          = @options[:durable]
  @auto_delete      = @options[:auto_delete]
  @internal         = @options[:internal]
  @arguments        = @options[:arguments]

  @bindings         = Set.new

  declare! unless opts[:no_declare] || predeclared? || (@name == AMQ::Protocol::EMPTY_STRING)

  @channel.register_exchange(self)
end

Instance Attribute Details

#channel ⇒ Bunny::Channel (readonly)

Returns:

  • (Bunny::Channel)


# File 'lib/bunny/exchange.rb', line 15

def channel
  @channel
end

#name ⇒ String (readonly)

Returns:

  • (String)


# File 'lib/bunny/exchange.rb', line 18

def name
  @name
end

#opts ⇒ Hash

Options hash this exchange instance was instantiated with.

Returns:

  • (Hash)


# File 'lib/bunny/exchange.rb', line 30

def opts
  @opts
end

#status ⇒ Symbol (readonly)

Returns:

  • (Symbol)


# File 'lib/bunny/exchange.rb', line 26

def status
  @status
end

#type ⇒ Symbol (readonly)

Type of this exchange (one of: :direct, :fanout, :topic, :headers).

Returns:

  • (Symbol)


# File 'lib/bunny/exchange.rb', line 22

def type
  @type
end

Class Method Details

.default(channel_or_connection) ⇒ Exchange

Note:

Do not confuse the default exchange with amq.direct: amq.direct is a pre-defined direct exchange that doesn’t have any special routing semantics.

The default exchange. This exchange is a direct exchange that is predefined by the broker and that cannot be removed. Every queue is bound to this exchange by default with the following routing semantics: messages will be routed to the queue with the same name as the message’s routing key. In other words, if a message is published with a routing key of “weather.usa.ca.sandiego” and there is a queue with this name, the message will be routed to the queue.

Examples:

Publishing a message to the tasks queue

channel     = Bunny::Channel.new(connection)
tasks_queue = channel.queue("tasks")
Bunny::Exchange.default(channel).publish("make clean", :routing_key => "tasks")

Parameters:

  • channel_or_connection (Bunny::Channel)

    Channel to use. Session instances are only supported for backwards compatibility.

Returns:

  • (Exchange)

    An instance that corresponds to the default exchange (of type direct).

# File 'lib/bunny/exchange.rb', line 54

def self.default(channel_or_connection)
  self.new(channel_or_connection, :direct, AMQ::Protocol::EMPTY_STRING, :no_declare => true)
end
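
The routing rule described above can be illustrated without a broker. The following is a toy in-memory model, not Bunny code: the class DefaultExchangeModel and its methods are hypothetical names introduced only for this sketch. It assumes nothing beyond the rule stated in the description, namely that a message goes to the queue whose name equals the routing key.

```ruby
# Toy in-memory model of default-exchange routing. This is NOT Bunny code;
# DefaultExchangeModel, declare_queue, publish and messages are hypothetical.
class DefaultExchangeModel
  def initialize
    @queues = {} # queue name => array of delivered payloads
  end

  # Declaring a queue makes it reachable under its own name.
  def declare_queue(name)
    @queues[name] ||= []
  end

  # Route to the queue whose name equals the routing key.
  # Returns true if the message was routed, false if no such queue exists.
  def publish(payload, routing_key:)
    queue = @queues[routing_key]
    return false unless queue

    queue << payload
    true
  end

  def messages(name)
    @queues.fetch(name, [])
  end
end

model = DefaultExchangeModel.new
model.declare_queue("tasks")

routed   = model.publish("make clean", routing_key: "tasks")
unrouted = model.publish("make all", routing_key: "no.such.queue")
```

In this model the second publish is simply reported as unrouted. With a real broker, what happens to an unroutable message depends on publish options such as :mandatory.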

Instance Method Details

#arguments ⇒ Hash

Returns additional optional arguments (typically used by RabbitMQ extensions and plugins).

Returns:

  • (Hash)

    Additional optional arguments (typically used by RabbitMQ extensions and plugins)



# File 'lib/bunny/exchange.rb', line 112

def arguments
  @arguments
end

#auto_delete? ⇒ Boolean

Returns true if this exchange was declared as automatically deleted (deleted as soon as the last queue or exchange bound to it is unbound).

Returns:

  • (Boolean)

    true if this exchange was declared as automatically deleted (deleted as soon as the last queue or exchange bound to it is unbound).



# File 'lib/bunny/exchange.rb', line 100

def auto_delete?
  @auto_delete
end

#bind(source, opts = {}) ⇒ Bunny::Exchange

Binds this exchange to another (source) exchange using the exchange.bind AMQP 0.9.1 extension that RabbitMQ provides.

Parameters:

  • source (String)

    Source exchange name

  • opts (Hash) (defaults to: {})

    Options

Options Hash (opts):

  • :routing_key (String) — default: nil

    Routing key used for binding

  • :arguments (Hash) — default: {}

    Optional arguments

Returns:

  • (Bunny::Exchange)

    Self


# File 'lib/bunny/exchange.rb', line 174

def bind(source, opts = {})
  @channel.exchange_bind(source, self, opts)
  @bindings.add(source: source, opts: opts)

  self
end

#delete(opts = {}) ⇒ Object

Deregisters the exchange from its channel and deletes it on the broker unless it is predeclared.

Parameters:

  • opts (Hash) (defaults to: {})

    Options

Options Hash (opts):

  • :if_unused (Boolean) — default: false

    Should this exchange be deleted only if it is no longer used?

# File 'lib/bunny/exchange.rb', line 155

def delete(opts = {})
  @channel.deregister_exchange(self)
  @channel.exchange_delete(@name, opts) unless predeclared?
end

#durable? ⇒ Boolean

Returns true if this exchange was declared as durable (will survive broker restart).

Returns:

  • (Boolean)

    true if this exchange was declared as durable (will survive broker restart).



# File 'lib/bunny/exchange.rb', line 94

def durable?
  @durable
end

#handle_return(basic_return, properties, content) ⇒ Object



# File 'lib/bunny/exchange.rb', line 236

def handle_return(basic_return, properties, content)
  if @on_return
    @on_return.call(basic_return, properties, content)
  else
    # TODO: log a warning
  end
end

#internal? ⇒ Boolean

Returns true if this exchange is internal (used solely for exchange-to-exchange bindings and cannot be published to by clients).

Returns:

  • (Boolean)

    true if this exchange is internal (used solely for exchange-to-exchange bindings and cannot be published to by clients)



# File 'lib/bunny/exchange.rb', line 106

def internal?
  @internal
end

#on_return(&block) ⇒ Object

Defines a block that will handle returned messages. Returns self, so the call can be chained.



# File 'lib/bunny/exchange.rb', line 205

def on_return(&block)
  @on_return = block

  self
end
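
How #on_return and #handle_return cooperate can be sketched in isolation. The class below is a hypothetical stand-in that copies only the two method bodies shown in the listings. The plain hashes passed to handle_return are an assumption made for the sketch and stand in for whatever objects the channel really passes.

```ruby
# Stand-in class mirroring the on_return / handle_return pair listed above.
# ReturnHandlerSketch is hypothetical; it is not part of Bunny.
class ReturnHandlerSketch
  def initialize
    @on_return = nil
  end

  # Stores the handler block and returns self so calls can be chained.
  def on_return(&block)
    @on_return = block
    self
  end

  # Invokes the stored handler, if any; otherwise does nothing.
  def handle_return(basic_return, properties, content)
    @on_return.call(basic_return, properties, content) if @on_return
  end
end

returned = []
sketch   = ReturnHandlerSketch.new

# Without a handler the returned message is silently ignored.
before = sketch.handle_return({ reply_text: "NO_ROUTE" }, {}, "dropped")

# Hashes stand in for the real return metadata (an assumption of this sketch).
chained = sketch.on_return { |info, _props, body| returned << [info[:reply_text], body] }
sketch.handle_return({ reply_text: "NO_ROUTE" }, {}, "make clean")
```

Because a missing handler means the message is dropped without any notice, it seems prudent to register the block before publishing with :mandatory.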

#predefined? ⇒ Boolean Also known as: predeclared?

Returns true if this exchange is a pre-defined one (amq.direct, amq.fanout, amq.match and so on).

Returns:

  • (Boolean)

    true if this exchange is a pre-defined one (amq.direct, amq.fanout, amq.match and so on)



# File 'lib/bunny/exchange.rb', line 245

def predefined?
  (@name == AMQ::Protocol::EMPTY_STRING) || !!(@name =~ /^amq\.(direct|fanout|topic|headers|match)/i)
end
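
The name check above can be tried out on its own. This sketch copies the regular expression from the listing into a hypothetical helper, predefined_name?, and assumes that AMQ::Protocol::EMPTY_STRING is the empty string. It shows that the pattern is case-insensitive and anchored only at the start, so any name that begins with one of the listed prefixes is treated as predefined.

```ruby
# Pattern copied from the listing above; the helper name is hypothetical.
PREDEFINED_PATTERN = /^amq\.(direct|fanout|topic|headers|match)/i

# Assumes AMQ::Protocol::EMPTY_STRING == "" (the default exchange name).
def predefined_name?(name)
  name == "" || !!(name =~ PREDEFINED_PATTERN)
end

results = {
  ""                   => predefined_name?(""),                   # default exchange
  "amq.direct"         => predefined_name?("amq.direct"),
  "AMQ.Fanout"         => predefined_name?("AMQ.Fanout"),         # case-insensitive
  "amq.topic.custom"   => predefined_name?("amq.topic.custom"),   # prefix match
  "amq.rabbitmq.trace" => predefined_name?("amq.rabbitmq.trace"), # not in the list
  "tasks"              => predefined_name?("tasks")
}
```

Since #delete and the constructor skip predefined exchanges, a user-defined exchange whose name starts with one of these prefixes would presumably not be declared or deleted by this class.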

#publish(payload, opts = {}) ⇒ Bunny::Exchange

Publishes a message.

Parameters:

  • payload (String)

    Message payload. It will never be modified by Bunny or RabbitMQ in any way.

  • opts (Hash) (defaults to: {})

    Message properties (metadata) and delivery settings

Options Hash (opts):

  • :routing_key (String)

    Routing key

  • :persistent (Boolean)

    Should the message be persisted to disk?

  • :mandatory (Boolean)

    Should the message be returned if it cannot be routed to any queue?

  • :timestamp (Integer)

    A timestamp associated with this message

  • :expiration (Integer)

    Expiration time after which the message will be deleted

  • :type (String)

    Message type, e.g. what type of event or command this message represents. Can be any string

  • :reply_to (String)

    Queue name other apps should send the response to

  • :content_type (String)

    Message content type (e.g. application/json)

  • :content_encoding (String)

    Message content encoding (e.g. gzip)

  • :correlation_id (String)

    Message correlated to this one, e.g. what request this message is a reply for

  • :priority (Integer)

    Message priority, 0 to 9. Not used by RabbitMQ, only applications

  • :message_id (String)

    Any message identifier

  • :user_id (String)

    Optional user ID. Verified by RabbitMQ against the actual connection username

  • :app_id (String)

    Optional application ID

Returns:

  • (Bunny::Exchange)

    Self


# File 'lib/bunny/exchange.rb', line 140

def publish(payload, opts = {})
  @channel.basic_publish(payload, self.name, (opts.delete(:routing_key) || opts.delete(:key)), opts)

  self
end
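
One detail of the listing above is that the routing key is taken out of opts with Hash#delete, which modifies the hash the caller passed in. The sketch below isolates that expression in a hypothetical helper, extract_routing_key, to show the effect and one possible way to keep a reusable options hash intact.

```ruby
# Same expression as in #publish above, wrapped in a hypothetical helper.
def extract_routing_key(opts)
  opts.delete(:routing_key) || opts.delete(:key)
end

# The key is removed from the caller's hash; other properties remain.
opts = { routing_key: "tasks", persistent: true }
key  = extract_routing_key(opts)

# :key is accepted as a fallback when :routing_key is absent.
legacy     = { key: "tasks" }
legacy_key = extract_routing_key(legacy)

# When both are given, :routing_key wins and :key is left in the hash,
# because || short-circuits before the second delete runs.
both     = { routing_key: "a", key: "b" }
both_key = extract_routing_key(both)

# Passing a copy keeps a shared (here frozen) options hash untouched.
shared   = { routing_key: "tasks" }.freeze
safe_key = extract_routing_key(shared.dup)
```

A caller that publishes repeatedly with the same options hash would lose the routing key after the first call, so passing opts.dup or a fresh literal each time is the safer habit.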

#recover_from_network_failure ⇒ Object



# File 'lib/bunny/exchange.rb', line 222

def recover_from_network_failure
  declare! unless @options[:no_declare] || predefined?

  @bindings.each do |b|
    bind(b[:source], b[:opts])
  end
end

#unbind(source, opts = {}) ⇒ Bunny::Exchange

Unbinds this exchange from another (source) exchange using the exchange.unbind AMQP 0.9.1 extension that RabbitMQ provides.

Parameters:

  • source (String)

    Source exchange name

  • opts (Hash) (defaults to: {})

    Options

Options Hash (opts):

  • :routing_key (String) — default: nil

    Routing key used for binding

  • :arguments (Hash) — default: {}

    Optional arguments

Returns:

  • (Bunny::Exchange)

    Self


# File 'lib/bunny/exchange.rb', line 195

def unbind(source, opts = {})
  @channel.exchange_unbind(source, self, opts)
  @bindings.delete(source: source, opts: opts)

  self
end
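
The bookkeeping shared by #bind, #unbind and #recover_from_network_failure rests on a Set of hashes. The sketch below uses only Ruby's standard Set, with no Bunny objects, to show what that implies: a repeated bind is recorded once, and an unbind removes the record only when source and opts are equal to the ones used for the bind. The exchange name "logs" and the routing key are made-up values.

```ruby
require "set"

# Same structure as @bindings in the listings: a Set of { source:, opts: } hashes.
bindings = Set.new

# Recording the same binding twice keeps a single entry (hashes compare by content).
bindings.add({ source: "logs", opts: { routing_key: "error" } })
bindings.add({ source: "logs", opts: { routing_key: "error" } })
size_after_duplicate = bindings.size

# Removing with different opts does not match the recorded entry.
bindings.delete({ source: "logs", opts: {} })
size_after_mismatch = bindings.size

# Recovery iterates the set and would call bind once per recorded entry.
replayed = bindings.map { |b| [b[:source], b[:opts][:routing_key]] }

# Removing with equal source and opts drops the entry.
bindings.delete({ source: "logs", opts: { routing_key: "error" } })
size_after_match = bindings.size
```

If unbind is called with opts that differ from the original bind, the stale record stays in the set, and the binding would presumably be re-created on the next recovery.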

#wait_for_confirms ⇒ Object

Waits until all outstanding publisher confirms on the channel arrive.

This is a convenience method that delegates to Channel#wait_for_confirms.



# File 'lib/bunny/exchange.rb', line 217

def wait_for_confirms
  @channel.wait_for_confirms
end