Class: Liebre::Publisher

Inherits:
Object
  • Object
show all
Defined in:
lib/liebre/publisher.rb

Instance Method Summary collapse

Constructor Details

#initialize(publisher_name) ⇒ Publisher

Returns a new instance of Publisher.



4
5
6
# File 'lib/liebre/publisher.rb', line 4

def initialize publisher_name
  @publisher_name = publisher_name
end

Instance Method Details

#enqueue(message, options = {}) ⇒ Object



8
9
10
11
12
13
14
# File 'lib/liebre/publisher.rb', line 8

def enqueue message, options = {}
  with_connection do
    exchange = exchange_for default_channel
    logger.debug "Liebre: Publishing '#{message}' with '#{options}' to exchange: #{exchange.name}"
    exchange.publish message, options
  end
end

#enqueue_and_wait(message, options = {}) ⇒ Object Also known as: rpc



16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/liebre/publisher.rb', line 16

def enqueue_and_wait message, options = {}
  result = nil
  with_rpc_channel do |channel|
    correlation_id = options[:correlation_id] ||= generate_uuid
    reply_queue = reply_queue channel, correlation_id
    options[:reply_to] = reply_queue.name
    reply_queue.subscribe(:block => false) do |delivery_info, meta, payload|
      if meta[:correlation_id] == correlation_id
        result = payload
        logger.debug "Liebre: Received response '#{result}'"
        channel.consumers[delivery_info.consumer_tag].cancel
      end
    end
    exchange = exchange_for channel
    logger.debug "Liebre: Publishing '#{message}' with '#{options}' to exchange: #{exchange.name}"
    exchange.publish message, options
    begin
      Timeout.timeout(Liebre.config.rpc_request_timeout) do
        sleep 0.01 while result.nil?
      end
    rescue Timeout::Error
      #do nothing
    end
  end
  result
end