Class: Chef::IndexQueue::AmqpClient
- Inherits:
-
Object
- Object
- Chef::IndexQueue::AmqpClient
- Includes:
- Singleton
- Defined in:
- lib/chef/index_queue/amqp_client.rb
Instance Method Summary collapse
- #amqp_client ⇒ Object
- #disconnected! ⇒ Object
- #exchange ⇒ Object
-
#initialize ⇒ AmqpClient
constructor
A new instance of AmqpClient.
- #queue ⇒ Object
- #reset! ⇒ Object
- #send_action(action, data) ⇒ Object
- #stop ⇒ Object
Constructor Details
#initialize ⇒ AmqpClient
Returns a new instance of AmqpClient.
24 25 26 |
# File 'lib/chef/index_queue/amqp_client.rb', line 24 def initialize reset! end |
Instance Method Details
#amqp_client ⇒ Object
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
# File 'lib/chef/index_queue/amqp_client.rb', line 40 def amqp_client unless @amqp_client begin @amqp_client = Bunny.new(amqp_opts) Chef::Log.debug "Starting AMQP connection with client settings: #{@amqp_client.inspect}" @amqp_client.start @amqp_client.qos(:prefetch_count => 1) rescue Bunny::ServerDownError => e Chef::Log.fatal "Could not connect to rabbitmq. Is it running, reachable, and configured correctly?" raise e rescue Bunny::ProtocolError => e Chef::Log.fatal "Connection to rabbitmq refused. Check your rabbitmq configuration and chef's amqp* settings" raise e end end @amqp_client end |
#disconnected! ⇒ Object
70 71 72 73 74 |
# File 'lib/chef/index_queue/amqp_client.rb', line 70 def disconnected! Chef::Log.error("Disconnected from the AMQP Broker (RabbitMQ)") @amqp_client = nil reset! end |
#exchange ⇒ Object
58 59 60 |
# File 'lib/chef/index_queue/amqp_client.rb', line 58 def exchange @exchange ||= amqp_client.exchange("chef-indexer", :durable => true, :type => :fanout) end |
#queue ⇒ Object
62 63 64 65 66 67 68 |
# File 'lib/chef/index_queue/amqp_client.rb', line 62 def queue unless @queue @queue = amqp_client.queue("chef-index-consumer-" + consumer_id, :durable => durable_queue?) @queue.bind(exchange) end @queue end |
#reset! ⇒ Object
28 29 30 31 32 33 |
# File 'lib/chef/index_queue/amqp_client.rb', line 28 def reset! @amqp_client && amqp_client.connected? && amqp_client.stop @amqp_client = nil @exchange = nil @queue = nil end |
#send_action(action, data) ⇒ Object
76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 |
# File 'lib/chef/index_queue/amqp_client.rb', line 76 def send_action(action, data) retries = 0 begin exchange.publish(Chef::JSONCompat.to_json({"action" => action.to_s, "payload" => data})) rescue Bunny::ServerDownError, Bunny::ConnectionError, Errno::ECONNRESET disconnected! if (retries += 1) < 2 Chef::Log.info("Attempting to reconnect to the AMQP broker") retry else Chef::Log.fatal("Could not re-connect to the AMQP broker, giving up") raise end end end |
#stop ⇒ Object
35 36 37 38 |
# File 'lib/chef/index_queue/amqp_client.rb', line 35 def stop @queue && @queue.subscription && @queue.unsubscribe @amqp_client && @amqp_client.stop end |