Class: SidekickQueuePublisher

Inherits:
Object
  • Object
show all
Defined in:
lib/sidekick/shared/sidekick_queue_publisher.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config, logger) ⇒ SidekickQueuePublisher

Returns a new instance of SidekickQueuePublisher.



6
7
8
9
10
11
12
13
14
15
# File 'lib/sidekick/shared/sidekick_queue_publisher.rb', line 6

def initialize(config, logger)
  @logger = logger
  @config = config.with_indifferent_access

  # Deprecations section
  if @config[:brokers]
    raise SyntaxError, "ERROR: 'brokers' is a deprecated config value. Please use something like:\n:broker:\n  :ip: 127.0.0.1\n  :port: 1234"
  end

end

Instance Attribute Details

#loggerObject

Returns the value of attribute logger.



4
5
6
# File 'lib/sidekick/shared/sidekick_queue_publisher.rb', line 4

def logger
  @logger
end

Instance Method Details

#publish(message, routing_key, temp_reply_queue_name = nil) ⇒ Object

Publish a message using a given routing key If a temp reply queue name is supplied, this method will automatically subscribe to the reply queue and block until a message is received.



21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/sidekick/shared/sidekick_queue_publisher.rb', line 21

def publish(message, routing_key, temp_reply_queue_name=nil)    
  with_bunny_connection(temp_reply_queue_name) do |reply_queue|
    if routing_key =~ /amq.gen/
      logger.warn "Cannot publish messages to queues starting with amq.gen - this must have been an old request and will be dropped."
      return true
    else
      logger.info "Publishing message to #{routing_key}: #{message}"      
    end
    
    if reply_queue
      response = nil
      @b.queue(routing_key).publish(message, :persistent => true)          

      msg=""
      begin
        Timeout::timeout(5) do
          while true
          	msg = reply_queue.pop(:ack => true)
          	msg = msg[:payload] if msg.is_a?(Hash) # Support older implementations of bunny lib

            if msg == :queue_empty
              # keep going
            else
            	logger.info "Got msg #{msg}"
              response = msg
              break
            end
          end
        end
      rescue Timeout::Error
        logger.error "Timed out while waiting for response from server."
        response = ":error: Timed out while waiting for response, messaging server may be down."        
      end

      begin
        reply_queue.delete
      rescue
        logger.warn "Error deleting reply queue #{reply_queue.inspect}"
      end
    
      return response
    else
      @b.queue(routing_key).publish(message, :persistent => true)          
    end
    return true
  end
end