Class: Bunny::Subscription09

Inherits:
Consumer
Defined in:
lib/bunny/subscription09.rb

Overview

Asks the server to start a “consumer”, which is a transient request for messages from a specific queue. Consumers last as long as the channel they were created on, or until the client cancels them with an unsubscribe. Every time a message reaches the queue, it is passed to the block for processing. If an error occurs, a ProtocolError is raised.

Operation

Passes a hash of message information to the block, if one has been supplied. The hash contains :header, :payload and :delivery_details, structured as follows:

:header has the instance variables @klass, @size, @weight and @properties. @properties is a hash containing :content_type, :delivery_mode and :priority.

:payload contains the message contents.

:delivery_details is a hash containing :consumer_tag, :delivery_tag, :redelivered, :exchange and :routing_key.
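To make the structure above concrete, here is a minimal sketch of a handler that reads each part of the message hash. It runs without a broker: SampleHeader, SAMPLE_MESSAGE and describe_message are hypothetical names introduced for illustration, all values are invented, and the header is assumed to expose its properties through a reader method.

```ruby
# Hypothetical stand-in for the header object. The real header is built by
# the library; a `properties` reader is an assumption made for this sketch.
SampleHeader = Struct.new(:klass, :size, :weight, :properties)

# Hand-built message in the shape described above (all values are invented).
SAMPLE_MESSAGE = {
  :header => SampleHeader.new(
    nil, 5, 0,
    { :content_type => "text/plain", :delivery_mode => 2, :priority => 0 }
  ),
  :payload => "hello",
  :delivery_details => {
    :consumer_tag => "ctag-1",
    :delivery_tag => 1,
    :redelivered  => false,
    :exchange     => "",
    :routing_key  => "my_queue"
  }
}

# A handler of the kind that could be used inside the subscribe block.
def describe_message(msg)
  details = msg[:delivery_details]
  props   = msg[:header].properties
  status  = details[:redelivered] ? "redelivered" : "first delivery"
  "#{details[:routing_key]} [#{props[:content_type]}, #{status}]: #{msg[:payload]}"
end

puts describe_message(SAMPLE_MESSAGE)
# prints "my_queue [text/plain, first delivery]: hello"
```

In a real subscription the same handler could be called from the block, for example my_queue.subscribe { |msg| puts describe_message(msg) }.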

If the :timeout option is specified, the subscription ends automatically when the given number of seconds passes without a message arriving.

Examples:

my_queue.subscribe(timeout: 5) { |msg| puts msg[:payload] }
my_queue.subscribe(message_max: 10, ack: true) { |msg| puts msg[:payload] }

Instance Attribute Summary

Attributes inherited from Qrack::Subscription

#ack, #break_when_empty, #client, #consumer_tag, #delivery_tag, #exclusive, #message_count, #message_max, #queue, #timeout

Instance Method Summary

Methods inherited from Consumer

#initialize

Methods inherited from Qrack::Subscription

#initialize, #start

Constructor Details

This class inherits a constructor from Bunny::Consumer

Instance Method Details

#setup_consumer ⇒ Object



# File 'lib/bunny/subscription09.rb', line 66

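def setup_consumer
  # Default arguments for the consume request; matching keys in @opts take precedence.
  subscription_options = {
    :deprecated_ticket => 0,
    :queue => queue.name,
    :consumer_tag => consumer_tag,
    :no_ack => !ack,
    :exclusive => exclusive,
    :nowait => false
  }.merge(@opts)

  # Ask the server to start the consumer.
  client.send_frame(Qrack::Protocol09::Basic::Consume.new(subscription_options))

  # Read the server's reply and check that it is a ConsumeOk.
  method = client.next_method

  client.check_response(method, Qrack::Protocol09::Basic::ConsumeOk, "Error subscribing to queue #{queue.name}")

  # Keep the consumer tag returned by the server.
  @consumer_tag = method.consumer_tag
end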
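
The way setup_consumer assembles its options can be sketched in isolation. The helper below, build_subscription_options, is a hypothetical name that is not part of the library; it only mirrors the hash literal and merge shown above, so it runs without a broker. It illustrates two points: :no_ack is the negation of ack, and any key supplied by the caller replaces the matching default because the merge is applied last.

```ruby
# Hypothetical helper mirroring the options hash built in setup_consumer.
# It is not part of the library; it exists only to show the merge behaviour.
def build_subscription_options(queue_name, consumer_tag, ack, exclusive, opts = {})
  {
    :deprecated_ticket => 0,
    :queue => queue_name,
    :consumer_tag => consumer_tag,
    :no_ack => !ack,
    :exclusive => exclusive,
    :nowait => false
  }.merge(opts) # caller-supplied keys win over the defaults
end

options = build_subscription_options("my_queue", "ctag-1", true, false, :exclusive => true)

puts options[:no_ack]     # prints false, because ack was true
puts options[:exclusive]  # prints true, the caller's value replaced the default
```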