Class: SidekickQueuePublisher
- Inherits:
-
Object
- Object
- SidekickQueuePublisher
- Defined in:
- lib/sidekick/shared/sidekick_queue_publisher.rb
Instance Attribute Summary collapse
-
#logger ⇒ Object
Returns the value of attribute logger.
Instance Method Summary collapse
-
#initialize(config, logger) ⇒ SidekickQueuePublisher
constructor
A new instance of SidekickQueuePublisher.
-
#publish(message, routing_key, temp_reply_queue_name = nil) ⇒ Object
Publish a message using a given routing key If a temp reply queue name is supplied, this method will automatically subscribe to the reply queue and block until a message is received.
Constructor Details
#initialize(config, logger) ⇒ SidekickQueuePublisher
Returns a new instance of SidekickQueuePublisher.
6 7 8 9 10 11 12 13 14 15 |
# File 'lib/sidekick/shared/sidekick_queue_publisher.rb', line 6 def initialize(config, logger) @logger = logger @config = config.with_indifferent_access # Deprecations section if @config[:brokers] raise SyntaxError, "ERROR: 'brokers' is a deprecated config value. Please use something like:\n:broker:\n :ip: 127.0.0.1\n :port: 1234" end end |
Instance Attribute Details
#logger ⇒ Object
Returns the value of attribute logger.
4 5 6 |
# File 'lib/sidekick/shared/sidekick_queue_publisher.rb', line 4 def logger @logger end |
Instance Method Details
#publish(message, routing_key, temp_reply_queue_name = nil) ⇒ Object
Publish a message using a given routing key If a temp reply queue name is supplied, this method will automatically subscribe to the reply queue and block until a message is received.
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 |
# File 'lib/sidekick/shared/sidekick_queue_publisher.rb', line 21 def publish(, routing_key, temp_reply_queue_name=nil) with_bunny_connection(temp_reply_queue_name) do |reply_queue| if routing_key =~ /amq.gen/ logger.warn "Cannot publish messages to queues starting with amq.gen - this must have been an old request and will be dropped." return true else logger.info "Publishing message to #{routing_key}: #{}" end if reply_queue response = nil @b.queue(routing_key).publish(, :persistent => true) msg="" begin Timeout::timeout(5) do while true msg = reply_queue.pop(:ack => true) msg = msg[:payload] if msg.is_a?(Hash) # Support older implementations of bunny lib if msg == :queue_empty # keep going else logger.info "Got msg #{msg}" response = msg break end end end rescue Timeout::Error logger.error "Timed out while waiting for response from server." response = ":error: Timed out while waiting for response, messaging server may be down." end begin reply_queue.delete rescue logger.warn "Error deleting reply queue #{reply_queue.inspect}" end return response else @b.queue(routing_key).publish(, :persistent => true) end return true end end |