Class: Qrack::Subscription Deprecated
- Inherits:
-
Object
- Object
- Qrack::Subscription
- Defined in:
- lib/qrack/subscription.rb
Overview
Subscription ancestor class
Direct Known Subclasses
Instance Attribute Summary
-
#ack ⇒ Object
Returns the value of attribute ack.
-
#break_when_empty ⇒ Object
Returns the value of attribute break_when_empty.
-
#client ⇒ Object
readonly
Returns the value of attribute client.
-
#consumer_tag ⇒ Object
Returns the value of attribute consumer_tag.
-
#delivery_tag ⇒ Object
Returns the value of attribute delivery_tag.
-
#exclusive ⇒ Object
Returns the value of attribute exclusive.
-
#message_count ⇒ Object
readonly
Returns the value of attribute message_count.
-
#message_max ⇒ Object
Returns the value of attribute message_max.
-
#queue ⇒ Object
readonly
Returns the value of attribute queue.
-
#timeout ⇒ Object
Returns the value of attribute timeout.
Instance Method Summary
-
#initialize(client, queue, opts = {}) ⇒ Subscription
constructor
A new instance of Subscription.
- #start(&blk) ⇒ Object
Constructor Details
#initialize(client, queue, opts = {}) ⇒ Subscription
Returns a new instance of Subscription.
# File 'lib/qrack/subscription.rb', line 16

def initialize(client, queue, opts = {})
  @client = client
  @queue = queue

  # Get timeout value
  @timeout = opts[:timeout] || nil

  # Get maximum amount of messages to process
  @message_max = opts[:message_max] || nil

  # If a consumer tag is not passed in the server will generate one
  @consumer_tag = opts[:consumer_tag] || nil

  # In a subscription loop, stop listening if there are no more messages in the queue
  @break_when_empty = opts[:break_when_empty] || nil

  # Ignore the :nowait option if passed, otherwise program will hang waiting for a
  # response from the server causing an error.
  opts.delete(:nowait)

  # Do we want to have to provide an acknowledgement?
  @ack = opts[:ack] || nil

  # Does this consumer want exclusive use of the queue?
  @exclusive = opts[:exclusive] || false

  # Initialize message counter
  @message_count = 0

  # Store options
  @opts = opts
end
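The option defaults in the constructor can be exercised without a broker. The sketch below copies the defaulting logic into a hypothetical stand-in class (`SubscriptionOptionsSketch`, not part of the library) and passes plain symbols where a real client and queue would go. The extra `opts` reader is added only so the stored options can be inspected.

```ruby
# Hypothetical stand-in that mirrors the option defaults of
# Qrack::Subscription#initialize; it is not the library class.
class SubscriptionOptionsSketch
  attr_reader :timeout, :message_max, :consumer_tag,
              :break_when_empty, :ack, :exclusive, :message_count, :opts

  def initialize(client, queue, opts = {})
    @client = client
    @queue = queue
    @timeout = opts[:timeout] || nil
    @message_max = opts[:message_max] || nil
    @consumer_tag = opts[:consumer_tag] || nil
    @break_when_empty = opts[:break_when_empty] || nil
    # :nowait is removed from the caller's hash, as in the original
    opts.delete(:nowait)
    @ack = opts[:ack] || nil
    # Only :exclusive falls back to false; the others fall back to nil
    @exclusive = opts[:exclusive] || false
    @message_count = 0
    @opts = opts
  end
end

# :fake_client and :fake_queue are placeholders, not real client or queue objects
sub = SubscriptionOptionsSketch.new(:fake_client, :fake_queue,
                                    :message_max => 5, :nowait => true)
```

With these arguments `sub.message_max` is 5, `sub.ack` is `nil`, `sub.exclusive` is `false`, and the stored options no longer contain `:nowait`. Because of the `|| nil` pattern, an explicit `:ack => false` is also stored as `nil`.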
Instance Attribute Details
#ack ⇒ Object
Returns the value of attribute ack.
# File 'lib/qrack/subscription.rb', line 13

def ack
  @ack
end
#break_when_empty ⇒ Object
Returns the value of attribute break_when_empty.
# File 'lib/qrack/subscription.rb', line 13

def break_when_empty
  @break_when_empty
end
#client ⇒ Object (readonly)
Returns the value of attribute client.
# File 'lib/qrack/subscription.rb', line 14

def client
  @client
end
#consumer_tag ⇒ Object
Returns the value of attribute consumer_tag.
# File 'lib/qrack/subscription.rb', line 13

def consumer_tag
  @consumer_tag
end
#delivery_tag ⇒ Object
Returns the value of attribute delivery_tag.
# File 'lib/qrack/subscription.rb', line 13

def delivery_tag
  @delivery_tag
end
#exclusive ⇒ Object
Returns the value of attribute exclusive.
# File 'lib/qrack/subscription.rb', line 13

def exclusive
  @exclusive
end
#message_count ⇒ Object (readonly)
Returns the value of attribute message_count.
# File 'lib/qrack/subscription.rb', line 14

def message_count
  @message_count
end
#message_max ⇒ Object
Returns the value of attribute message_max.
# File 'lib/qrack/subscription.rb', line 13

def message_max
  @message_max
end
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
# File 'lib/qrack/subscription.rb', line 14

def queue
  @queue
end
#timeout ⇒ Object
Returns the value of attribute timeout.
# File 'lib/qrack/subscription.rb', line 13

def timeout
  @timeout
end
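The attribute listings report line 13 of the file for the writable attributes and line 14 for the read-only ones. A plausible reading is that these lines are `attr_accessor` and `attr_reader` declarations, but that is an assumption, since the declarations themselves are not shown. A minimal sketch of the resulting behavior, using a hypothetical `AttrSketch` class:

```ruby
# Hypothetical class illustrating a writable and a read-only attribute;
# it only imitates the split seen in the listings above.
class AttrSketch
  attr_accessor :timeout       # reader and writer
  attr_reader :message_count   # reader only

  def initialize
    @timeout = nil
    @message_count = 0
  end
end

s = AttrSketch.new
s.timeout = 2
writable = s.timeout

# Assigning to a reader-only attribute raises NoMethodError
read_only_rejected =
  begin
    s.message_count = 10
    false
  rescue NoMethodError
    true
  end
```

Under that assumption, callers can adjust values such as `timeout` or `message_max` on an existing subscription, while `client`, `queue`, and `message_count` can only be changed from inside the object.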
Instance Method Details
#start(&blk) ⇒ Object
# File 'lib/qrack/subscription.rb', line 49

def start(&blk)
  # Do not process any messages if zero message_max
  if message_max == 0
    return
  end

  # Notify server about new consumer
  setup_consumer

  # Start subscription loop
  loop do
    begin
      method = client.next_method(:timeout => timeout)
    rescue Qrack::FrameTimeout
      # Stop consuming messages
      queue.unsubscribe()
      break
    end

    # Increment message counter
    @message_count += 1

    # get delivery tag to use for acknowledge
    queue.delivery_tag = method.delivery_tag if @ack
    header = client.next_payload

    # If maximum frame size is smaller than message payload body then message
    # will have a message header and several message bodies
    msg = ''
    while msg.length < header.size
      msg << client.next_payload
    end

    # If block present, pass the message info to the block for processing
    msg = {:header => header, :payload => msg, :delivery_details => method.arguments, :subscribed => true}
    blk.call(msg) unless blk.nil?

    # Exit loop if message_max condition met
    if (!message_max.nil? and message_count == message_max)
      # Stop consuming messages
      queue.unsubscribe()
      queue.ack() if @ack
      break
    elsif(!msg[:subscribed])
      queue.unsubscribe()
      queue.ack() if @ack
      break
    elsif(@break_when_empty && queue.message_count == 0)
      queue.unsubscribe()
      queue.ack() if @ack
      break
    end

    # Have to do the ack here because the ack triggers the release of messages from the server
    # if you are using Client#qos prefetch and you will get extra messages sent through before
    # the unsubscribe takes effect to stop messages being sent to this consumer unless the ack is
    # deferred.
    queue.ack() if @ack
  end
end
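Two parts of the loop in `#start` are easy to miss on a first read: a message body may arrive in several payload frames that are concatenated until the header's size is reached, and the loop stops once the counter reaches `message_max`. The sketch below imitates just those two behaviors with hypothetical stand-ins (`FakeHeader`, `FakeClient`, `FakeTimeout`, `consume`). None of these names exist in the library, and the sketch leaves out consumer setup, acknowledgements, and the `:subscribed` and `break_when_empty` exits.

```ruby
# Hypothetical stand-ins for illustration only
FakeHeader = Struct.new(:size)
FakeTimeout = Class.new(StandardError)

class FakeClient
  def initialize(messages, chunk_size)
    # Each message becomes one header followed by its body split into chunks
    @payloads = messages.flat_map do |body|
      [FakeHeader.new(body.length)] + body.scan(/.{1,#{chunk_size}}/m)
    end
    @pending = messages.length
  end

  # Plays the role of next_method: raises once nothing is left,
  # much as a frame timeout ends the real loop
  def next_method
    raise FakeTimeout if @pending.zero?
    @pending -= 1
    :deliver
  end

  def next_payload
    @payloads.shift
  end
end

# Simplified version of the subscription loop
def consume(client, message_max)
  return [] if message_max == 0

  received = []
  count = 0
  loop do
    begin
      client.next_method
    rescue FakeTimeout
      break
    end
    count += 1

    header = client.next_payload
    # Reassemble the body from as many chunks as the header size requires
    msg = String.new
    msg << client.next_payload while msg.length < header.size
    received << msg

    # Stop once the requested number of messages has been processed
    break if !message_max.nil? && count == message_max
  end
  received
end

client = FakeClient.new(['hello world', 'second', 'third'], 4)
result = consume(client, 2)
```

Here `result` holds the first two bodies, each reassembled from 4-character chunks, and the third message is never read. Passing `nil` as `message_max` lets the loop run until the stand-in timeout is raised.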