Class: Agni::Queue

Inherits:
Object
Includes:
LogMixin
Defined in:
lib/agni/queue.rb


Constructor Details

#initialize(queue_name, messenger, options = {}) ⇒ Queue

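Creates the logical queue and one underlying AMQP queue per priority level. Raises ArgumentError if queue_name is nil or an empty string, and AgniError if one of the underlying queues already exists with different options.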

Parameters:

  • queue_name (String)

    the name of this queue

  • messenger (Agni::Messenger)

    the messenger object with which this queue is associated

  • options (Hash) (defaults to: {})

    options that will be passed to the AMQP gem during queue creation



# File 'lib/agni/queue.rb', line 13

def initialize(queue_name, messenger, options={})
  if queue_name.nil? || queue_name.empty?
    raise ArgumentError, 'Queue name must be present when creating a queue'
  end
  self.configure_logs
  @logical_queue_name = queue_name

  begin
    @queues = PRIORITY_LEVELS.map do |priority|
      create_queue(messenger, priority, options)
    end
  rescue AMQP::IncompatibleOptionsError
    raise AgniError,
    "One of the queues needed to create #{@logical_queue_name} " +
      "has already been created with different options!"
  end

  # The in-memory queue we use to prioritize incoming messages of
  # different priorities
  @queue_mutex = Mutex.new
  @memory_queue = Containers::MinHeap.new
end
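The name check at the top of the constructor can be tried in isolation. The following is a minimal sketch, assuming the guard is lifted into a standalone helper; validate_queue_name! is a hypothetical name and is not part of Agni.

```ruby
# Same guard as the first lines of Agni::Queue#initialize, extracted into a
# hypothetical helper so it runs without an AMQP connection.
def validate_queue_name!(queue_name)
  if queue_name.nil? || queue_name.empty?
    raise ArgumentError, 'Queue name must be present when creating a queue'
  end
  queue_name
end

[nil, "", "jobs"].each do |candidate|
  begin
    validate_queue_name!(candidate)
    puts "#{candidate.inspect} accepted"
  rescue ArgumentError => e
    puts "#{candidate.inspect} rejected: #{e.message}"
  end
end
```

Note that a whitespace-only name such as " " passes this check, because String#empty? only tests for zero length.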

Instance Method Details

#create_queue_name(base_name, priority) ⇒ Object

Given a base name and a priority, creates a queue name suitable for use in naming an underlying AMQP queue.

Parameters:

  • base_name (String)

    the base name of the queue. This is typically just the queue name used when creating this Agni::Queue object.

  • priority (String)

    valid priorities are in the range 0 through 9 inclusive.



# File 'lib/agni/queue.rb', line 103

def create_queue_name(base_name, priority)
  "#{base_name}.#{priority}"
end
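As an illustration of the naming rule, the sketch below lists the underlying queue names produced for one logical queue. It assumes PRIORITY_LEVELS covers 0 through 9, as the parameter documentation implies; the NamingSketch module and the "jobs" queue name are hypothetical.

```ruby
module NamingSketch
  # Assumption: the real PRIORITY_LEVELS constant spans 0 through 9.
  PRIORITY_LEVELS = (0..9).to_a

  # Same naming rule as Agni::Queue#create_queue_name.
  def self.create_queue_name(base_name, priority)
    "#{base_name}.#{priority}"
  end
end

# "jobs" is a hypothetical logical queue name.
names = NamingSketch::PRIORITY_LEVELS.map do |priority|
  NamingSketch.create_queue_name("jobs", priority)
end

puts names.first  # jobs.0
puts names.last   # jobs.9
```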

#publish(payload, priority = DEFAULT_PRIORITY, options = {}) ⇒ Object

Publishes a payload to this queue.

Parameters:

  • payload (String)

    the payload of the message to publish

  • priority (Fixnum) (defaults to: DEFAULT_PRIORITY)

    must be an integer between 0 and 9, inclusive.

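  • options (Hash) (defaults to: {})

    message options merged over DEFAULT_MESSAGE_OPTS before publishing. Any :routing_key given here is overridden with the per-priority queue name.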


# File 'lib/agni/queue.rb', line 86

def publish(payload, priority=DEFAULT_PRIORITY, options={})
  unless PRIORITY_LEVELS.include? priority
    raise ArgumentError, "Invalid priority #{priority}, must be between 0 and 9"
  end
  queue_name = create_queue_name(@logical_queue_name, priority)
  @queues[priority][:exchange].publish(payload, DEFAULT_MESSAGE_OPTS.merge(options).
                                       merge(:routing_key => queue_name))
end
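The priority check and option merging can be exercised without a broker. This is a rough sketch under stated assumptions: RecordingExchange is a hypothetical test double, the value of DEFAULT_MESSAGE_OPTS is invented for illustration (the real value is not shown here), and a single exchange stands in for the per-priority exchanges held in @queues.

```ruby
module PublishSketch
  PRIORITY_LEVELS = (0..9).to_a                          # assumed from "between 0 and 9"
  DEFAULT_MESSAGE_OPTS = { :persistent => true }.freeze  # hypothetical default

  # Hypothetical exchange double that records what would be sent to AMQP.
  class RecordingExchange
    attr_reader :published

    def initialize
      @published = []
    end

    def publish(payload, opts)
      @published << [payload, opts]
    end
  end

  # Mirrors the validation and option merging in Agni::Queue#publish.
  def self.publish(exchange, base_name, payload, priority, options = {})
    unless PRIORITY_LEVELS.include?(priority)
      raise ArgumentError, "Invalid priority #{priority}, must be between 0 and 9"
    end
    routing_key = "#{base_name}.#{priority}"
    exchange.publish(payload,
                     DEFAULT_MESSAGE_OPTS.merge(options).merge(:routing_key => routing_key))
  end
end

exchange = PublishSketch::RecordingExchange.new
PublishSketch.publish(exchange, "jobs", "hello", 2,
                      :routing_key => "ignored", :persistent => false)
sent_payload, sent_opts = exchange.published.first
puts sent_opts[:routing_key]  # jobs.2
```

Because :routing_key is merged last, a caller-supplied routing key never takes effect, while other caller options override the defaults.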

#subscribe(handler, options = {}) ⇒ Object

Subscribes to this queue, handling each incoming message with the provided handler.

Parameters:

  • handler (Proc)

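    accepts two arguments:

      metadata [Hash] a hash of attributes, as provided by the underlying AMQP implementation
      payload [String] the message itself, as provided by the publisher

    The return value from the handler is discarded.

  • options (Hash) (defaults to: {})

    :ack controls whether each message is acknowledged after the handler runs; it defaults to true when omitted.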



# File 'lib/agni/queue.rb', line 44

def subscribe(handler, options={})
  if subscribed?
    raise AgniError, "Queue #{@logical_queue_name} is already subscribed"
  end
  # Acknowledge messages after handling unless the caller passes :ack => false
  ack = options[:ack].nil? ? true : options[:ack]
  handle_func = lambda do
    metadata, payload = pop
    handler[metadata, payload] if handler
    EventMachine.next_tick{ metadata.ack } if ack
  end
  @queues.each do |q|
    queue = q[:queue]
    priority = q[:priority]
    queue.subscribe(:ack => true) do |metadata, payload|
      # Buffer the message keyed by priority; handle_func later pops the
      # entry with the lowest priority value
      @queue_mutex.synchronize do
        @memory_queue.push(priority, [metadata, payload])
      end
      EventMachine.next_tick { EventMachine.defer(handle_func) }
    end
  end
  self
end
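The in-memory prioritization used by subscribe can be sketched without EventMachine or a broker. The example below is an illustration only: TinyMinHeap is a hypothetical stand-in for Containers::MinHeap that models just push(key, value) and pop, and the metadata hashes are invented placeholders for what the AMQP layer would supply.

```ruby
# Hypothetical stand-in for Containers::MinHeap so the sketch has no
# dependencies. Only push(key, value) and pop are modelled.
class TinyMinHeap
  def initialize
    @items = []
  end

  def push(key, value)
    @items << [key, value]
  end

  # Removes and returns the value with the smallest key, or nil when empty.
  def pop
    return nil if @items.empty?
    smallest = @items.min_by { |key, _value| key }
    @items.delete_at(@items.index(smallest))
    smallest.last
  end
end

queue_mutex = Mutex.new
memory_queue = TinyMinHeap.new

# Simulate deliveries arriving out of order from three priority queues.
[[5, "priority-5"], [0, "priority-0"], [2, "priority-2"]].each do |priority, payload|
  queue_mutex.synchronize do
    memory_queue.push(priority, [{ :priority => priority }, payload])
  end
end

# A handler receives the metadata first and the payload second.
handled = []
handler = lambda { |metadata, payload| handled << payload }

3.times do
  metadata, payload = queue_mutex.synchronize { memory_queue.pop }
  handler[metadata, payload]
end

puts handled.inspect  # ["priority-0", "priority-2", "priority-5"]
```

Messages are handled in ascending order of their priority value regardless of arrival order, which is the effect of funnelling all per-priority deliveries through one min-heap.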

#subscribed? ⇒ Boolean

Returns true iff every underlying AMQP queue is subscribed.

Returns:

  • (Boolean)

    true iff every underlying AMQP queue is subscribed



# File 'lib/agni/queue.rb', line 77

def subscribed?
  @queues.map{|q| q[:queue].default_consumer}.all?{|c| c.subscribed? if c}
end
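The "c.subscribed? if c" idiom treats a missing consumer as not subscribed. A small sketch of that behaviour, using a hypothetical FakeConsumer in place of the AMQP consumer object:

```ruby
# Hypothetical consumer double; only subscribed? is modelled.
FakeConsumer = Struct.new(:active) do
  def subscribed?
    active
  end
end

# Same predicate as Agni::Queue#subscribed?, applied to a plain array.
def all_subscribed?(consumers)
  consumers.all? { |c| c.subscribed? if c }
end

puts all_subscribed?([FakeConsumer.new(true), FakeConsumer.new(true)])   # true
puts all_subscribed?([FakeConsumer.new(true), nil])                      # false
puts all_subscribed?([FakeConsumer.new(true), FakeConsumer.new(false)])  # false
```

For a nil consumer the block evaluates to nil, which all? treats as falsy, so a queue without a default consumer makes the whole check return false.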

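#unsubscribe ⇒ Object

Unsubscribes from every underlying AMQP queue. Raises AgniError if this queue is not currently subscribed.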



# File 'lib/agni/queue.rb', line 67

def unsubscribe
  unless subscribed?
    raise AgniError, "Queue #{@logical_queue_name} is not subscribed"
  end
  @queues.each do |q|
    q[:queue].unsubscribe
  end
end