Class: BunnyMock::Channel

Inherits:
Object
  • Object
show all
Defined in:
lib/bunny_mock/channel.rb

Instance Attribute Summary collapse

Exchange API collapse

Queue API collapse

Instance Method Summary collapse

Constructor Details

#initialize(connection = nil, id = nil) ⇒ Channel

Create a new BunnyMock::Channel instance



28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/bunny_mock/channel.rb', line 28

def initialize(connection = nil, id = nil)
  # store channel id
  @id = id

  # store connection information
  @connection = connection

  # initialize exchange and queue storage
  @exchanges = {}
  @queues    = {}
  @acknowledged_state = { pending: {}, acked: {}, nacked: {}, rejected: {} }

  # set status to opening
  @status = :opening
end

Instance Attribute Details

#acknowledged_stateHash (readonly)



18
19
20
# File 'lib/bunny_mock/channel.rb', line 18

def acknowledged_state
  @acknowledged_state
end

#connectionBunnyMock::Session (readonly)



12
13
14
# File 'lib/bunny_mock/channel.rb', line 12

def connection
  @connection
end

#idInteger (readonly)



9
10
11
# File 'lib/bunny_mock/channel.rb', line 9

def id
  @id
end

#statusSymbol (readonly)



15
16
17
# File 'lib/bunny_mock/channel.rb', line 15

def status
  @status
end

Instance Method Details

#ack(delivery_tag, multiple = false) ⇒ Object Also known as: acknowledge

Acknowledge message.



279
280
281
282
283
284
285
286
287
288
# File 'lib/bunny_mock/channel.rb', line 279

def ack(delivery_tag, multiple = false)
  if multiple
    @acknowledged_state[:pending].keys.each do |key|
      ack(key, false) if key <= delivery_tag
    end
  elsif @acknowledged_state[:pending].key?(delivery_tag)
    update_acknowledgement_state(delivery_tag, :acked)
  end
  nil
end

#basic_publish(payload, xchg, routing_key, opts = {}) ⇒ BunnyMock::Channel



201
202
203
204
205
206
207
# File 'lib/bunny_mock/channel.rb', line 201

def basic_publish(payload, xchg, routing_key, opts = {})
  xchg = xchg_find_or_create(xchg) unless xchg.respond_to? :name

  xchg.publish payload, opts.merge(routing_key: routing_key)

  self
end

#closeBunnyMock::Channel

Sets status to closed



74
75
76
77
78
# File 'lib/bunny_mock/channel.rb', line 74

def close
  @status = :closed

  self
end

#closed?Boolean

Returns true if status is closed, false otherwise.



52
53
54
# File 'lib/bunny_mock/channel.rb', line 52

def closed?
  @status == :closed
end

#confirm_select(callback = nil) ⇒ Object

Does nothing atm.



246
247
248
# File 'lib/bunny_mock/channel.rb', line 246

def confirm_select(callback = nil)
  # noop
end

#default_exchangeBunnyMock::Exchange

Mocks RabbitMQ default exchange



188
189
190
# File 'lib/bunny_mock/channel.rb', line 188

def default_exchange
  direct '', no_declare: true
end

#direct(name, opts = {}) ⇒ BunnyMock::Exchange

Mocks a direct exchange

Options Hash (opts):

  • :durable (Boolean)
  • :auto_delete (Boolean)
  • :arguments (Hash)


144
145
146
# File 'lib/bunny_mock/channel.rb', line 144

def direct(name, opts = {})
  exchange name, opts.merge(type: :direct)
end

#exchange(name, opts = {}) ⇒ BunnyMock::Exchange

Mocks an exchange

Options Hash (opts):

  • :type (Symbol, String)

    Type of exchange

  • :durable (Boolean)
  • :auto_delete (Boolean)
  • :arguments (Hash)


102
103
104
# File 'lib/bunny_mock/channel.rb', line 102

def exchange(name, opts = {})
  @connection.register_exchange xchg_find_or_create(name, opts)
end

#fanout(name, opts = {}) ⇒ BunnyMock::Exchange

Mocks a fanout exchange

Options Hash (opts):

  • :durable (Boolean)
  • :auto_delete (Boolean)
  • :arguments (Hash)


127
128
129
# File 'lib/bunny_mock/channel.rb', line 127

def fanout(name, opts = {})
  exchange name, opts.merge(type: :fanout)
end

#generate_consumer_tag(name = 'bunny') ⇒ String

Unique string supposed to be used as a consumer tag.



110
111
112
# File 'lib/bunny_mock/channel.rb', line 110

def generate_consumer_tag(name = 'bunny')
  "#{name}-#{Time.now.to_i * 1000}-#{Kernel.rand(999_999_999_999)}"
end

#headers(name, opts = {}) ⇒ BunnyMock::Exchange

Mocks a headers exchange

Options Hash (opts):

  • :durable (Boolean)
  • :auto_delete (Boolean)
  • :arguments (Hash)


178
179
180
# File 'lib/bunny_mock/channel.rb', line 178

def headers(name, opts = {})
  exchange name, opts.merge(type: :headers)
end

#nack(delivery_tag, multiple = false, requeue = false) ⇒ Object

Unacknowledge message.



301
302
303
304
305
306
307
308
309
310
311
# File 'lib/bunny_mock/channel.rb', line 301

def nack(delivery_tag, multiple = false, requeue = false)
  if multiple
    @acknowledged_state[:pending].keys.each do |key|
      nack(key, false, requeue) if key <= delivery_tag
    end
  elsif @acknowledged_state[:pending].key?(delivery_tag)
    delivery, header, body = update_acknowledgement_state(delivery_tag, :nacked)
    delivery.queue.publish(body, header.to_hash) if requeue
  end
  nil
end

#openBunnyMock::Channel

Sets status to open



62
63
64
65
66
# File 'lib/bunny_mock/channel.rb', line 62

def open
  @status = :open

  self
end

#open?Boolean

Returns true if status is open, false otherwise.



46
47
48
# File 'lib/bunny_mock/channel.rb', line 46

def open?
  @status == :open
end

#prefetchObject

Does nothing atm.



256
257
258
# File 'lib/bunny_mock/channel.rb', line 256

def prefetch(*)
  # noop
end

#queue(name = '', opts = {}) ⇒ BunnyMock::Queue

Create a new Queue instance, or find in channel cache



222
223
224
225
# File 'lib/bunny_mock/channel.rb', line 222

def queue(name = '', opts = {})
  queue = @connection.find_queue(name) || Queue.new(self, name, opts)
  @connection.register_queue queue
end

#reject(delivery_tag, requeue = false) ⇒ Object

Rejects a message. A rejected message can be requeued or dropped by RabbitMQ.



323
324
325
326
327
328
329
# File 'lib/bunny_mock/channel.rb', line 323

def reject(delivery_tag, requeue = false)
  if @acknowledged_state[:pending].key?(delivery_tag)
    delivery, header, body = update_acknowledgement_state(delivery_tag, :rejected)
    delivery.queue.publish(body, header.to_hash) if requeue
  end
  nil
end

#temporary_queue(opts = {}) ⇒ BunnyMock::Queue

Create a new Queue instance with no name

See Also:



236
237
238
# File 'lib/bunny_mock/channel.rb', line 236

def temporary_queue(opts = {})
  queue '', opts.merge(exclusive: true)
end

#to_sString Also known as: inspect



81
82
83
# File 'lib/bunny_mock/channel.rb', line 81

def to_s
  "#<#{self.class.name}:#{object_id} @id=#{@id} @open=#{open?}>"
end

#topic(name, opts = {}) ⇒ BunnyMock::Exchange

Mocks a topic exchange

Options Hash (opts):

  • :durable (Boolean)
  • :auto_delete (Boolean)
  • :arguments (Hash)


161
162
163
# File 'lib/bunny_mock/channel.rb', line 161

def topic(name, opts = {})
  exchange name, opts.merge(type: :topic)
end

#wait_for_confirmsObject

Does not actually wait, but always return true.



266
267
268
# File 'lib/bunny_mock/channel.rb', line 266

def wait_for_confirms(*)
  true
end