Class: MarchHare::Queue

Inherits:
Object
  • Object
show all
Defined in:
lib/march_hare/queue.rb

Overview

Represents an AMQP 0.9.1 queue.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(channel, name, options = {}) ⇒ Queue

Returns a new instance of Queue


31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/march_hare/queue.rb', line 31

# Sets up the in-memory state of a queue object: channel, name, the merged
# declaration options and an empty set of recorded bindings.
#
# @param channel [MarchHare::Channel] channel the queue operates on
# @param name [String] queue name; an empty string marks the queue as server-named
# @param options [Hash] declaration options, merged over the defaults
# @option options [Boolean] :durable (false)
# @option options [Boolean] :exclusive (false)
# @option options [Boolean] :auto_delete (false)
# @option options [Boolean] :passive (false)
# @option options [Hash] :arguments ({})
# @raise [ArgumentError] if name is not a String
def initialize(channel, name, options={})
  unless name.is_a?(String)
    raise ArgumentError, 'queue name must be a string'
  end

  defaults = {
    :durable     => false,
    :exclusive   => false,
    :auto_delete => false,
    :passive     => false,
    :arguments   => Hash.new
  }

  @channel = channel
  @name    = name
  @options = defaults.merge(options)

  # Cache the individual attributes that the predicate/reader methods expose.
  @durable, @exclusive, @auto_delete, @arguments =
    @options.values_at(:durable, :exclusive, :auto_delete, :arguments)
  @server_named = @name.empty?

  @bindings = Set.new
end

Instance Attribute Details

#channel ⇒ MarchHare::Channel (readonly)


15
16
17
# File 'lib/march_hare/queue.rb', line 15

# @return [MarchHare::Channel] the channel stored in @channel
def channel
  @channel
end

#name ⇒ String (readonly)


17
18
19
# File 'lib/march_hare/queue.rb', line 17

# @return [String] the queue name stored in @name
def name
  @name
end

Instance Method Details

#arguments ⇒ Hash


73
74
75
# File 'lib/march_hare/queue.rb', line 73

# @return [Hash] the queue arguments stored in @arguments
def arguments
  @arguments
end

#auto_delete? ⇒ Boolean

Returns true if this queue was declared as automatically deleted (deleted as soon as the last consumer unsubscribes).


62
63
64
# File 'lib/march_hare/queue.rb', line 62

# @return [Boolean] the auto-delete flag stored in @auto_delete
def auto_delete?
  @auto_delete
end

#bind(exchange, options = {}) ⇒ Object

Binds queue to an exchange

Options Hash (options):

  • :routing_key (String)

    Routing key

  • :arguments (Hash) — default: {}

    Additional optional binding arguments

See Also:


89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/march_hare/queue.rb', line 89

# Binds this queue to an exchange and records the binding so it can be
# re-established during automatic recovery.
#
# @param exchange [#name, #to_s] exchange object (anything responding to
#   #name) or an exchange name
# @param options [Hash]
# @option options [String] :routing_key routing key; :key is accepted as an
#   alias, and "" is used when neither is given
# @option options [Hash] :arguments additional optional binding arguments
# @return [self]
def bind(exchange, options={})
  exchange_name = exchange.respond_to?(:name) ? exchange.name : exchange.to_s
  # Resolve the key once so the recorded binding carries exactly the value
  # sent to the broker ("" rather than nil when no key was given). A nil key
  # in the record could never be matched by a later unbind.
  routing_key = options[:routing_key] || options[:key] || ""
  @channel.queue_bind(@name, exchange_name, routing_key, options[:arguments])

  # store bindings for automatic recovery. We need to be very careful to
  # not cause an infinite rebinding loop here when we recover. MK.
  # @bindings is a Set, so re-adding an already recorded binding is a no-op.
  @bindings << { :exchange => exchange_name, :routing_key => routing_key, :arguments => options[:arguments] }

  self
end

#build_consumer(opts = {}, &block) ⇒ Object


157
158
159
160
161
162
163
# File 'lib/march_hare/queue.rb', line 157

# Creates (but does not register or start) a consumer for this queue.
#
# A BlockingCallbackConsumer is built when opts[:block] or opts[:blocking]
# is truthy; otherwise a CallbackConsumer is built.
#
# @param opts [Hash] consumer options, handed to the consumer constructor
# @option opts [Boolean] :block build the blocking variant
# @option opts [Boolean] :blocking alias for :block
# @option opts [Object] :buffer_size passed to the blocking variant only
# @param block [Proc] delivery handler handed to the consumer
# @return [CallbackConsumer, BlockingCallbackConsumer]
def build_consumer(opts = {}, &block)
  wants_blocking = opts[:block] || opts[:blocking]
  return CallbackConsumer.new(@channel, self, opts, block) unless wants_blocking

  BlockingCallbackConsumer.new(@channel, self, opts[:buffer_size], opts, block)
end

#consumer_count ⇒ Integer


208
209
210
211
# File 'lib/march_hare/queue.rb', line 208

# Re-declares the queue passively on the channel and reads the consumer
# count from the response.
#
# @return [Integer] consumer count reported by the passive declare
def consumer_count
  @channel.queue_declare_passive(@name).consumer_count
end

#delete(if_unused = false, if_empty = false) ⇒ Object

Deletes the queue


135
136
137
# File 'lib/march_hare/queue.rb', line 135

# Deletes the queue by forwarding to the channel's queue_delete.
#
# @param if_unused [Boolean] forwarded unchanged as the second argument
# @param if_empty [Boolean] forwarded unchanged as the third argument
# @return [Object] whatever @channel.queue_delete returns
# NOTE(review): presumably these map to the AMQP queue.delete if-unused /
# if-empty flags — confirm against Channel#queue_delete.
def delete(if_unused = false, if_empty = false)
  @channel.queue_delete(@name, if_unused, if_empty)
end

#durable? ⇒ Boolean

Returns true if this queue was declared as durable (will survive broker restart).


50
51
52
# File 'lib/march_hare/queue.rb', line 50

# @return [Boolean] the durability flag stored in @durable
def durable?
  @durable
end

#exclusive? ⇒ Boolean

Returns true if this queue was declared as exclusive (limited to just one consumer)


56
57
58
# File 'lib/march_hare/queue.rb', line 56

# @return [Boolean] the exclusivity flag stored in @exclusive
def exclusive?
  @exclusive
end

#get(options = {}) ⇒ Object Also known as: pop


146
147
148
149
150
151
152
153
154
# File 'lib/march_hare/queue.rb', line 146

# Fetches a single message from the queue with a basic_get on the channel.
#
# @param options [Hash]
# @option options [Boolean] :ack (false) fetch with manual acknowledgement
# @option options [Boolean] :manual_ack (false) alias for :ack, matching the
#   option names accepted by #subscribe_with
# @return [Array, nil] a [Headers, String] pair (metadata, payload), or nil
#   when basic_get returns nothing
def get(options = {})
  manual_ack = options[:ack] || options[:manual_ack]
  # basic_get takes the inverse flag: true means "do not require acks".
  response = @channel.basic_get(@name, !manual_ack)
  return nil unless response

  # NOTE(review): String.from_java_bytes is a JRuby extension — this path
  # assumes the code runs under JRuby and response.body is a Java byte[].
  [Headers.new(@channel, nil, response.envelope, response.props), String.from_java_bytes(response.body)]
end

#message_count ⇒ Integer


202
203
204
205
# File 'lib/march_hare/queue.rb', line 202

# Re-declares the queue passively on the channel and reads the message
# count from the response.
#
# @return [Integer] message count reported by the passive declare
def message_count
  @channel.queue_declare_passive(@name).message_count
end

#predefined? ⇒ Boolean


230
231
232
# File 'lib/march_hare/queue.rb', line 230

# @return [Boolean] true when the queue name begins with the "amq." prefix
def predefined?
  @name.start_with?("amq.")
end

#publish(payload, opts = {}) ⇒ Object

Publishes a message to the queue via default exchange. Takes the same arguments as Exchange#publish


218
219
220
221
222
# File 'lib/march_hare/queue.rb', line 218

# Publishes a message through the channel's default exchange, using this
# queue's name as the routing key.
#
# @param payload [Object] message body, handed to the exchange's publish
# @param opts [Hash] publish options; any :routing_key given here is
#   overridden with the queue name (opts itself is not modified)
# @return [self]
def publish(payload, opts = {})
  exchange = @channel.default_exchange
  routed_opts = opts.merge(:routing_key => @name)
  exchange.publish(payload, routed_opts)

  self
end

#purge ⇒ Object

Purges a queue (removes all messages from it)


142
143
144
# File 'lib/march_hare/queue.rb', line 142

# Purges the queue by forwarding its name to the channel's queue_purge.
#
# @return [Object] whatever @channel.queue_purge returns
def purge
  @channel.queue_purge(@name)
end

#server_named? ⇒ Boolean

Returns true if this queue was declared as server named.


68
69
70
# File 'lib/march_hare/queue.rb', line 68

# @return [Boolean] the server-named flag stored in @server_named
def server_named?
  @server_named
end

#status ⇒ Array<Integer>

Returns a pair with the number of messages in the queue and the number of its consumers.


196
197
198
199
# File 'lib/march_hare/queue.rb', line 196

# Re-declares the queue passively on the channel and reports both counters
# from that single response.
#
# @return [Array<Integer>] [message_count, consumer_count]
def status
  declare_ok = @channel.queue_declare_passive(@name)
  messages   = declare_ok.message_count
  consumers  = declare_ok.consumer_count

  [messages, consumers]
end

#subscribe(opts = {}, &block) ⇒ Object

Adds a consumer to the queue (subscribes for message deliveries).

Options Hash (opts):

  • :manual_ack (Boolean) — default: false

    Will this consumer use manual acknowledgements?

  • :exclusive (Boolean) — default: false

    Should this consumer be exclusive for this queue?

  • :block (Boolean) — default: false

    Should the call block calling thread?

  • :on_cancellation (#call)

    Block to execute when this consumer is cancelled remotely (e.g. via the RabbitMQ Management plugin)

  • :consumer_tag (String)

    Unique consumer identifier. It is usually recommended to let MarchHare generate it for you.

  • :arguments (Hash) — default: {}

    Additional (optional) arguments, typically used by RabbitMQ extensions

See Also:


178
179
180
# File 'lib/march_hare/queue.rb', line 178

# Builds a consumer from the options and block via #build_consumer, then
# subscribes it via #subscribe_with using the same options.
#
# @param opts [Hash] passed to both #build_consumer and #subscribe_with
# @param block [Proc] delivery handler
# @return [Object] the result of #subscribe_with
def subscribe(opts = {}, &block)
  consumer = build_consumer(opts, &block)
  subscribe_with(consumer, opts)
end

#subscribe_with(consumer, opts = {}) ⇒ Object


182
183
184
185
186
187
188
189
190
191
# File 'lib/march_hare/queue.rb', line 182

# Subscribes an already-built consumer to this queue: issues basic_consume,
# hands the resulting tag to the consumer, registers the consumer on the
# channel, then starts it.
#
# @param consumer [#consumer_tag=, #start] consumer object to subscribe
# @param opts [Hash]
# @option opts [Boolean] :ack use manual acknowledgements
# @option opts [Boolean] :manual_ack alias for :ack
# @option opts [String] :consumer_tag tag passed to basic_consume (nil when absent)
# @return [Object] the consumer that was passed in
def subscribe_with(consumer, opts = {})
  # basic_consume takes the inverse flag: true unless manual acks were requested.
  auto_ack = !(opts[:ack] || opts[:manual_ack])
  @consumer_tag = @channel.basic_consume(@name, auto_ack, opts[:consumer_tag], consumer)

  consumer.consumer_tag = @consumer_tag
  @default_consumer = consumer
  @channel.register_consumer(@consumer_tag, consumer)

  consumer.start
  consumer
end

#unbind(exchange, options = {}) ⇒ Object

Unbinds queue from an exchange

Options Hash (options):

  • :routing_key (String)

    Routing key

  • :arguments (Hash) — default: {}

    Additional optional binding arguments

See Also:


115
116
117
118
119
120
121
122
123
124
125
126
127
# File 'lib/march_hare/queue.rb', line 115

# Unbinds this queue from an exchange and forgets the recorded binding so
# it is not re-established during automatic recovery.
#
# @param exchange [#name, #to_s] exchange object (anything responding to
#   #name) or an exchange name
# @param options [Hash]
# @option options [String] :routing_key routing key; :key is accepted as an
#   alias (as in #bind), and "" is used when neither is given
# @option options [Hash] :arguments binding arguments, used to identify the
#   recorded binding
# @return [self]
def unbind(exchange, options={})
  exchange_name = exchange.respond_to?(:name) ? exchange.name : exchange.to_s
  routing_key = options[:routing_key] || options[:key] || ""
  @channel.queue_unbind(@name, exchange_name, routing_key)

  # Drop every recorded binding that matches. A recorded nil routing key is
  # treated as "" so bindings stored without an explicit key are matched too.
  @bindings.delete_if do |recorded|
    recorded[:exchange] == exchange_name &&
      (recorded[:routing_key] || "") == routing_key &&
      recorded[:arguments] == options[:arguments]
  end

  self
end