Class: Rmsg::Topic

Inherits:
Object
Defined in:
lib/rmsg/topic.rb

Overview

Topic handles publishing and subscribing to a topic with a key, over RabbitMQ.

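Instance Method Summary

  • #initialize(params) ⇒ Topic (constructor)

    Returns a new instance of Topic.

  • #publish(message, key) ⇒ Exchange

    Publish a message with a routing key.

  • #subscribe(key) {|message| ... } ⇒ Object

    Subscribe to the topic, listening for a specific key.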

Constructor Details

#initialize(params) ⇒ Topic

Returns a new instance of Topic.

Parameters:

  • params (Hash)

Options Hash (params):

  • :rabbit (Rmsg::Rabbit)

    Example: Rmsg::Rabbit.new

  • :topic (String)

    Example: 'services'



# File 'lib/rmsg/topic.rb', line 8

def initialize(params)
  @rabbit = params[:rabbit]
  @exchange = @rabbit.channel.topic(params[:topic])
end
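
The constructor above can be exercised without a running broker. The following is a minimal sketch, not the gem's own test setup: `StubChannel` and `StubRabbit` are hypothetical stand-ins for whatever `Rmsg::Rabbit` exposes, and the `Topic` constructor is copied from the listing on this page so the block is self-contained.

```ruby
# Hypothetical stand-in for the channel: records which topic exchanges
# were declared instead of talking to RabbitMQ.
StubChannel = Struct.new(:declared) do
  def topic(name)
    declared << name
    "exchange:#{name}" # placeholder value standing in for an exchange object
  end
end

# Hypothetical stand-in for Rmsg::Rabbit: only exposes #channel.
StubRabbit = Struct.new(:channel)

# Constructor as listed on this page, reproduced here for the sketch.
module Rmsg
  class Topic
    def initialize(params)
      @rabbit = params[:rabbit]
      @exchange = @rabbit.channel.topic(params[:topic])
    end
  end
end

channel = StubChannel.new([])
Rmsg::Topic.new(rabbit: StubRabbit.new(channel), topic: 'services')
puts channel.declared.inspect
```

Going by the option examples above, the equivalent call against a real broker would presumably be `Rmsg::Topic.new(rabbit: Rmsg::Rabbit.new, topic: 'services')`.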

Instance Method Details

#publish(message, key) ⇒ Exchange

Publish a message with a routing key.

Parameters:

  • message (Hash)

    Message to be published. It is serialized with to_json before being sent.

  • key (String)

    Example: 'users.key_changed'

Returns:

  • (Exchange)

    The exchange used to publish.



# File 'lib/rmsg/topic.rb', line 17

def publish(message, key)
  @exchange.publish(message.to_json, :routing_key => key)
end
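
Because #publish serializes with `to_json` and #subscribe (listed further down this page) parses with `symbolize_names: true`, a message does not always come back identical to what was sent. The sketch below reproduces only that serialization round trip, with no broker involved. `round_trip` is a hypothetical helper and the sample hash is made up for illustration.

```ruby
require 'json'

# Hypothetical helper: applies the same encode and decode steps that
# #publish and #subscribe use, without any messaging in between.
def round_trip(message)
  payload = message.to_json                    # what #publish sends
  JSON.parse(payload, symbolize_names: true)   # what #subscribe yields
end

sent = { id: 42, status: :ok }
received = round_trip(sent)

# Keys come back as symbols, but symbol *values* come back as strings.
puts received.inspect
```

In practice this means subscribers should compare values such as `status` against strings, not symbols.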

#subscribe(key) {|message| ... } ⇒ Object

Subscribe to the topic, listening for a specific key. Subscribing blocks the current process for as long as it runs, so it is intended for long-running processes. On receiving INT (for example Ctrl-C), it closes the Rabbit instance gracefully and returns.

Parameters:

  • key (String)

    Example: 'users.key_changed'

Yield Parameters:

  • message (Hash)

    The message received, to be processed within the block.



# File 'lib/rmsg/topic.rb', line 27

def subscribe(key)
  @queue = @rabbit.channel.queue("", :exclusive => true)
  @queue.bind(@exchange, :routing_key => key)
  begin
    @queue.subscribe(:block => true) do |delivery_info, metadata, payload|
      message = JSON.parse(payload, symbolize_names: true)
      yield message
    end
  rescue Interrupt => _
    @rabbit.close
  end
end
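
The shutdown path in #subscribe can be illustrated without RabbitMQ. This is a rough sketch under stated assumptions: `FakeQueue` and `FakeConnection` are hypothetical stand-ins, and `consume` is a hypothetical method that mirrors the begin/rescue shape of the listing above. The fake queue delivers its payloads and then raises `Interrupt`, which is the exception Ruby raises when the process receives INT.

```ruby
require 'json'

# Hypothetical queue: yields each payload, then simulates Ctrl-C.
class FakeQueue
  def initialize(payloads)
    @payloads = payloads
  end

  def subscribe(_opts = {})
    @payloads.each { |payload| yield nil, nil, payload }
    raise Interrupt, 'simulated INT'
  end
end

# Hypothetical connection wrapper: only records whether #close was called.
class FakeConnection
  attr_reader :closed

  def initialize
    @closed = false
  end

  def close
    @closed = true
  end
end

# Mirrors the structure of #subscribe: parse each payload, hand it to
# the caller's block, and close the connection when interrupted.
def consume(queue, connection)
  queue.subscribe(:block => true) do |_delivery_info, _metadata, payload|
    yield JSON.parse(payload, symbolize_names: true)
  end
rescue Interrupt
  connection.close
end

connection = FakeConnection.new
consume(FakeQueue.new(['{"id":1}']), connection) do |message|
  puts "received #{message.inspect}"
end
puts "closed: #{connection.closed}"
```

Rescuing `Interrupt` inside the method lets the caller's process exit normally after the connection is closed, instead of terminating with a stack trace.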