Class: MarchHare::Exchange

Inherits:
Object
Defined in:
lib/march_hare/exchange.rb

Overview

Represents AMQP 0.9.1 exchanges.

Constructor Details

#initialize(channel, name, options = {}) ⇒ Exchange

Instantiates a new exchange. Raises ArgumentError if channel or name is nil, or if options does not include :type.


# File 'lib/march_hare/exchange.rb', line 39

def initialize(channel, name, options = {})
  raise ArgumentError, "exchange channel cannot be nil" if channel.nil?
  raise ArgumentError, "exchange name cannot be nil" if name.nil?
  raise ArgumentError, "exchange :type must be specified as an option" if options[:type].nil?

  @channel = channel
  @name    = name
  @type    = options[:type]
  @options = {:type => :fanout, :durable => false, :auto_delete => false, :internal => false, :passive => false}.merge(options)
end
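
The argument checks and option defaults in the constructor can be tried without JRuby or a broker. The following is a minimal sketch, not MarchHare itself: ExchangeSketch is a hypothetical stand-in that copies the constructor body shown above, and the channel is a placeholder object. Because :type is required, the :fanout default in the options hash is always overwritten by the caller's value.

```ruby
# Hypothetical stand-in that copies the constructor listed above, so the
# validation can be exercised without JRuby or a running broker.
class ExchangeSketch
  attr_reader :channel, :name, :type, :options

  def initialize(channel, name, options = {})
    raise ArgumentError, "exchange channel cannot be nil" if channel.nil?
    raise ArgumentError, "exchange name cannot be nil" if name.nil?
    raise ArgumentError, "exchange :type must be specified as an option" if options[:type].nil?

    @channel = channel
    @name    = name
    @type    = options[:type]
    # Caller-supplied options win over the defaults.
    @options = {:type => :fanout, :durable => false, :auto_delete => false,
                :internal => false, :passive => false}.merge(options)
  end
end

placeholder_channel = Object.new # a real MarchHare::Channel is expected here

exchange = ExchangeSketch.new(placeholder_channel, "events", :type => :topic, :durable => true)
puts exchange.type.inspect             # => :topic
puts exchange.options[:durable]        # => true
puts exchange.options[:auto_delete]    # => false

begin
  ExchangeSketch.new(placeholder_channel, "events") # :type omitted
rescue ArgumentError => e
  puts e.message
end
```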

Instance Attribute Details

#channel ⇒ MarchHare::Channel (readonly)


# File 'lib/march_hare/exchange.rb', line 15

def channel
  @channel
end

#name ⇒ String (readonly)


# File 'lib/march_hare/exchange.rb', line 13

def name
  @name
end

#type ⇒ Symbol (readonly)

Type of this exchange (one of: :direct, :fanout, :topic, :headers).


# File 'lib/march_hare/exchange.rb', line 19

def type
  @type
end

Instance Method Details

#auto_delete? ⇒ Boolean

Returns true if this exchange was declared as automatically deleted (deleted as soon as the last queue or exchange bound to it is unbound).


# File 'lib/march_hare/exchange.rb', line 148

def auto_delete?
  !!@options[:auto_delete]
end

#bind(exchange, options = {}) ⇒ MarchHare::Exchange

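Binds this exchange to another (source) exchange using the exchange.bind AMQP 0.9.1 extension that RabbitMQ provides. The source may be given as an exchange object or as a name; the :routing_key option defaults to an empty string.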


# File 'lib/march_hare/exchange.rb', line 111

def bind(exchange, options={})
  exchange_name = if exchange.respond_to?(:name) then exchange.name else exchange.to_s end
  @channel.exchange_bind(@name, exchange_name, options.fetch(:routing_key, ''))
end
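
To illustrate how #bind resolves its argument, here is a small sketch under stated assumptions: BindRecorder is a hypothetical stand-in for the channel that only records what it receives, and resolve_exchange_name, bind_sketch and NamedSource are illustrative names that are not part of MarchHare. The parameter names destination and source are labels chosen for this sketch.

```ruby
# Hypothetical recorder standing in for MarchHare::Channel; it stores the
# arguments it receives instead of talking to a broker.
class BindRecorder
  attr_reader :calls

  def initialize
    @calls = []
  end

  def exchange_bind(destination, source, routing_key)
    @calls << [destination, source, routing_key]
  end
end

# Same resolution rule as the listing: use #name when available, else #to_s.
def resolve_exchange_name(exchange)
  exchange.respond_to?(:name) ? exchange.name : exchange.to_s
end

# Free-function version of #bind; `destination` plays the role of @name.
def bind_sketch(channel, destination, exchange, options = {})
  channel.exchange_bind(destination,
                        resolve_exchange_name(exchange),
                        options.fetch(:routing_key, ''))
end

NamedSource = Struct.new(:name) # any object that responds to #name works

recorder = BindRecorder.new
bind_sketch(recorder, "audit", NamedSource.new("events"), :routing_key => "user.*")
bind_sketch(recorder, "audit", "logs") # plain string, default routing key
recorder.calls.each { |call| p call }
```

Both an object with a name and a bare string end up as the same kind of argument by the time the channel is called.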

#delete(options = {}) ⇒ Object

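Deregisters the exchange from its channel and deletes it on the broker unless it is predefined. Pass :if_unused => true to delete the exchange only if it has no bindings.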


# File 'lib/march_hare/exchange.rb', line 92

def delete(options={})
  @channel.deregister_exchange(self)
  @channel.exchange_delete(@name, options.fetch(:if_unused, false)) unless predefined?
end
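
The listing above always deregisters the exchange locally but skips the broker call for predefined names. A minimal sketch of that behaviour follows, assuming a hypothetical DeleteRecorder in place of the channel and a hypothetical DeletableExchangeSketch that copies only #delete and #predefined? from this class.

```ruby
# Hypothetical recorder standing in for MarchHare::Channel.
class DeleteRecorder
  attr_reader :calls

  def initialize
    @calls = []
  end

  def deregister_exchange(exchange)
    @calls << [:deregister_exchange, exchange.name]
  end

  def exchange_delete(name, if_unused)
    @calls << [:exchange_delete, name, if_unused]
  end
end

# Copies #delete and #predefined? from the listings in this reference.
class DeletableExchangeSketch
  attr_reader :name

  def initialize(channel, name)
    @channel = channel
    @name    = name
  end

  def predefined?
    @name.empty? || @name.start_with?("amq.")
  end

  def delete(options = {})
    @channel.deregister_exchange(self)
    @channel.exchange_delete(@name, options.fetch(:if_unused, false)) unless predefined?
  end
end

recorder = DeleteRecorder.new
DeletableExchangeSketch.new(recorder, "amq.topic").delete              # predefined: no broker call
DeletableExchangeSketch.new(recorder, "orders").delete(:if_unused => true)
recorder.calls.each { |call| p call }
```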

#durable? ⇒ Boolean

Returns true if this exchange was declared as durable (will survive broker restart).


# File 'lib/march_hare/exchange.rb', line 142

def durable?
  !!@options[:durable]
end

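#internal? ⇒ Boolean

Returns true if this exchange was declared as internal (clients cannot publish to it directly; it is reached only through exchange-to-exchange bindings).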


# File 'lib/march_hare/exchange.rb', line 154

def internal?
  !!@options[:internal]
end

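#predefined? ⇒ Boolean

Returns true if this exchange is predefined, that is, its name is empty (the default exchange) or starts with "amq.".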


# File 'lib/march_hare/exchange.rb', line 136

def predefined?
  @name.empty? || @name.start_with?("amq.")
end

#publish(body, opts = {}) ⇒ MarchHare::Exchange

Publishes a message to this exchange.

Options Hash (opts):

  • :routing_key (String)

    Routing key

  • :persistent (Boolean)

    Should the message be persisted to disk?

  • :mandatory (Boolean)

    Should the message be returned if it cannot be routed to any queue?

  • :properties (Hash)

    Message and delivery properties

    • :timestamp (Time) A timestamp associated with this message
    • :expiration (Integer) Expiration time after which the message will be deleted
    • :type (String) Message type, e.g. what type of event or command this message represents. Can be any string
    • :reply_to (String) Queue name other apps should send the response to
    • :content_type (String) Message content type (e.g. application/json)
    • :content_encoding (String) Message content encoding (e.g. gzip)
    • :correlation_id (String) Message correlated to this one, e.g. what request this message is a reply for
    • :priority (Integer) Message priority, 0 to 9. RabbitMQ honours it only on queues declared as priority queues; otherwise it is left to applications
    • :message_id (String) Any message identifier
    • :user_id (String) Optional user ID. Verified by RabbitMQ against the actual connection username
    • :app_id (String) Optional application ID

# File 'lib/march_hare/exchange.rb', line 75

def publish(body, opts = {})
  options = {:routing_key => '', :mandatory => false}.merge(opts)
  @channel.basic_publish(@name,
                         options.delete(:routing_key),
                         options.delete(:mandatory),
                         options.fetch(:properties, options),
                         body.to_java_bytes)
end
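
How #publish splits its options is easy to miss when reading the listing. The sketch below isolates that step in plain Ruby; split_publish_options is a hypothetical helper, not a MarchHare method, and the JRuby-only body.to_java_bytes conversion is left out. It suggests that, in the listing as shown, top-level keys such as :persistent are forwarded only when no :properties hash is given.

```ruby
# Hypothetical helper that repeats the option handling from #publish and
# returns what would be passed on to Channel#basic_publish.
def split_publish_options(opts = {})
  # merge returns a copy, so the caller's hash is not modified.
  options     = {:routing_key => '', :mandatory => false}.merge(opts)
  routing_key = options.delete(:routing_key)
  mandatory   = options.delete(:mandatory)
  # Without :properties, whatever is left in the hash acts as the properties.
  properties  = options.fetch(:properties, options)
  [routing_key, mandatory, properties]
end

# Flat style: remaining keys become the properties.
p split_publish_options(:routing_key => "a.b", :persistent => true)

# Nested style: only the :properties hash is forwarded.
p split_publish_options(:mandatory => true,
                        :persistent => true,
                        :properties => {:content_type => "application/json"})
```

Keeping message properties either all at the top level or all under :properties avoids mixing the two styles in one call.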

#unbind(exchange, opts = {}) ⇒ MarchHare::Exchange

Unbinds this exchange from another (source) exchange using the exchange.unbind AMQP 0.9.1 extension that RabbitMQ provides.

Options Hash (opts):

  • :routing_key (String), defaults to ''

    Routing key used for binding

  • :arguments (Hash), optional

    Optional binding arguments, passed to the channel as given (nil when omitted)

# File 'lib/march_hare/exchange.rb', line 130

def unbind(exchange, opts = {})
  exchange_name = if exchange.respond_to?(:name) then exchange.name else exchange.to_s end
  @channel.exchange_unbind(@name, exchange_name, opts.fetch(:routing_key, ''), opts[:arguments])
end
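
The defaults that #unbind passes on can be checked with a recorder in place of the channel. This is a sketch only: UnbindRecorder and unbind_sketch are hypothetical names, and the parameter names on the recorder are labels chosen here rather than anything confirmed by the listing.

```ruby
# Hypothetical recorder standing in for MarchHare::Channel.
class UnbindRecorder
  attr_reader :calls

  def initialize
    @calls = []
  end

  def exchange_unbind(destination, source, routing_key, arguments)
    @calls << [destination, source, routing_key, arguments]
  end
end

# Free-function version of #unbind; `destination` plays the role of @name.
def unbind_sketch(channel, destination, exchange, opts = {})
  exchange_name = exchange.respond_to?(:name) ? exchange.name : exchange.to_s
  channel.exchange_unbind(destination, exchange_name,
                          opts.fetch(:routing_key, ''), opts[:arguments])
end

recorder = UnbindRecorder.new
unbind_sketch(recorder, "audit", "events") # defaults: '' and nil
unbind_sketch(recorder, "audit", "events",
              :routing_key => "user.*", :arguments => {"x-match" => "all"})
recorder.calls.each { |call| p call }
```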

#wait_for_confirms ⇒ Object

Waits until all outstanding publisher confirms on the channel arrive.

This is a convenience method that delegates to Channel#wait_for_confirms.


# File 'lib/march_hare/exchange.rb', line 164

def wait_for_confirms
  @channel.wait_for_confirms
end