Module: Telemetry::AMQP::Management
- Includes:
- Defaults
- Included in:
- Base
- Defined in:
- lib/telemetry/amqp/management.rb
Instance Method Summary
collapse
Methods included from Defaults
#app_version, #application, #connection_name, #env_prefix, #hostname, #nodes, #opts, #password, #port, #socket_hostname, #use_ssl?, #username, #vhost
Instance Method Details
#ex_q_bindings(exchange:, queue:, vhost: '/') ⇒ Object
55
56
57
58
59
60
61
|
# File 'lib/telemetry/amqp/management.rb', line 55
# Fetches the bindings between an exchange and a queue through the management
# connection.
#
# Every dynamic path segment is percent-encoded before the request is built,
# so the default vhost '/' is sent as '%2F' instead of collapsing into empty
# path segments. Pass raw (un-encoded) values.
#
# @param exchange [String] exchange name
# @param queue [String] queue name
# @param vhost [String] virtual host name (defaults to '/')
# @return [Object] the response body, or an empty Array when any
#   StandardError is raised (its message is printed to stdout)
def ex_q_bindings(exchange:, queue:, vhost: '/', **)
  # Form-encoding writes spaces as '+'; a URL path needs '%20' instead.
  path = [vhost, 'e', exchange, 'q', queue].map do |segment|
    URI.encode_www_form_component(segment.to_s).gsub('+', '%20')
  end.join('/')

  mgmt_connection.get("/api/bindings/#{path}").body
rescue StandardError => e
  puts e.message
  []
end
|
#headers ⇒ Object
9
10
11
12
13
14
|
# File 'lib/telemetry/amqp/management.rb', line 9
# Default HTTP headers: both Content-Type and Accept are set to JSON.
# A new Hash is built on every call.
#
# @return [Hash{String => String}]
def headers
  {
    'Content-Type' => 'application/json',
    'Accept' => 'application/json'
  }
end
|
#mgmt_connection(node = mgmt_node) ⇒ Object
44
45
46
47
48
49
50
51
52
53
|
# File 'lib/telemetry/amqp/management.rb', line 44
# Builds, and memoizes in @mgmt_connection, a Faraday connection configured
# with JSON request/response middleware and basic auth.
#
# NOTE: because of the `||=` memoization, `node` is only honoured by the call
# that actually builds the connection; later calls return the cached
# connection whatever `node` they are given.
#
# @param node [String] host used to build the base URL (defaults to mgmt_node)
# @return [Object, nil] the connection, or nil when building it raises a
#   StandardError (its message is printed to stdout)
def mgmt_connection(node = mgmt_node)
  @mgmt_connection ||= Faraday.new(mgmt_url(node), headers: headers) do |conn|
    conn.request :json
    conn.basic_auth(mgmt_user, mgmt_password)
    conn.response :json, parser_options: { symbolize_names: true }
  end
rescue StandardError => e
  puts e.message
  nil
end
|
#mgmt_node ⇒ Object
24
25
26
|
# File 'lib/telemetry/amqp/management.rb', line 24
# Host of the management API.
#
# Resolution order: opts[:mgmt_node], then the "<env_prefix>_mgmt_node"
# environment variable, then 'localhost'.
#
# @return [String]
def mgmt_node
  # The key previously contained a stray ')' after the prefix, so it could
  # never match the "<env_prefix>_mgmt_*" pattern the sibling settings use.
  opts[:mgmt_node] || ENV["#{env_prefix}_mgmt_node"] || 'localhost'
end
|
#mgmt_password ⇒ Object
20
21
22
|
# File 'lib/telemetry/amqp/management.rb', line 20
# Password for the management API.
#
# Resolution order: opts[:mgmt_password], then the
# "<env_prefix>_mgmt_password" environment variable, then `password`.
#
# @return [Object] the first truthy candidate, else whatever `password` returns
def mgmt_password
  from_opts = opts[:mgmt_password]
  return from_opts if from_opts

  from_env = ENV["#{env_prefix}_mgmt_password"]
  return from_env if from_env

  password
end
|
#mgmt_port ⇒ Object
28
29
30
31
32
33
34
35
36
37
38
|
# File 'lib/telemetry/amqp/management.rb', line 28
# Port of the management API, memoized in @mgmt_port.
#
# Resolution order: opts[:mgmt_port] when that key is present, then the
# "<env_prefix>_mgmt_port" environment variable (converted with #to_i), then
# 443 when use_ssl? is truthy, otherwise 80.
#
# @return [Object] usually an Integer; opts[:mgmt_port] is returned as given
def mgmt_port
  return @mgmt_port if @mgmt_port

  @mgmt_port =
    if opts.key?(:mgmt_port)
      opts[:mgmt_port]
    else
      env_key = "#{env_prefix}_mgmt_port"
      if ENV.key?(env_key)
        ENV[env_key].to_i
      else
        use_ssl? ? 443 : 80
      end
    end
end
|
#mgmt_url(node = mgmt_node) ⇒ Object
40
41
42
|
# File 'lib/telemetry/amqp/management.rb', line 40
# Base URL of the management API: "<scheme>://<node>:<mgmt_port>", where the
# scheme is https when use_ssl? is truthy and http otherwise.
#
# @param node [String] host part of the URL (defaults to mgmt_node)
# @return [String]
def mgmt_url(node = mgmt_node)
  scheme = use_ssl? ? 'https' : 'http'
  "#{scheme}://#{node}:#{mgmt_port}"
end
|
#mgmt_user ⇒ Object
16
17
18
|
# File 'lib/telemetry/amqp/management.rb', line 16
# User name for the management API.
#
# Resolution order: opts[:mgmt_user], then the "<env_prefix>_mgmt_user"
# environment variable, then `username`.
#
# @return [Object] the first truthy candidate, else whatever `username` returns
def mgmt_user
  from_opts = opts[:mgmt_user]
  return from_opts if from_opts

  from_env = ENV["#{env_prefix}_mgmt_user"]
  return from_env if from_env

  username
end
|
#remove_binding(exchange: 'influxdb.out', queue: "influxdb.#{hostname}", key: '#', vhost: '/') ⇒ Object
63
64
65
66
67
68
69
|
# File 'lib/telemetry/amqp/management.rb', line 63
# Deletes a binding between an exchange and a queue through the management
# connection.
#
# Every dynamic path segment is percent-encoded before the request is built.
# Without that, the default key '#' is parsed as a URL fragment and never
# reaches the server, and the default vhost '/' collapses into empty path
# segments. Pass raw (un-encoded) values.
#
# @param exchange [String] exchange name (defaults to 'influxdb.out')
# @param queue [String] queue name (defaults to "influxdb.<hostname>")
# @param key [String] last path segment identifying the binding (defaults to '#')
# @param vhost [String] virtual host name (defaults to '/')
# @return [Boolean] the response's success? value, or false when any
#   StandardError is raised (its message is printed to stdout)
def remove_binding(exchange: 'influxdb.out', queue: "influxdb.#{hostname}", key: '#', vhost: '/', **)
  # Form-encoding writes spaces as '+'; a URL path needs '%20' instead.
  path = [vhost, 'e', exchange, 'q', queue, key].map do |segment|
    URI.encode_www_form_component(segment.to_s).gsub('+', '%20')
  end.join('/')

  mgmt_connection.delete("/api/bindings/#{path}").success?
rescue StandardError => e
  puts e.message
  false
end
|