Class: Basquiat::Adapters::RabbitMq::Session

Inherits:
Object
  • Object
show all
Defined in:
lib/basquiat/adapters/rabbitmq/session.rb

Overview

A RabbitMQ session.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(channel, session_options = {}) ⇒ Session

Returns a new instance of Session.



10
11
12
13
# File 'lib/basquiat/adapters/rabbitmq/session.rb', line 10

# Stores the collaborators this session works with.
#
# @param channel [Object] the channel every broker call in this class goes
#   through (presumably a Bunny channel — confirm against the adapter)
# @param session_options [Hash] configuration read later by the other methods;
#   the visible code looks up :exchange, :queue, :publisher and :consumer
def initialize(channel, session_options = {})
  @options = session_options
  @channel = channel
end

Instance Attribute Details

#channel ⇒ Object (readonly)

Returns the value of attribute channel.



8
9
10
# File 'lib/basquiat/adapters/rabbitmq/session.rb', line 8

# Read-only access to the channel handed to the constructor.
#
# @return [Object, nil] the value held in @channel (nil when it was never set)
def channel
  instance_variable_get(:@channel)
end

Instance Method Details

#bind_queue(routing_key) ⇒ Object



15
16
17
# File 'lib/basquiat/adapters/rabbitmq/session.rb', line 15

# Binds this session's queue to this session's exchange.
#
# @param routing_key [Object] passed through unchanged as the :routing_key
#   option of the binding
# @return [Object] whatever queue.bind returns
def bind_queue(routing_key)
  binding_opts = { routing_key: routing_key }
  queue.bind(exchange, **binding_opts)
end

#exchange ⇒ Object



44
45
46
47
48
# File 'lib/basquiat/adapters/rabbitmq/session.rb', line 44

# Lazily declares the topic exchange described by the :exchange section of
# the session options and caches it in @exchange for subsequent calls.
#
# The name, durability flag and extra arguments are read from
# @options[:exchange]; a missing :options entry is replaced by an empty hash.
#
# @return [Object] whatever channel.topic returned on the first call
def exchange
  return @exchange if @exchange

  settings = @options[:exchange]
  @exchange = channel.topic(settings[:name],
                            durable:   settings[:durable],
                            arguments: settings[:options] || {})
end

#publish(routing_key, message, props = {}) ⇒ Object



19
20
21
22
23
24
25
# File 'lib/basquiat/adapters/rabbitmq/session.rb', line 19

# Publishes a message on this session's exchange.
#
# When @options[:publisher][:confirm] is truthy, channel.confirm_select is
# invoked before every publish. The message is serialized with
# Basquiat::Json.encode and sent with default properties (routing key,
# persistent: true, current epoch timestamp) that +props+ may override.
#
# @param routing_key [Object] routing key placed in the publish properties
# @param message [Object] payload handed to Basquiat::Json.encode
# @param props [Hash] extra publish properties; entries here win over the defaults
# @return [Object] whatever exchange.publish returns
def publish(routing_key, message, props = {})
  channel.confirm_select if @options[:publisher][:confirm]

  target   = exchange
  payload  = Basquiat::Json.encode(message)
  defaults = { routing_key: routing_key, persistent: true, timestamp: Time.now.to_i }
  target.publish(payload, defaults.merge(props))
end

#queue ⇒ Object



34
35
36
37
38
# File 'lib/basquiat/adapters/rabbitmq/session.rb', line 34

# Lazily declares the queue described by the :queue section of the session
# options and caches it in @queue for subsequent calls.
#
# All three settings are read with Hash#dig so the lookups agree with each
# other: previously the name and durable flag tolerated a missing :queue
# section while the arguments lookup raised NoMethodError on nil before
# channel.queue was ever reached. A missing :options entry (or a missing
# :queue section) yields an empty arguments hash.
#
# NOTE(review): with no :queue section the name passed on is nil; how the
# channel reacts to that is decided by the channel implementation — confirm.
#
# @return [Object] whatever channel.queue returned on the first call
def queue
  @queue ||= channel.queue(@options.dig(:queue, :name),
                           durable:   @options.dig(:queue, :durable),
                           arguments: @options.dig(:queue, :options) || {})
end

#queue_name ⇒ Object



40
41
42
# File 'lib/basquiat/adapters/rabbitmq/session.rb', line 40

# Name of this session's queue, as reported by the queue object itself.
# Calling this goes through #queue, so the queue is looked up (and created on
# first use) before its name is read.
#
# @return [Object] the value of queue.name
def queue_name
  declared_queue = queue
  declared_queue.name
end

#subscribe(block: true, manual_ack: @options[:consumer][:manual_ack]) ⇒ Object



27
28
29
30
31
32
# File 'lib/basquiat/adapters/rabbitmq/session.rb', line 27

# Subscribes to this session's queue and yields each delivery to the caller's
# block wrapped in a Basquiat::Adapters::RabbitMq::Message.
#
# The channel prefetch is set from @options[:consumer][:prefetch] before
# subscribing. A block is required: deliveries are passed on with +yield+.
#
# @param block [Boolean] forwarded to queue.subscribe as :block (presumably
#   whether the call blocks the calling thread — confirm against the queue API)
# @param manual_ack [Object] forwarded to queue.subscribe as :manual_ack;
#   defaults to @options[:consumer][:manual_ack]
# @yieldparam message [Basquiat::Adapters::RabbitMq::Message] built from the
#   payload, delivery info and properties of each delivery
# @return [Object] whatever queue.subscribe returns
def subscribe(block: true, manual_ack: @options[:consumer][:manual_ack])
  consumer_settings = @options[:consumer]
  channel.prefetch(consumer_settings[:prefetch])

  queue.subscribe(block: block, manual_ack: manual_ack) do |delivery_info, properties, payload|
    yield Basquiat::Adapters::RabbitMq::Message.new(payload, delivery_info, properties)
  end
end