Class: Beetle::QueueProperties
- Inherits:
-
Object
- Object
- Beetle::QueueProperties
- Defined in:
- lib/beetle/queue_properties.rb
Defined Under Namespace
Classes: FailedRabbitRequest
Instance Attribute Summary collapse
-
#config ⇒ Object
readonly
Returns the value of attribute config.
Instance Method Summary collapse
-
#initialize(config) ⇒ QueueProperties
constructor
A new instance of QueueProperties.
- #log_error(msg, response) ⇒ Object
- #logger ⇒ Object
- #remove_binding(server, queue_name, exchange, properties_key) ⇒ Object
- #remove_obsolete_bindings(server, queue_name, bindings) ⇒ Object
- #retrieve_bindings(server, queue_name) ⇒ Object
- #run_rabbit_http_request(uri, request, &block) ⇒ Object
- #set_queue_policy!(server, queue_name, options = {}) ⇒ Object
- #update_queue_properties!(options) ⇒ Object
- #vhost ⇒ Object
Constructor Details
#initialize(config) ⇒ QueueProperties
Returns a new instance of QueueProperties.
10 11 12 |
# File 'lib/beetle/queue_properties.rb', line 10 def initialize(config) @config = config end |
Instance Attribute Details
#config ⇒ Object (readonly)
Returns the value of attribute config.
8 9 10 |
# File 'lib/beetle/queue_properties.rb', line 8 def config @config end |
Instance Method Details
#log_error(msg, response) ⇒ Object
175 176 177 178 179 |
# File 'lib/beetle/queue_properties.rb', line 175 def log_error(msg, response) logger.error(msg) logger.error("Response code was #{response.code}") logger.error(response.body) end |
#logger ⇒ Object
181 182 183 |
# File 'lib/beetle/queue_properties.rb', line 181 def logger @config.logger end |
#remove_binding(server, queue_name, exchange, properties_key) ⇒ Object
143 144 145 146 147 148 149 150 151 152 153 154 155 |
# File 'lib/beetle/queue_properties.rb', line 143 def remove_binding(server, queue_name, exchange, properties_key) request_url = URI("http://#{server}/api/bindings/#{vhost}/e/#{exchange}/q/#{queue_name}/#{properties_key}") request = Net::HTTP::Delete.new(request_url) response = run_rabbit_http_request(request_url, request) do |http| http.request(request) end unless %w(200 201 204).include?(response.code) log_error("Failed to remove obsolete binding for queue #{queue_name}", response) raise FailedRabbitRequest.new("Could not retrieve queue bindings") end end |
#remove_obsolete_bindings(server, queue_name, bindings) ⇒ Object
105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 |
# File 'lib/beetle/queue_properties.rb', line 105 def remove_obsolete_bindings(server, queue_name, bindings) logger.debug "Removing obsolete bindings" raise ArgumentError.new("server missing") if server.blank? raise ArgumentError.new("queue name missing") if queue_name.blank? raise ArgumentError.new("bindings missing") if bindings.nil? desired_bindings = bindings.each_with_object({}) do |b, desired| desired[[b[:exchange], b[:key]]] = b.except(:exchange, :key) end server_bindings = retrieve_bindings(server, queue_name) server_bindings.each do |b| next unless b["destination_type"] == "queue" || b["destination"] == queue_name next if b["source"] == "" source_route = b.values_at("source", "routing_key") unless desired_bindings.has_key?(source_route) logger.info "Removing obsolete binding: #{b.inspect}" remove_binding(server, queue_name, b["source"], b["properties_key"]) end end end |
#retrieve_bindings(server, queue_name) ⇒ Object
127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 |
# File 'lib/beetle/queue_properties.rb', line 127 def retrieve_bindings(server, queue_name) request_url = URI("http://#{server}/api/queues/#{vhost}/#{queue_name}/bindings") request = Net::HTTP::Get.new(request_url) response = run_rabbit_http_request(request_url, request) do |http| http.request(request) end unless response.code == "200" log_error("Failed to retrieve bindings for queue #{queue_name}", response) raise FailedRabbitRequest.new("Could not retrieve queue bindings") end JSON.parse(response.body) end |
#run_rabbit_http_request(uri, request, &block) ⇒ Object
157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 |
# File 'lib/beetle/queue_properties.rb', line 157 def run_rabbit_http_request(uri, request, &block) request.basic_auth(config.user, config.password) case request.class::METHOD when 'GET' request["Accept"] = "application/json" when 'PUT' request["Content-Type"] = "application/json" end http = Net::HTTP.new(uri.hostname, config.api_port) http.read_timeout = config.rabbitmq_api_read_timeout http.write_timeout = config.rabbitmq_api_write_timeout if http.respond_to?(:write_timeout=) # don't do this in production: # http.set_debug_output(logger.instance_eval{ @logdev.dev }) http.start do |instance| block.call(instance) if block_given? end end |
#set_queue_policy!(server, queue_name, options = {}) ⇒ Object
39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 |
# File 'lib/beetle/queue_properties.rb', line 39 def set_queue_policy!(server, queue_name, ={}) logger.info "Setting queue policy: #{server}, #{queue_name}, #{.inspect}" raise ArgumentError.new("server missing") if server.blank? raise ArgumentError.new("queue name missing") if queue_name.blank? return unless [:dead_lettering] || [:lazy] # no need to worry that the server has the port 5672. Net:HTTP will take care of this. See below. policy_name = "#{queue_name}_policy" request_url = URI("http://#{server}/api/policies/#{vhost}/#{policy_name}") get_request = Net::HTTP::Get.new(request_url) put_request = Net::HTTP::Put.new(request_url) delete_request = Net::HTTP::Delete.new(request_url) # set up queue policy definition = {} if [:dead_lettering] definition["dead-letter-routing-key"] = [:routing_key] definition["dead-letter-exchange"] = "" definition["message-ttl"] = [:message_ttl] if [:message_ttl] end definition["queue-mode"] = "lazy" if [:lazy] put_request_body = { "pattern" => "^#{queue_name}$", "priority" => 1, "apply-to" => "queues", "definition" => definition, } is_default_policy = definition == config.broker_default_policy get_response = run_rabbit_http_request(request_url, get_request) do |http| http.request(get_request, nil) end case get_response.code when "200" response_body = JSON.parse(get_response.body) rescue {} same_policy = put_request_body.all? { |k,v| response_body[k] == v } if same_policy if is_default_policy run_rabbit_http_request(request_url, delete_request) do |http| http.request(get_request, nil) end end return :ok end when "404" return :ok if is_default_policy end put_response = run_rabbit_http_request(request_url, put_request) do |http| http.request(put_request, put_request_body.to_json) end unless %w(200 201 204).include?(put_response.code) log_error("Failed to create policy for queue #{queue_name}", put_response) raise FailedRabbitRequest.new("Could not create policy") end :ok end |
#update_queue_properties!(options) ⇒ Object
18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
# File 'lib/beetle/queue_properties.rb', line 18 def update_queue_properties!() logger.info "Updating queue properties: #{.inspect}" = .symbolize_keys server = [:server] target_queue = [:queue_name] dead_letter_queue_name = [:dead_letter_queue_name] = .slice(:lazy, :dead_lettering) # The order of policy creation is important. # We need to create the policy on the dead letter queue first to have the message_ttl setting # in place before the first message comes in. Otherwise a message will not get a ttl # applied and stay in the dead letter queue forever (or until manually consumed), thus # blocking the head of the queue. = .merge(:routing_key => target_queue, :message_ttl => [:message_ttl]) set_queue_policy!(server, dead_letter_queue_name, ) = .merge(:routing_key => dead_letter_queue_name) set_queue_policy!(server, target_queue, ) remove_obsolete_bindings(server, target_queue, [:bindings]) if .has_key?(:bindings) end |
#vhost ⇒ Object
14 15 16 |
# File 'lib/beetle/queue_properties.rb', line 14 def vhost CGI.escape(@config.vhost) end |