Class: PipelineToolkit::Cucumber::AMQP
- Inherits:
-
Object
- Object
- PipelineToolkit::Cucumber::AMQP
- Defined in:
- lib/pipeline_toolkit/cucumber/amqp.rb
Instance Method Summary collapse
- #close ⇒ Object
- #connected? ⇒ Boolean
-
#initialize ⇒ AMQP
constructor
A new instance of AMQP.
- #purge_queue(queue) ⇒ Object
- #queue_size(queue) ⇒ Object
- #receive_messages(queue, number, timeout = 5) ⇒ Object
- #send_messages(exchange, messages, exchange_type = :fanout) ⇒ Object
- #start(host, port) ⇒ Object
Constructor Details
#initialize ⇒ AMQP
Returns a new instance of AMQP.
8 9 |
# File 'lib/pipeline_toolkit/cucumber/amqp.rb', line 8 def initialize end |
Instance Method Details
#close ⇒ Object
20 21 22 |
# File 'lib/pipeline_toolkit/cucumber/amqp.rb', line 20 def close @mq.stop end |
#connected? ⇒ Boolean
16 17 18 |
# File 'lib/pipeline_toolkit/cucumber/amqp.rb', line 16 def connected? @mq.connected? end |
#purge_queue(queue) ⇒ Object
24 25 26 |
# File 'lib/pipeline_toolkit/cucumber/amqp.rb', line 24 def purge_queue(queue) @mq.queue(queue).purge end |
#queue_size(queue) ⇒ Object
28 29 30 |
# File 'lib/pipeline_toolkit/cucumber/amqp.rb', line 28 def queue_size(queue) @mq.queue(queue).status[:message_count] end |
#receive_messages(queue, number, timeout = 5) ⇒ Object
41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
# File 'lib/pipeline_toolkit/cucumber/amqp.rb', line 41
def receive_messages(queue, number, timeout = 5)
received_msgs = []
Timeout::timeout(timeout) do
number.times do
message = @mq.queue(queue).pop
sleep(0.1)
retry if message[:payload] == :queue_empty
received_msgs << MessageCoder.decode(message[:payload])
end
end
received_msgs
rescue Timeout::Error
raise "Timed out waiting for AMQP messages. Received #{received_msgs.size}"
end
|
#send_messages(exchange, messages, exchange_type = :fanout) ⇒ Object
32 33 34 35 36 37 38 39 |
# File 'lib/pipeline_toolkit/cucumber/amqp.rb', line 32 def (exchange, , exchange_type = :fanout) exchange = @mq.exchange(exchange, :passive => true, :type => exchange_type) .each do || opts = .has_key?('routing_key') ? {:key => ['routing_key']} : {} .default = nil exchange.publish(MessageCoder.encode(), opts) end end |
#start(host, port) ⇒ Object
11 12 13 14 |
# File 'lib/pipeline_toolkit/cucumber/amqp.rb', line 11 def start(host, port) @mq = Bunny.new(:host => host, :port => port) @mq.start end |