Class: Droid::BaseQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/droid/queue.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(queue_name, opts = {}) ⇒ BaseQueue

Returns a new instance of BaseQueue.



8
9
10
11
12
# File 'lib/droid/queue.rb', line 8

def initialize(queue_name, opts={})
	opts[:auto_delete] = true unless opts.has_key?(:auto_delete) and opts[:auto_delete] === false

	@queue_name, @opts = queue_name, opts
end

Instance Attribute Details

#exObject (readonly)

Returns the value of attribute ex.



6
7
8
# File 'lib/droid/queue.rb', line 6

def ex
  @ex
end

#mqObject (readonly)

Returns the value of attribute mq.



6
7
8
# File 'lib/droid/queue.rb', line 6

def mq
  @mq
end

#optsObject (readonly)

Returns the value of attribute opts.



5
6
7
# File 'lib/droid/queue.rb', line 5

def opts
  @opts
end

#qObject (readonly)

Returns the value of attribute q.



6
7
8
# File 'lib/droid/queue.rb', line 6

def q
  @q
end

#queue_nameObject (readonly)

Returns the value of attribute queue_name.



5
6
7
# File 'lib/droid/queue.rb', line 5

def queue_name
  @queue_name
end

Instance Method Details

#destroyObject



66
67
68
# File 'lib/droid/queue.rb', line 66

def destroy
	teardown
end

#logObject



25
26
27
# File 'lib/droid/queue.rb', line 25

def log
	Droid.log
end

#setupObject



14
15
16
17
18
19
# File 'lib/droid/queue.rb', line 14

def setup
	@mq = MQ.new
	@q = @mq.queue(queue_name, opts)
	# if we don't specify an exchange name it defaults to the queue_name
	@ex = @mq.direct(opts[:exchange_name] || queue_name)
end

#subscribe(amqp_opts = {}, opts = {}) ⇒ Object



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/droid/queue.rb', line 34

def subscribe(amqp_opts={}, opts={})
	setup

	q.bind(ex) if ex
	q.subscribe(amqp_opts) do |header, message|
		Droid::Utilization.monitor(q.name, :temp => temp?) do
			request = Droid::Request.new(self, header, message)
			log.info "amqp_message #{tag} action=received ttl=#{request.ttl} age=#{request.age} #{request.data_summary}"
			begin
				raise Droid::ExpiredMessage if request.expired?
				yield request if block_given?
				finished = Time.now.getgm.to_i
				log.info "amqp_message action=processed #{tag} elapsed=#{finished-request.start} ttl=#{request.ttl} age=#{request.age} #{request.data_summary}"
			rescue Droid::ExpiredMessage
				log.info "amqp_message action=timeout #{tag} ttl=#{request.ttl} age=#{request.age} #{request.data_summary}"
				request.ack if amqp_opts[:ack]
			rescue => e
				request.ack if amqp_opts[:ack]
				Droid.handle_error(e)
			end
		end
	end
	log.info "amqp_subscribe #{tag}"
	self
end

#tagObject



29
30
31
32
# File 'lib/droid/queue.rb', line 29

def tag
	s = "queue=#{q.name}"
	s += " exchange=#{ex.name}" if ex
end

#teardownObject



60
61
62
63
64
# File 'lib/droid/queue.rb', line 60

def teardown
	@q.unsubscribe
	@mq.close
	log.info "amqp_unsubscribe #{tag}"
end

#temp?Boolean

Returns:

  • (Boolean)


21
22
23
# File 'lib/droid/queue.rb', line 21

def temp?
	false
end