Class: Moqueue::MockQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/moqueue/mock_queue.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name) ⇒ MockQueue

Returns a new instance of MockQueue.


20
21
22
23
# File 'lib/moqueue/mock_queue.rb', line 20

def initialize(name)
  @name = name
  MockBroker.instance.register_queue(self)
end

Instance Attribute Details

#nameObject (readonly)

Returns the value of attribute name.


7
8
9
# File 'lib/moqueue/mock_queue.rb', line 7

def name
  @name
end

Class Method Details

.new(name) ⇒ Object


11
12
13
14
15
16
# File 'lib/moqueue/mock_queue.rb', line 11

def new(name)
  if existing_queue = MockBroker.instance.find_queue(name)
    return existing_queue
  end
  super
end

Instance Method Details

#acked_messagesObject


91
92
93
94
95
# File 'lib/moqueue/mock_queue.rb', line 91

def acked_messages
  received_messages_and_headers.map do |r|
    r[:message] if @ack_msgs && r[:headers].received_ack?
  end
end

#bind(exchange, key = nil) ⇒ Object


74
75
76
77
# File 'lib/moqueue/mock_queue.rb', line 74

def bind(exchange, key=nil)
  exchange.attach_queue(self, key)
  self
end

#callback_defined?Boolean

Returns:

  • (Boolean)

102
103
104
# File 'lib/moqueue/mock_queue.rb', line 102

def callback_defined?
  !!message_handler_callback
end

#null_subscribeObject

configures a do-nothing subscribe block to force received messages to be processed and stored in #received_messages


109
110
111
112
# File 'lib/moqueue/mock_queue.rb', line 109

def null_subscribe
  subscribe {|msg| nil}
  self
end

#prefetch(size) ⇒ Object


56
57
58
# File 'lib/moqueue/mock_queue.rb', line 56

def prefetch(size)
  # noop
end

#publish(message, opts = {}) ⇒ Object


64
65
66
67
68
69
70
71
72
# File 'lib/moqueue/mock_queue.rb', line 64

def publish(message, opts = {})
  if message_handler_callback
    receive(message)
  else
    deferred_publishing_fibers << Fiber.new do
      receive(message)
    end
  end
end

#receive(message, header_opts = {}) ⇒ Object


34
35
36
37
38
39
40
41
42
# File 'lib/moqueue/mock_queue.rb', line 34

def receive(message, header_opts={})
  if callback = message_handler_callback
    headers = MockHeaders.new(header_opts)
    callback.call(*(callback.arity == 1 ? [message] : [headers, message]))
    received_messages_and_headers << {:message => message, :headers => headers}
  else
    receive_message_later(message, header_opts)
  end
end

#received_ack_for_message?(message_content) ⇒ Boolean

Returns:

  • (Boolean)

60
61
62
# File 'lib/moqueue/mock_queue.rb', line 60

def received_ack_for_message?(message_content)
  acked_messages.include?(message_content)
end

#received_headersObject


87
88
89
# File 'lib/moqueue/mock_queue.rb', line 87

def received_headers
  received_messages_and_headers.map{ |r| r[:headers] }
end

#received_message?(message_content) ⇒ Boolean

Returns:

  • (Boolean)

44
45
46
# File 'lib/moqueue/mock_queue.rb', line 44

def received_message?(message_content)
  received_messages.include?(message_content)
end

#received_messagesObject


83
84
85
# File 'lib/moqueue/mock_queue.rb', line 83

def received_messages
  received_messages_and_headers.map{|r| r[:message] }
end

#received_messages_and_headersObject


79
80
81
# File 'lib/moqueue/mock_queue.rb', line 79

def received_messages_and_headers
  @received_messages_and_headers ||= []
end

#received_routing_key?(key) ⇒ Boolean

Returns:

  • (Boolean)

48
49
50
# File 'lib/moqueue/mock_queue.rb', line 48

def received_routing_key?(key)
  received_messages_and_headers.find { |r| r[:headers] && r[:headers].properties[:routing_key] == key }
end

#run_callback(*args) ⇒ Object


97
98
99
100
# File 'lib/moqueue/mock_queue.rb', line 97

def run_callback(*args)
  callback = message_handler_callback
  callback.call(*(callback.arity == 1 ? [args.first] : args))
end

#subscribe(opts = {}, &block) ⇒ Object


25
26
27
28
29
30
31
32
# File 'lib/moqueue/mock_queue.rb', line 25

def subscribe(opts={}, &block)
  if @subscribe_block 
    raise DoubleSubscribeError, "you can't subscribe to the same queue twice"
  end
  @subscribe_block = block
  @ack_msgs = opts[:ack] || false
  process_unhandled_messages
end

#unsubscribeObject


52
53
54
# File 'lib/moqueue/mock_queue.rb', line 52

def unsubscribe
  true
end