Class: Agni::Queue
Instance Method Summary collapse
-
#create_queue_name(base_name, priority) ⇒ Object
Given a base name and a priority, creates a queue name suitable for use in naming an underlying AMQP queue.
-
#initialize(queue_name, messenger, options = {}) ⇒ Queue
constructor
Core method responsible for catching queue name problems, like nil values and empty strings.
-
#publish(payload, priority = DEFAULT_PRIORITY, options = {}) ⇒ Object
Publishes a payload to this queue.
-
#subscribe(handler, options = {}) ⇒ Object
Subscribes to this queue, handling each incoming message with the provided
handler. -
#subscribed? ⇒ True
Iff every AMQP queue is
subscribed?. - #unsubscribe ⇒ Object
Constructor Details
#initialize(queue_name, messenger, options = {}) ⇒ Queue
Core method responsible for catching queue name problems, like nil values and empty strings.
13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
# File 'lib/agni/queue.rb', line 13 def initialize(queue_name, messenger, ={}) if queue_name.nil? || queue_name.empty? raise ArgumentError, 'Queue name must be present when creating a queue' end self.configure_logs @logical_queue_name = queue_name begin @queues = PRIORITY_LEVELS.map do |priority| create_queue(messenger, priority, ) end rescue AMQP::IncompatibleOptionsError raise AgniError, "One of the queues needed to create #{@logical_queue_name} " + "has already been created with different options!" end # The in-memory queue we use to prioritize incoming messages of # different priorities @queue_mutex = Mutex.new @memory_queue = Containers::MinHeap.new end |
Instance Method Details
#create_queue_name(base_name, priority) ⇒ Object
Given a base name and a priority, creates a queue name suitable for use in naming an underlying AMQP queue.
103 104 105 |
# File 'lib/agni/queue.rb', line 103 def create_queue_name(base_name, priority) "#{base_name}.#{priority}" end |
#publish(payload, priority = DEFAULT_PRIORITY, options = {}) ⇒ Object
Publishes a payload to this queue.
86 87 88 89 90 91 92 93 |
# File 'lib/agni/queue.rb', line 86 def publish(payload, priority=DEFAULT_PRIORITY, ={}) unless PRIORITY_LEVELS.include? priority raise ArgumentError, "Invalid priority #{priority}, must be between 0 and 9" end queue_name = create_queue_name(@logical_queue_name, priority) @queues[priority][:exchange].publish(payload, DEFAULT_MESSAGE_OPTS.merge(). merge(:routing_key => queue_name)) end |
#subscribe(handler, options = {}) ⇒ Object
Subscribes to this queue, handling each incoming message with the provided handler.
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 |
# File 'lib/agni/queue.rb', line 44 def subscribe(handler, ={}) if subscribed? raise AgniError, 'Queue #{queue_name} is already subscribed' end ack = [:ack].nil? ? true : [:ack] handle_func = lambda do , payload = pop handler[, payload] if handler EventMachine.next_tick{ .ack } if ack end @queues.each do |q| queue = q[:queue] priority = q[:priority] queue.subscribe(:ack => true) do |, payload| @queue_mutex.synchronize do @memory_queue.push(priority, [, payload]) end EventMachine.next_tick { EventMachine.defer(handle_func) } end end self end |
#subscribed? ⇒ True
Returns iff every AMQP queue is subscribed?.
77 78 79 |
# File 'lib/agni/queue.rb', line 77 def subscribed? @queues.map{|q| q[:queue].default_consumer}.all?{|c| c.subscribed? if c} end |
#unsubscribe ⇒ Object
67 68 69 70 71 72 73 74 |
# File 'lib/agni/queue.rb', line 67 def unsubscribe unless subscribed? raise AgniError, 'Queue #{queue_name} is not subscribed' end @queues.each do |q| q[:queue].unsubscribe end end |