Class: Qrack::Subscription
- Inherits:
-
Object
- Object
- Qrack::Subscription
- Defined in:
- lib/ext/bunny-0.6.0/lib/qrack/subscription.rb
Overview
Subscription ancestor class
Direct Known Subclasses
Instance Attribute Summary collapse
-
#ack ⇒ Object
Returns the value of attribute ack.
-
#client ⇒ Object
readonly
Returns the value of attribute client.
-
#consumer_tag ⇒ Object
Returns the value of attribute consumer_tag.
-
#delivery_tag ⇒ Object
Returns the value of attribute delivery_tag.
-
#exclusive ⇒ Object
Returns the value of attribute exclusive.
-
#message_count ⇒ Object
readonly
Returns the value of attribute message_count.
-
#message_max ⇒ Object
Returns the value of attribute message_max.
-
#queue ⇒ Object
readonly
Returns the value of attribute queue.
-
#timeout ⇒ Object
Returns the value of attribute timeout.
Instance Method Summary collapse
-
#initialize(client, queue, opts = {}) ⇒ Subscription
constructor
A new instance of Subscription.
- #start(&blk) ⇒ Object
Constructor Details
#initialize(client, queue, opts = {}) ⇒ Subscription
Returns a new instance of Subscription.
8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
# File 'lib/ext/bunny-0.6.0/lib/qrack/subscription.rb', line 8 def initialize(client, queue, opts = {}) @client = client @queue = queue # Get timeout value @timeout = opts[:timeout] || nil # Get maximum amount of messages to process @message_max = opts[:message_max] || nil # If a consumer tag is not passed in the server will generate one @consumer_tag = opts[:consumer_tag] || nil # Ignore the :nowait option if passed, otherwise program will hang waiting for a # response from the server causing an error. opts.delete(:nowait) # Do we want to have to provide an acknowledgement? @ack = opts[:ack] || nil # Does this consumer want exclusive use of the queue? @exclusive = opts[:exclusive] || false # Initialize message counter @message_count = 0 # Give queue reference to this subscription @queue.subscription = self # Store options @opts = opts end |
Instance Attribute Details
#ack ⇒ Object
Returns the value of attribute ack.
5 6 7 |
# File 'lib/ext/bunny-0.6.0/lib/qrack/subscription.rb', line 5 def ack @ack end |
#client ⇒ Object (readonly)
Returns the value of attribute client.
6 7 8 |
# File 'lib/ext/bunny-0.6.0/lib/qrack/subscription.rb', line 6 def client @client end |
#consumer_tag ⇒ Object
Returns the value of attribute consumer_tag.
5 6 7 |
# File 'lib/ext/bunny-0.6.0/lib/qrack/subscription.rb', line 5 def consumer_tag @consumer_tag end |
#delivery_tag ⇒ Object
Returns the value of attribute delivery_tag.
5 6 7 |
# File 'lib/ext/bunny-0.6.0/lib/qrack/subscription.rb', line 5 def delivery_tag @delivery_tag end |
#exclusive ⇒ Object
Returns the value of attribute exclusive.
5 6 7 |
# File 'lib/ext/bunny-0.6.0/lib/qrack/subscription.rb', line 5 def exclusive @exclusive end |
#message_count ⇒ Object (readonly)
Returns the value of attribute message_count.
6 7 8 |
# File 'lib/ext/bunny-0.6.0/lib/qrack/subscription.rb', line 6 def @message_count end |
#message_max ⇒ Object
Returns the value of attribute message_max.
5 6 7 |
# File 'lib/ext/bunny-0.6.0/lib/qrack/subscription.rb', line 5 def @message_max end |
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
6 7 8 |
# File 'lib/ext/bunny-0.6.0/lib/qrack/subscription.rb', line 6 def queue @queue end |
#timeout ⇒ Object
Returns the value of attribute timeout.
5 6 7 |
# File 'lib/ext/bunny-0.6.0/lib/qrack/subscription.rb', line 5 def timeout @timeout end |
Instance Method Details
#start(&blk) ⇒ Object
42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 |
# File 'lib/ext/bunny-0.6.0/lib/qrack/subscription.rb', line 42 def start(&blk) # Do not process any messages if zero message_max if == 0 return end # Notify server about new consumer setup_consumer # Start subscription loop loop do begin method = client.next_method(:timeout => timeout) rescue Qrack::ClientTimeout queue.unsubscribe() break end # Increment message counter @message_count += 1 # get delivery tag to use for acknowledge queue.delivery_tag = method.delivery_tag if @ack header = client.next_payload # If maximum frame size is smaller than message payload body then message # will have a message header and several message bodies msg = '' while msg.length < header.size msg += client.next_payload end # If block present, pass the message info to the block for processing blk.call({:header => header, :payload => msg, :delivery_details => method.arguments}) if !blk.nil? # Exit loop if message_max condition met if (!.nil? and == ) # Stop consuming messages queue.unsubscribe() # Acknowledge receipt of the final message queue.ack() if @ack # Quit the loop break end # Have to do the ack here because the ack triggers the release of messages from the server # if you are using Client#qos prefetch and you will get extra messages sent through before # the unsubscribe takes effect to stop messages being sent to this consumer unless the ack is # deferred. queue.ack() if @ack end end |