Class: Qrack::Subscription

Inherits:
Object
  • Object
show all
Defined in:
lib/ext/bunny-0.6.0/lib/qrack/subscription.rb

Overview

Subscription ancestor class

Direct Known Subclasses

Bunny::Subscription, Bunny::Subscription09

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(client, queue, opts = {}) ⇒ Subscription

Returns a new instance of Subscription.



8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/ext/bunny-0.6.0/lib/qrack/subscription.rb', line 8

def initialize(client, queue, opts = {})
	@client = client
	@queue = queue

	# Get timeout value
	@timeout = opts[:timeout] || nil

	# Get maximum amount of messages to process
	@message_max = opts[:message_max] || nil

	# If a consumer tag is not passed in the server will generate one
	@consumer_tag = opts[:consumer_tag] || nil

	# Ignore the :nowait option if passed, otherwise program will hang waiting for a
	# response from the server causing an error.
	opts.delete(:nowait)

	# Do we want to have to provide an acknowledgement?
	@ack = opts[:ack] || nil
	
	# Does this consumer want exclusive use of the queue?
	@exclusive = opts[:exclusive] || false

	# Initialize message counter
	@message_count = 0
	
	# Give queue reference to this subscription
	@queue.subscription = self
	
	# Store options
	@opts = opts

end

Instance Attribute Details

#ackObject

Returns the value of attribute ack.



5
6
7
# File 'lib/ext/bunny-0.6.0/lib/qrack/subscription.rb', line 5

def ack
  @ack
end

#clientObject (readonly)

Returns the value of attribute client.



6
7
8
# File 'lib/ext/bunny-0.6.0/lib/qrack/subscription.rb', line 6

def client
  @client
end

#consumer_tagObject

Returns the value of attribute consumer_tag.



5
6
7
# File 'lib/ext/bunny-0.6.0/lib/qrack/subscription.rb', line 5

def consumer_tag
  @consumer_tag
end

#delivery_tagObject

Returns the value of attribute delivery_tag.



5
6
7
# File 'lib/ext/bunny-0.6.0/lib/qrack/subscription.rb', line 5

def delivery_tag
  @delivery_tag
end

#exclusiveObject

Returns the value of attribute exclusive.



5
6
7
# File 'lib/ext/bunny-0.6.0/lib/qrack/subscription.rb', line 5

def exclusive
  @exclusive
end

#message_countObject (readonly)

Returns the value of attribute message_count.



6
7
8
# File 'lib/ext/bunny-0.6.0/lib/qrack/subscription.rb', line 6

def message_count
  @message_count
end

#message_maxObject

Returns the value of attribute message_max.



5
6
7
# File 'lib/ext/bunny-0.6.0/lib/qrack/subscription.rb', line 5

def message_max
  @message_max
end

#queueObject (readonly)

Returns the value of attribute queue.



6
7
8
# File 'lib/ext/bunny-0.6.0/lib/qrack/subscription.rb', line 6

def queue
  @queue
end

#timeoutObject

Returns the value of attribute timeout.



5
6
7
# File 'lib/ext/bunny-0.6.0/lib/qrack/subscription.rb', line 5

def timeout
  @timeout
end

Instance Method Details

#start(&blk) ⇒ Object



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/ext/bunny-0.6.0/lib/qrack/subscription.rb', line 42

def start(&blk)
	
	# Do not process any messages if zero message_max
	if message_max == 0
		return
	end
	
	# Notify server about new consumer
	setup_consumer

	# Start subscription loop
	loop do
	
		begin
			method = client.next_method(:timeout => timeout)
		rescue Qrack::ClientTimeout
			queue.unsubscribe()
			break
		end
		
		# Increment message counter
		@message_count += 1

		# get delivery tag to use for acknowledge
		queue.delivery_tag = method.delivery_tag if @ack

		header = client.next_payload

	  # If maximum frame size is smaller than message payload body then message
		# will have a message header and several message bodies				
	  msg = ''
		while msg.length < header.size
			msg += client.next_payload
		end

		# If block present, pass the message info to the block for processing		
		blk.call({:header => header, :payload => msg, :delivery_details => method.arguments}) if !blk.nil?

		# Exit loop if message_max condition met
		if (!message_max.nil? and message_count == message_max)
			# Stop consuming messages
			queue.unsubscribe()				
			# Acknowledge receipt of the final message
			queue.ack() if @ack
			# Quit the loop
			break
		end
	
		# Have to do the ack here because the ack triggers the release of messages from the server
		# if you are using Client#qos prefetch and you will get extra messages sent through before
		# the unsubscribe takes effect to stop messages being sent to this consumer unless the ack is
		# deferred.
		queue.ack() if @ack

	end

end