Class: Qrack::Subscription Deprecated

Inherits:
Object
  • Object
show all
Defined in:
lib/qrack/subscription.rb

Overview

Deprecated.

Subscription ancestor class

Direct Known Subclasses

Bunny::Consumer

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(client, queue, opts = {}) ⇒ Subscription

Returns a new instance of Subscription.



16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/qrack/subscription.rb', line 16

def initialize(client, queue, opts = {})
  @client = client
  @queue = queue

  # Get timeout value
  @timeout = opts[:timeout] || nil

  # Get maximum amount of messages to process
  @message_max = opts[:message_max] || nil

  # If a consumer tag is not passed in the server will generate one
  @consumer_tag = opts[:consumer_tag] || nil
  
  # In a subscription loop, stop listening if there are no more messages in the queue
  @break_when_empty = opts[:break_when_empty] || nil

  # Ignore the :nowait option if passed, otherwise program will hang waiting for a
  # response from the server causing an error.
  opts.delete(:nowait)

  # Do we want to have to provide an acknowledgement?
  @ack = opts[:ack] || nil

  # Does this consumer want exclusive use of the queue?
  @exclusive = opts[:exclusive] || false

  # Initialize message counter
  @message_count = 0

  # Store options
  @opts = opts
end

Instance Attribute Details

#ackObject

Returns the value of attribute ack.



13
14
15
# File 'lib/qrack/subscription.rb', line 13

def ack
  @ack
end

#break_when_emptyObject

Returns the value of attribute break_when_empty.



13
14
15
# File 'lib/qrack/subscription.rb', line 13

def break_when_empty
  @break_when_empty
end

#clientObject (readonly)

Returns the value of attribute client.



14
15
16
# File 'lib/qrack/subscription.rb', line 14

def client
  @client
end

#consumer_tagObject

Returns the value of attribute consumer_tag.



13
14
15
# File 'lib/qrack/subscription.rb', line 13

def consumer_tag
  @consumer_tag
end

#delivery_tagObject

Returns the value of attribute delivery_tag.



13
14
15
# File 'lib/qrack/subscription.rb', line 13

def delivery_tag
  @delivery_tag
end

#exclusiveObject

Returns the value of attribute exclusive.



13
14
15
# File 'lib/qrack/subscription.rb', line 13

def exclusive
  @exclusive
end

#message_countObject (readonly)

Returns the value of attribute message_count.



14
15
16
# File 'lib/qrack/subscription.rb', line 14

def message_count
  @message_count
end

#message_maxObject

Returns the value of attribute message_max.



13
14
15
# File 'lib/qrack/subscription.rb', line 13

def message_max
  @message_max
end

#queueObject (readonly)

Returns the value of attribute queue.



14
15
16
# File 'lib/qrack/subscription.rb', line 14

def queue
  @queue
end

#timeoutObject

Returns the value of attribute timeout.



13
14
15
# File 'lib/qrack/subscription.rb', line 13

def timeout
  @timeout
end

Instance Method Details

#start(&blk) ⇒ Object



49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
# File 'lib/qrack/subscription.rb', line 49

def start(&blk)
  # Do not process any messages if zero message_max
  if message_max == 0
    return
  end

  # Notify server about new consumer
  setup_consumer

  # Start subscription loop
  loop do

    begin
      method = client.next_method(:timeout => timeout)
    rescue Qrack::FrameTimeout
      # Stop consuming messages
      queue.unsubscribe()
      break
    end

    # Increment message counter
    @message_count += 1

    # get delivery tag to use for acknowledge
    queue.delivery_tag = method.delivery_tag if @ack

    header = client.next_payload

    # If maximum frame size is smaller than message payload body then message
    # will have a message header and several message bodies
    msg = ''
    while msg.length < header.size
      msg << client.next_payload
    end

    # If block present, pass the message info to the block for processing
    msg = {:header => header, :payload => msg, :delivery_details => method.arguments, :subscribed => true}
    blk.call(msg) unless blk.nil?

    # Exit loop if message_max condition met
    if (!message_max.nil? and message_count == message_max)
      # Stop consuming messages
      queue.unsubscribe()
      queue.ack() if @ack
      break
    elsif(!msg[:subscribed])
      queue.unsubscribe()
      queue.ack() if @ack
      break
    elsif(@break_when_empty && queue.message_count == 0)
      queue.unsubscribe()
      queue.ack() if @ack
      break
    end

    # Have to do the ack here because the ack triggers the release of messages from the server
    # if you are using Client#qos prefetch and you will get extra messages sent through before
    # the unsubscribe takes effect to stop messages being sent to this consumer unless the ack is
    # deferred.
    queue.ack() if @ack
  end
end