Class: Droid::BaseQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/droid/queue.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(queue_name, opts = {}) ⇒ BaseQueue

Returns a new instance of BaseQueue.



8
9
10
11
12
# File 'lib/droid/queue.rb', line 8

# Stores the queue name and options; nothing else happens here (the MQ
# objects are created later by #setup).
#
# The options hash is copied before the :auto_delete default is applied, so
# the caller's hash is never modified and a frozen hash is accepted.
#
# @param queue_name name of the queue, stored unchanged in @queue_name
# @param opts [Hash] queue options, stored as a copy in @opts. :auto_delete is
#   set to true unless the key is present with a value equal to false.
def initialize(queue_name, opts={})
  opts = opts.dup
  auto_delete_disabled = opts.key?(:auto_delete) && opts[:auto_delete] == false
  opts[:auto_delete] = true unless auto_delete_disabled

  @queue_name = queue_name
  @opts = opts
end

Instance Attribute Details

#ex ⇒ Object (readonly)

Returns the value of attribute ex.



6
7
8
# File 'lib/droid/queue.rb', line 6

# Reader for @ex, which #setup assigns from the result of @mq.direct(...).
def ex; @ex; end

#mq ⇒ Object (readonly)

Returns the value of attribute mq.



6
7
8
# File 'lib/droid/queue.rb', line 6

# Reader for @mq, the MQ instance that #setup creates with MQ.new.
def mq; @mq; end

#opts ⇒ Object (readonly)

Returns the value of attribute opts.



5
6
7
# File 'lib/droid/queue.rb', line 5

# Reader for @opts, the options hash stored by #initialize.
def opts; @opts; end

#q ⇒ Object (readonly)

Returns the value of attribute q.



6
7
8
# File 'lib/droid/queue.rb', line 6

# Reader for @q, which #setup assigns from the result of @mq.queue(...).
def q; @q; end

#queue_name ⇒ Object (readonly)

Returns the value of attribute queue_name.



5
6
7
# File 'lib/droid/queue.rb', line 5

# Reader for @queue_name, the name passed to #initialize.
def queue_name; @queue_name; end

Instance Method Details

#destroy ⇒ Object



67
68
69
# File 'lib/droid/queue.rb', line 67

# Delegates to #teardown and returns whatever it returns.
def destroy; teardown; end

#log ⇒ Object



25
26
27
# File 'lib/droid/queue.rb', line 25

# Shortcut to the logger exposed by Droid.log.
def log; Droid.log; end

#setup ⇒ Object



14
15
16
17
18
19
# File 'lib/droid/queue.rb', line 14

# Creates the MQ objects this queue works with and stores them in
# @mq, @q and @ex.
#
# The queue is obtained via MQ#queue using queue_name and opts. The exchange
# is obtained via MQ#direct and is named after opts[:exchange_name], falling
# back to queue_name when that option is nil or false.
#
# @return the object returned by MQ#direct (also stored in @ex)
def setup
  channel = MQ.new
  @mq = channel
  @q = channel.queue(queue_name, opts)

  exchange_name = opts[:exchange_name] || queue_name
  @ex = channel.direct(exchange_name)
end

#subscribe(amqp_opts = {}, opts = {}) ⇒ Object



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/droid/queue.rb', line 35

# Runs #setup, binds the queue to the exchange and subscribes to the queue.
#
# For every delivered message a Droid::Request is built and, inside
# Droid::Utilization.monitor, handled as follows:
# * the request is logged as received;
# * if request.expired? is true, Droid::ExpiredMessage is raised, which is
#   logged as a timeout;
# * otherwise the request is yielded to the caller's block (if one was given)
#   and logged as processed;
# * any other StandardError is passed to Droid.handle_error.
# In both failure paths the request is acked when amqp_opts[:ack] is truthy.
# The success path does not ack here.
#
# @param amqp_opts [Hash] passed through to q.subscribe; :ack is also read
#   here to decide whether failed or expired requests are acked
# @param opts [Hash] not used by this method
# @yield [request] the Droid::Request built for each non-expired message
# @return [self]
def subscribe(amqp_opts={}, opts={})
  setup
  q.bind(ex) if ex

  q.subscribe(amqp_opts) do |header, payload|
    Droid::Utilization.monitor(q.name, :temp => temp?) do
      request = Droid::Request.new(self, header, payload)
      # Re-evaluated on every call so each log line reports the values
      # current at the moment it is written.
      details = lambda { "ttl=#{request.ttl} age=#{request.age} #{request.data_summary}" }

      log.info "amqp_message #{tag} action=received #{details.call}"
      begin
        raise Droid::ExpiredMessage if request.expired?
        yield request if block_given?
        done_at = Time.now.getgm.to_i
        log.info "amqp_message action=processed #{tag} elapsed=#{done_at - request.start} #{details.call}"
      rescue Droid::ExpiredMessage
        log.info "amqp_message action=timeout #{tag} #{details.call}"
        request.ack if amqp_opts[:ack]
      rescue => error
        request.ack if amqp_opts[:ack]
        Droid.handle_error(error)
      end
    end
  end

  log.info "amqp_subscribe #{tag}"
  self
end

#tag ⇒ Object



29
30
31
32
33
# File 'lib/droid/queue.rb', line 29

# Builds the identifying fragment used in this class's log lines:
# "queue=<q.name>", followed by " exchange=<ex.name>" when ex is set.
#
# @return [String]
def tag
  parts = ["queue=#{q.name}"]
  parts << "exchange=#{ex.name}" if ex
  parts.join(" ")
end

#teardown ⇒ Object



61
62
63
64
65
# File 'lib/droid/queue.rb', line 61

# Unsubscribes @q, closes @mq, then logs an "amqp_unsubscribe" line.
#
# Both instance variables are assigned by #setup; this method calls them
# without a nil check.
#
# @return the result of the final log.info call
def teardown
  @q.unsubscribe
  @mq.close

  log.info("amqp_unsubscribe #{tag}")
end

#temp? ⇒ Boolean

Returns:

  • (Boolean)


21
22
23
# File 'lib/droid/queue.rb', line 21

# Always false here; the value is passed as the :temp option to
# Droid::Utilization.monitor in #subscribe.
def temp?; false; end