Class: Droid::BaseQueue
- Inherits: Object
- Inheritance hierarchy: Object → Droid::BaseQueue
- Defined in:
- lib/droid/queue.rb
Direct Known Subclasses
BackwardsCompatibleQueue, ListenQueue, ReplyQueue, WorkerQueue
Instance Attribute Summary collapse
-
#ex ⇒ Object
readonly
Returns the value of attribute ex.
-
#mq ⇒ Object
readonly
Returns the value of attribute mq.
-
#opts ⇒ Object
readonly
Returns the value of attribute opts.
-
#q ⇒ Object
readonly
Returns the value of attribute q.
-
#queue_name ⇒ Object
readonly
Returns the value of attribute queue_name.
Instance Method Summary collapse
- #destroy ⇒ Object
-
#initialize(queue_name, opts = {}) ⇒ BaseQueue
constructor
A new instance of BaseQueue.
- #log ⇒ Object
- #setup ⇒ Object
- #subscribe(amqp_opts = {}, opts = {}) ⇒ Object
- #tag ⇒ Object
- #teardown ⇒ Object
- #temp? ⇒ Boolean
Constructor Details
#initialize(queue_name, opts = {}) ⇒ BaseQueue
Returns a new instance of BaseQueue.
8 9 10 11 12 |
# File 'lib/droid/queue.rb', line 8
#
# Records the queue name and options. Nothing else happens here; the
# MQ objects are created later by #setup.
#
# :auto_delete is forced to true unless the caller passed it as exactly
# false. The options are stored as a merged copy, so the caller's hash is
# left untouched (and may be frozen).
#
# @param queue_name [Object] stored as-is in @queue_name
# @param opts [Hash] queue options; only :auto_delete is inspected here
# @return [BaseQueue] a new instance of BaseQueue
def initialize(queue_name, opts={})
  # Anything other than an explicit false (missing key, nil, truthy) means true.
  auto_delete = opts[:auto_delete] != false

  @queue_name = queue_name
  @opts = opts.merge(:auto_delete => auto_delete)
end
Instance Attribute Details
#ex ⇒ Object (readonly)
Returns the value of attribute ex.
6 7 8 |
# File 'lib/droid/queue.rb', line 6
#
# Read-only accessor for @ex, which #setup assigns from @mq.direct(...).
#
# @return [Object] the value of attribute ex (nil until #setup has run)
def ex
  @ex
end
#mq ⇒ Object (readonly)
Returns the value of attribute mq.
6 7 8 |
# File 'lib/droid/queue.rb', line 6
#
# Read-only accessor for @mq, which #setup assigns from MQ.new.
#
# @return [Object] the value of attribute mq (nil until #setup has run)
def mq
  @mq
end
#opts ⇒ Object (readonly)
Returns the value of attribute opts.
5 6 7 |
# File 'lib/droid/queue.rb', line 5
#
# Read-only accessor for @opts, the options hash stored by #initialize.
#
# @return [Object] the value of attribute opts
def opts
  @opts
end
#q ⇒ Object (readonly)
Returns the value of attribute q.
6 7 8 |
# File 'lib/droid/queue.rb', line 6
#
# Read-only accessor for @q, which #setup assigns from @mq.queue(...).
#
# @return [Object] the value of attribute q (nil until #setup has run)
def q
  @q
end
#queue_name ⇒ Object (readonly)
Returns the value of attribute queue_name.
5 6 7 |
# File 'lib/droid/queue.rb', line 5
#
# Read-only accessor for @queue_name, the name stored by #initialize.
#
# @return [Object] the value of attribute queue_name
def queue_name
  @queue_name
end
Instance Method Details
#destroy ⇒ Object
67 68 69 |
# File 'lib/droid/queue.rb', line 67
#
# Delegates to #teardown and returns whatever it returns.
#
# @return [Object] the result of #teardown
def destroy
  teardown
end
#setup ⇒ Object
14 15 16 17 18 19 |
# File 'lib/droid/queue.rb', line 14
#
# Creates a fresh MQ instance and uses it to obtain the queue (@q) and the
# exchange (@ex) this object works with. The full opts hash, including any
# :exchange_name entry, is passed through to @mq.queue.
#
# @return [Object] the exchange returned by @mq.direct (also stored in @ex)
def setup
  @mq = MQ.new
  @q = @mq.queue(queue_name, opts)

  # if we don't specify an exchange name it defaults to the queue_name
  @ex = @mq.direct(opts[:exchange_name] || queue_name)
end
#subscribe(amqp_opts = {}, opts = {}) ⇒ Object
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 |
# File 'lib/droid/queue.rb', line 35
#
# Runs #setup, binds the queue to the exchange when there is one, and
# registers a consumer that wraps every delivery in a Droid::Request.
#
# Per delivery: the request is logged, then yielded to the caller's block
# (if one was given). A request that reports expired? is logged as a
# timeout and not yielded. When amqp_opts[:ack] is truthy, timed-out and
# failed requests are acked here; a request that is handled without error
# is NOT acked by this method. Errors caught by the bare rescue are passed
# to Droid.handle_error.
#
# @param amqp_opts [Hash] passed to q.subscribe; :ack is also read here
# @param opts [Hash] accepted for interface compatibility; not read here
# @yield [request] the Droid::Request built for the delivery
# @return [self]
def subscribe(amqp_opts={}, opts={})
  setup

  q.bind(ex) if ex

  # NOTE(review): the name of the second block parameter was lost in the
  # extracted listing (it read `|header, |`); `message` is a reconstruction.
  # Confirm against lib/droid/queue.rb.
  q.subscribe(amqp_opts) do |header, message|
    Droid::Utilization.monitor(q.name, :temp => temp?) do
      request = Droid::Request.new(self, header, message)
      log.info "amqp_message #{tag} action=received ttl=#{request.ttl} age=#{request.age} #{request.data_summary}"

      begin
        # Raised before yielding so the shared timeout branch below handles it.
        raise Droid::ExpiredMessage if request.expired?

        yield request if block_given?

        finished = Time.now.getgm.to_i
        log.info "amqp_message action=processed #{tag} elapsed=#{finished-request.start} ttl=#{request.ttl} age=#{request.age} #{request.data_summary}"
      rescue Droid::ExpiredMessage
        log.info "amqp_message action=timeout #{tag} ttl=#{request.ttl} age=#{request.age} #{request.data_summary}"
        request.ack if amqp_opts[:ack]
      rescue => e
        # Ack first so the failing message is not left unacknowledged.
        request.ack if amqp_opts[:ack]
        Droid.handle_error(e)
      end
    end
  end

  log.info "amqp_subscribe #{tag}"
  self
end
#tag ⇒ Object
29 30 31 32 33 |
# File 'lib/droid/queue.rb', line 29
#
# Builds the "queue=<name>" label used in log lines, with
# " exchange=<name>" appended when an exchange is present.
#
# @return [String]
def tag
  label = "queue=#{q.name}"
  label += " exchange=#{ex.name}" if ex
  label
end
#teardown ⇒ Object
61 62 63 64 65 |
# File 'lib/droid/queue.rb', line 61
#
# Unsubscribes @q, closes @mq, then logs the unsubscribe with the queue's
# tag. Both instance variables are assigned by #setup, so this raises
# NoMethodError if called before #setup has run.
#
# @return [Object] whatever log.info returns
def teardown
  @q.unsubscribe
  @mq.close
  log.info "amqp_unsubscribe #{tag}"
end
#temp? ⇒ Boolean
21 22 23 |
# File 'lib/droid/queue.rb', line 21
#
# Always false in this class. #subscribe passes the value as the :temp
# option to Droid::Utilization.monitor.
#
# @return [Boolean] false
def temp?
  false
end