Class: Moqueue::MockQueue
- Inherits:
-
Object
- Object
- Moqueue::MockQueue
- Defined in:
- lib/moqueue/mock_queue.rb
Instance Attribute Summary collapse
-
#name ⇒ Object
readonly
Returns the value of attribute name.
Class Method Summary collapse
Instance Method Summary collapse
- #acked_messages ⇒ Object
- #bind(exchange, key = nil) ⇒ Object
- #callback_defined? ⇒ Boolean
-
#initialize(name) ⇒ MockQueue
constructor
A new instance of MockQueue.
-
#null_subscribe ⇒ Object
configures a do-nothing subscribe block to force received messages to be processed and stored in #received_messages.
- #prefetch(size) ⇒ Object
- #publish(message, opts = {}) ⇒ Object
- #receive(message, header_opts = {}) ⇒ Object
- #received_ack_for_message?(message_content) ⇒ Boolean
- #received_headers ⇒ Object
- #received_message?(message_content) ⇒ Boolean
- #received_messages ⇒ Object
- #received_messages_and_headers ⇒ Object
- #received_routing_key?(key) ⇒ Boolean
- #run_callback(*args) ⇒ Object
- #subscribe(opts = {}, &block) ⇒ Object
- #unsubscribe ⇒ Object
Constructor Details
#initialize(name) ⇒ MockQueue
Returns a new instance of MockQueue.
20 21 22 23 |
# File 'lib/moqueue/mock_queue.rb', line 20 def initialize(name) @name = name MockBroker.instance.register_queue(self) end |
Instance Attribute Details
#name ⇒ Object (readonly)
Returns the value of attribute name.
7 8 9 |
# File 'lib/moqueue/mock_queue.rb', line 7 def name @name end |
Class Method Details
.new(name) ⇒ Object
11 12 13 14 15 16 |
# File 'lib/moqueue/mock_queue.rb', line 11 def new(name) if existing_queue = MockBroker.instance.find_queue(name) return existing_queue end super end |
Instance Method Details
#acked_messages ⇒ Object
91 92 93 94 95 |
# File 'lib/moqueue/mock_queue.rb', line 91 def .map do |r| r[:message] if @ack_msgs && r[:headers].received_ack? end end |
#bind(exchange, key = nil) ⇒ Object
74 75 76 77 |
# File 'lib/moqueue/mock_queue.rb', line 74 def bind(exchange, key=nil) exchange.attach_queue(self, key) self end |
#callback_defined? ⇒ Boolean
102 103 104 |
# File 'lib/moqueue/mock_queue.rb', line 102 def callback_defined? !! end |
#null_subscribe ⇒ Object
configures a do-nothing subscribe block to force received messages to be processed and stored in #received_messages
109 110 111 112 |
# File 'lib/moqueue/mock_queue.rb', line 109 def null_subscribe subscribe {|msg| nil} self end |
#prefetch(size) ⇒ Object
56 57 58 |
# File 'lib/moqueue/mock_queue.rb', line 56 def prefetch(size) # noop end |
#publish(message, opts = {}) ⇒ Object
64 65 66 67 68 69 70 71 72 |
# File 'lib/moqueue/mock_queue.rb', line 64 def publish(, opts = {}) if receive() else deferred_publishing_fibers << Fiber.new do receive() end end end |
#receive(message, header_opts = {}) ⇒ Object
34 35 36 37 38 39 40 41 42 |
# File 'lib/moqueue/mock_queue.rb', line 34 def receive(, header_opts={}) if callback = headers = MockHeaders.new(header_opts) callback.call(*(callback.arity == 1 ? [] : [headers, ])) << {:message => , :headers => headers} else (, header_opts) end end |
#received_ack_for_message?(message_content) ⇒ Boolean
60 61 62 |
# File 'lib/moqueue/mock_queue.rb', line 60 def () .include?() end |
#received_headers ⇒ Object
87 88 89 |
# File 'lib/moqueue/mock_queue.rb', line 87 def received_headers .map{ |r| r[:headers] } end |
#received_message?(message_content) ⇒ Boolean
44 45 46 |
# File 'lib/moqueue/mock_queue.rb', line 44 def () .include?() end |
#received_messages ⇒ Object
83 84 85 |
# File 'lib/moqueue/mock_queue.rb', line 83 def .map{|r| r[:message] } end |
#received_messages_and_headers ⇒ Object
79 80 81 |
# File 'lib/moqueue/mock_queue.rb', line 79 def @received_messages_and_headers ||= [] end |
#received_routing_key?(key) ⇒ Boolean
48 49 50 |
# File 'lib/moqueue/mock_queue.rb', line 48 def received_routing_key?(key) .find { |r| r[:headers] && r[:headers].properties[:routing_key] == key } end |
#run_callback(*args) ⇒ Object
97 98 99 100 |
# File 'lib/moqueue/mock_queue.rb', line 97 def run_callback(*args) callback = callback.call(*(callback.arity == 1 ? [args.first] : args)) end |
#subscribe(opts = {}, &block) ⇒ Object
25 26 27 28 29 30 31 32 |
# File 'lib/moqueue/mock_queue.rb', line 25 def subscribe(opts={}, &block) if @subscribe_block raise DoubleSubscribeError, "you can't subscribe to the same queue twice" end @subscribe_block = block @ack_msgs = opts[:ack] || false end |
#unsubscribe ⇒ Object
52 53 54 |
# File 'lib/moqueue/mock_queue.rb', line 52 def unsubscribe true end |