Class: Beetle::QueueProperties

Inherits:
Object
  • Object
show all
Defined in:
lib/beetle/queue_properties.rb

Defined Under Namespace

Classes: FailedRabbitRequest

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config) ⇒ QueueProperties

Returns a new instance of QueueProperties.



10
11
12
# File 'lib/beetle/queue_properties.rb', line 10

# Creates a new QueueProperties instance.
#
# @param config [Object] configuration object; it is stored unchanged and the
#   other methods of this class read their settings (logger, vhost, user,
#   password, API port, timeouts, default policy) from it
def initialize(config)
  @config = config
end

Instance Attribute Details

#config ⇒ Object (readonly)

Returns the value of attribute config.



8
9
10
# File 'lib/beetle/queue_properties.rb', line 8

# Returns the configuration object that was handed to the constructor.
#
# @return [Object] the value stored in @config
def config
  @config
end

Instance Method Details

#log_error(msg, response) ⇒ Object



175
176
177
178
179
# File 'lib/beetle/queue_properties.rb', line 175

# Reports a failed HTTP response on the error log.
#
# Three entries are written, in this order: the caller's message, the
# response status code and the response body.
#
# @param msg [String] description of the operation that failed
# @param response [#code, #body] the response to report
def log_error(msg, response)
  error_log = logger
  error_log.error(msg)
  error_log.error("Response code was #{response.code}")
  error_log.error(response.body)
end

#logger ⇒ Object



181
182
183
# File 'lib/beetle/queue_properties.rb', line 181

# Shortcut to the logger provided by the configuration object.
#
# @return [Object] whatever the configuration's +logger+ method returns
def logger
  config.logger
end

#remove_binding(server, queue_name, exchange, properties_key) ⇒ Object



143
144
145
146
147
148
149
150
151
152
153
154
155
# File 'lib/beetle/queue_properties.rb', line 143

# Deletes one binding between an exchange and a queue through the RabbitMQ
# management HTTP API.
#
# @param server [String] server string interpolated into the request URL
# @param queue_name [String] name of the bound queue
# @param exchange [String] name of the source exchange
# @param properties_key [String] key identifying the binding (callers pass
#   the "properties_key" field of a binding returned by the API)
# @return [nil] when the server answers 200, 201 or 204
# @raise [FailedRabbitRequest] for any other response code; the response is
#   written to the error log first
def remove_binding(server, queue_name, exchange, properties_key)
  request_url = URI("http://#{server}/api/bindings/#{vhost}/e/#{exchange}/q/#{queue_name}/#{properties_key}")
  request = Net::HTTP::Delete.new(request_url)

  response = run_rabbit_http_request(request_url, request) do |http|
    http.request(request)
  end

  return if %w(200 201 204).include?(response.code)

  log_error("Failed to remove obsolete binding for queue #{queue_name}", response)
  # The message names the operation that actually failed (a removal, not a retrieval).
  raise FailedRabbitRequest.new("Could not remove queue binding")
end

#remove_obsolete_bindings(server, queue_name, bindings) ⇒ Object

Raises:

  • (ArgumentError)


105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
# File 'lib/beetle/queue_properties.rb', line 105

# Removes every binding of the given queue that exists on the server but is
# not part of the desired +bindings+.
#
# A server binding is considered only if its destination type is "queue" AND
# its destination is +queue_name+. Bindings whose source is the empty string
# are always left in place (presumably the default-exchange binding — confirm
# against the RabbitMQ documentation).
#
# @param server [String] server to query
# @param queue_name [String] queue whose bindings are cleaned up
# @param bindings [Array<Hash>] desired bindings; each entry is read via
#   +:exchange+ and +:key+
# @return [Object] the collection returned by retrieve_bindings
# @raise [ArgumentError] if server or queue_name is blank, or bindings is nil
def remove_obsolete_bindings(server, queue_name, bindings)
  logger.debug "Removing obsolete bindings"
  raise ArgumentError.new("server missing")     if server.blank?
  raise ArgumentError.new("queue name missing") if queue_name.blank?
  raise ArgumentError.new("bindings missing")   if bindings.nil?

  # Only the (exchange, routing key) pair is needed to recognise a desired binding.
  desired_bindings = bindings.each_with_object({}) do |b, desired|
    desired[[b[:exchange], b[:key]]] = true
  end

  server_bindings = retrieve_bindings(server, queue_name)
  server_bindings.each do |b|
    # Both conditions must hold; otherwise a binding belonging to another
    # destination would be deleted under this queue's name.
    next unless b["destination_type"] == "queue" && b["destination"] == queue_name
    next if b["source"] == ""
    source_route = b.values_at("source", "routing_key")
    unless desired_bindings.key?(source_route)
      logger.info "Removing obsolete binding: #{b.inspect}"
      remove_binding(server, queue_name, b["source"], b["properties_key"])
    end
  end
end

#retrieve_bindings(server, queue_name) ⇒ Object



127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
# File 'lib/beetle/queue_properties.rb', line 127

# Fetches the bindings of a queue from the RabbitMQ management HTTP API.
#
# @param server [String] server string interpolated into the request URL
# @param queue_name [String] queue whose bindings are requested
# @return [Object] the JSON-decoded response body
# @raise [FailedRabbitRequest] if the response code is anything but "200";
#   the response is written to the error log first
def retrieve_bindings(server, queue_name)
  bindings_url = URI("http://#{server}/api/queues/#{vhost}/#{queue_name}/bindings")
  get_request = Net::HTTP::Get.new(bindings_url)
  response = run_rabbit_http_request(bindings_url, get_request) { |http| http.request(get_request) }

  return JSON.parse(response.body) if response.code == "200"

  log_error("Failed to retrieve bindings for queue #{queue_name}", response)
  raise FailedRabbitRequest.new("Could not retrieve queue bindings")
end

#run_rabbit_http_request(uri, request, &block) ⇒ Object



157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
# File 'lib/beetle/queue_properties.rb', line 157

# Prepares a request for the RabbitMQ management API, opens an HTTP
# connection and hands that connection to the given block.
#
# The request receives basic-auth credentials from the configuration. GET
# requests get an "Accept: application/json" header, PUT requests a
# "Content-Type: application/json" header. The connection goes to the URI's
# hostname, but on the configured API port rather than the URI's port.
#
# @param uri [URI] only its hostname is used
# @param request [Net::HTTPRequest] request to decorate; it is not sent
#   here, the block is expected to send it
# @yieldparam http [Net::HTTP] the started connection
# @return [Object, nil] the block's result, or nil when no block is given
def run_rabbit_http_request(uri, request, &block)
  request.basic_auth(config.user, config.password)

  json_header = { 'GET' => "Accept", 'PUT' => "Content-Type" }[request.class::METHOD]
  request[json_header] = "application/json" if json_header

  connection = Net::HTTP.new(uri.hostname, config.api_port)
  connection.read_timeout = config.rabbitmq_api_read_timeout
  # The setter is probed first, so a Net::HTTP without write_timeout= is skipped.
  if connection.respond_to?(:write_timeout=)
    connection.write_timeout = config.rabbitmq_api_write_timeout
  end
  # For debugging only, never in production:
  # connection.set_debug_output(logger.instance_eval{ @logdev.dev })
  connection.start { |session| block.call(session) if block }
end

#set_queue_policy!(server, queue_name, options = {}) ⇒ Object

Raises:

  • (ArgumentError)


39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/beetle/queue_properties.rb', line 39

# Makes sure the policy "<queue_name>_policy" on the server matches the
# requested options, using the RabbitMQ management HTTP API.
#
# Nothing is done unless +:dead_lettering+ or +:lazy+ is set. Otherwise the
# current policy is fetched and then:
# * if it already matches and equals the broker default policy, it is deleted;
# * if it already matches (default or not), no PUT is sent;
# * if it does not exist (404) and the wanted policy is the broker default,
#   nothing is created;
# * in every other case the policy is written with a PUT.
#
# @param server [String] server string interpolated into the request URL
# @param queue_name [String] queue the policy applies to
# @param options [Hash] reads +:dead_lettering+, +:lazy+, +:routing_key+ and
#   +:message_ttl+
# @return [:ok, nil] :ok once the policy is in the wanted state, nil when
#   neither +:dead_lettering+ nor +:lazy+ was requested
# @raise [ArgumentError] if server or queue_name is blank
# @raise [FailedRabbitRequest] if the PUT is not answered with 200, 201 or 204
def set_queue_policy!(server, queue_name, options={})
  logger.info "Setting queue policy: #{server}, #{queue_name}, #{options.inspect}"

  raise ArgumentError.new("server missing")     if server.blank?
  raise ArgumentError.new("queue name missing") if queue_name.blank?

  return unless options[:dead_lettering] || options[:lazy]

  # The server string may carry the AMQP port 5672. That is harmless: the
  # request helper connects to the URI's hostname on the configured API port.
  policy_name = "#{queue_name}_policy"
  request_url = URI("http://#{server}/api/policies/#{vhost}/#{policy_name}")
  get_request = Net::HTTP::Get.new(request_url)
  put_request = Net::HTTP::Put.new(request_url)
  delete_request = Net::HTTP::Delete.new(request_url)

  # set up queue policy
  definition = {}
  if options[:dead_lettering]
    definition["dead-letter-routing-key"] = options[:routing_key]
    definition["dead-letter-exchange"] = ""
    definition["message-ttl"] = options[:message_ttl] if options[:message_ttl]
  end

  definition["queue-mode"] = "lazy" if options[:lazy]

  put_request_body = {
    "pattern" => "^#{queue_name}$",
    "priority" => 1,
    "apply-to" => "queues",
    "definition" => definition,
  }

  is_default_policy = definition == config.broker_default_policy

  get_response = run_rabbit_http_request(request_url, get_request) do |http|
    http.request(get_request, nil)
  end

  case get_response.code
  when "200"
    response_body =
      begin
        JSON.parse(get_response.body)
      rescue StandardError
        {} # unreadable body: compares as "different", so the policy is rewritten below
      end
    same_policy = put_request_body.all? { |k,v| response_body[k] == v }
    if same_policy
      if is_default_policy
        # Send the DELETE request itself; sending the GET request here would
        # leave the policy in place.
        run_rabbit_http_request(request_url, delete_request) do |http|
          http.request(delete_request, nil)
        end
      end
      return :ok
    end
  when "404"
    return :ok if is_default_policy
  end

  put_response = run_rabbit_http_request(request_url, put_request) do |http|
    http.request(put_request, put_request_body.to_json)
  end

  unless %w(200 201 204).include?(put_response.code)
    log_error("Failed to create policy for queue #{queue_name}", put_response)
    raise FailedRabbitRequest.new("Could not create policy")
  end

  :ok
end

#update_queue_properties!(options) ⇒ Object



18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# File 'lib/beetle/queue_properties.rb', line 18

# Applies queue policies for a queue and its dead letter queue and, when
# +:bindings+ is given, removes bindings that are no longer wanted.
#
# @param options [Hash] string or symbol keys; reads +:server+,
#   +:queue_name+, +:dead_letter_queue_name+, +:lazy+, +:dead_lettering+,
#   +:message_ttl+ and (optionally) +:bindings+
# @return [Object, nil] the result of remove_obsolete_bindings when the
#   +:bindings+ key is present, otherwise nil
def update_queue_properties!(options)
  logger.info "Updating queue properties: #{options.inspect}"
  options = options.symbolize_keys
  server, target_queue, dead_letter_queue_name =
    options.values_at(:server, :queue_name, :dead_letter_queue_name)
  shared_policy = options.slice(:lazy, :dead_lettering)

  # Policy order matters: the dead letter queue is configured first so that
  # its message_ttl is in place before the first message can arrive there.
  # A message that arrives earlier gets no ttl and stays in the dead letter
  # queue forever (or until manually consumed), blocking the head of the queue.
  set_queue_policy!(
    server,
    dead_letter_queue_name,
    shared_policy.merge(:routing_key => target_queue, :message_ttl => options[:message_ttl])
  )
  set_queue_policy!(server, target_queue, shared_policy.merge(:routing_key => dead_letter_queue_name))

  return unless options.key?(:bindings)

  remove_obsolete_bindings(server, target_queue, options[:bindings])
end

#vhost ⇒ Object



14
15
16
# File 'lib/beetle/queue_properties.rb', line 14

# The configured virtual host, escaped with CGI.escape so that it can be
# interpolated into management API URLs.
#
# @return [String] the escaped vhost name
def vhost
  raw_vhost = @config.vhost
  CGI.escape(raw_vhost)
end