Module: Telemetry::AMQP::Management

Includes:
Defaults
Included in:
Base
Defined in:
lib/telemetry/amqp/management.rb

Instance Method Summary collapse

Methods included from Defaults

#app_version, #application, #connection_name, #env_prefix, #hostname, #nodes, #opts, #password, #port, #socket_hostname, #use_ssl?, #username, #vhost

Instance Method Details

#ex_q_bindings(exchange:, queue:, vhost: '/') ⇒ Object



55
56
57
58
59
60
61
# File 'lib/telemetry/amqp/management.rb', line 55

# Fetches the bindings between an exchange and a queue from the management API.
#
# Names are passed raw (unescaped); each one is percent-encoded before being
# placed in the request path, so the default vhost '/' is sent as '%2F' instead
# of collapsing the path into '/api/bindings///e/...'.
#
# @param exchange [String] raw exchange name
# @param queue [String] raw queue name
# @param vhost [String] raw virtual host name
# @return [Object] the response body, or an empty Array when the request raises
def ex_q_bindings(exchange:, queue:, vhost: '/', **)
  # encode_www_form_component emits '+' only for spaces (a literal '+' becomes
  # '%2B'), so rewriting '+' to '%20' gives a valid path-segment encoding.
  # URI is standard library; it is expected to be loaded already because the
  # HTTP client depends on it.
  segments = [vhost, exchange, queue].map do |part|
    URI.encode_www_form_component(part.to_s).gsub('+', '%20')
  end

  mgmt_connection.get(format('/api/bindings/%s/e/%s/q/%s', *segments)).body
rescue StandardError => e
  # Best effort: report the failure and fall back to "no bindings".
  puts e.message

  []
end

#headers ⇒ Object



9
10
11
12
13
14
# File 'lib/telemetry/amqp/management.rb', line 9

# Default HTTP headers for management API requests: both the request body
# type and the accepted response type are JSON.
#
# @return [Hash{String => String}] a new hash on every call
def headers
  %w[Content-Type Accept].each_with_object({}) do |header_name, result|
    result[header_name] = 'application/json'
  end
end

#mgmt_connection(node = mgmt_node) ⇒ Object



44
45
46
47
48
49
50
51
52
53
# File 'lib/telemetry/amqp/management.rb', line 44

# Returns the memoised Faraday connection to the management API, building it
# on first use with JSON request/response handling and basic authentication.
#
# NOTE: the connection is cached in @mgmt_connection without regard to +node+,
# so +node+ only has an effect on the call that actually builds the connection.
#
# @param node [String] host used for the URL when a connection must be built
# @return [Object, nil] the connection, or nil when building it raises
def mgmt_connection(node = mgmt_node)
  return @mgmt_connection if @mgmt_connection

  @mgmt_connection = Faraday.new(mgmt_url(node), headers: headers) do |builder|
    builder.request :json
    builder.basic_auth(mgmt_user, mgmt_password)
    builder.response :json, parser_options: { symbolize_names: true }
  end
rescue StandardError => e
  # Best effort: report the failure and signal "no connection" with nil.
  puts e.message
  nil
end

#mgmt_node ⇒ Object



24
25
26
# File 'lib/telemetry/amqp/management.rb', line 24

# Host name of the management API node.
#
# Resolution order: the :mgmt_node option, then the
# "<env_prefix>_mgmt_node" environment variable, then 'localhost'.
#
# @return [String] the node to connect to
def mgmt_node
  # The environment key follows the same "<env_prefix>_mgmt_*" pattern as the
  # other management settings (a stray ')' previously made it unmatchable).
  opts[:mgmt_node] || ENV["#{env_prefix}_mgmt_node"] || 'localhost'
end

#mgmt_password ⇒ Object



20
21
22
# File 'lib/telemetry/amqp/management.rb', line 20

# Password used for management API authentication.
#
# Resolution order: the :mgmt_password option, then the
# "<env_prefix>_mgmt_password" environment variable, then #password.
#
# @return [Object] the first of those values that is truthy, else #password
def mgmt_password
  from_opts = opts[:mgmt_password]
  return from_opts if from_opts

  from_env = ENV["#{env_prefix}_mgmt_password"]
  from_env || password
end

#mgmt_port ⇒ Object



28
29
30
31
32
33
34
35
36
37
38
# File 'lib/telemetry/amqp/management.rb', line 28

# Port of the management API, memoised in @mgmt_port once a truthy value
# has been resolved.
#
# Resolution order: the :mgmt_port option (returned as given), then the
# "<env_prefix>_mgmt_port" environment variable (converted with to_i),
# then 443 when SSL is enabled, otherwise 80.
#
# @return [Object] the resolved port
def mgmt_port
  return @mgmt_port if @mgmt_port
  return @mgmt_port = opts[:mgmt_port] if opts.key?(:mgmt_port)

  env_key = "#{env_prefix}_mgmt_port"
  return @mgmt_port = ENV[env_key].to_i if ENV.key?(env_key)

  @mgmt_port = use_ssl? ? 443 : 80
end

#mgmt_url(node = mgmt_node) ⇒ Object



40
41
42
# File 'lib/telemetry/amqp/management.rb', line 40

# Base URL of the management API for the given node.
#
# @param node [String] host part of the URL; defaults to #mgmt_node
# @return [String] "<scheme>://<node>:<mgmt_port>", where the scheme is
#   https when #use_ssl? is truthy and http otherwise
def mgmt_url(node = mgmt_node)
  scheme = use_ssl? ? 'https' : 'http'
  "#{scheme}://#{node}:#{mgmt_port}"
end

#mgmt_user ⇒ Object



16
17
18
# File 'lib/telemetry/amqp/management.rb', line 16

# User name used for management API authentication.
#
# Resolution order: the :mgmt_user option, then the
# "<env_prefix>_mgmt_user" environment variable, then #username.
#
# @return [Object] the first of those values that is truthy, else #username
def mgmt_user
  from_opts = opts[:mgmt_user]
  return from_opts if from_opts

  from_env = ENV["#{env_prefix}_mgmt_user"]
  from_env || username
end

#remove_binding(exchange: 'influxdb.out', queue: "influxdb.#{hostname}", key: '#', vhost: '/') ⇒ Object



63
64
65
66
67
68
69
# File 'lib/telemetry/amqp/management.rb', line 63

# Deletes a binding between an exchange and a queue through the management API.
#
# Names are passed raw (unescaped); each one is percent-encoded before being
# placed in the request path. Without that, the default key '#' is read as the
# start of a URL fragment and the default vhost '/' collapses the path.
#
# NOTE(review): the last path segment is the binding's properties key as the
# server reports it; confirm against the RabbitMQ API that a single level of
# percent-encoding is what it expects for keys with special characters.
#
# @param exchange [String] raw exchange name
# @param queue [String] raw queue name
# @param key [String] raw binding key
# @param vhost [String] raw virtual host name
# @return [Boolean] the response's success? value, or false when the request raises
def remove_binding(exchange: 'influxdb.out', queue: "influxdb.#{hostname}", key: '#', vhost: '/', **)
  # encode_www_form_component emits '+' only for spaces (a literal '+' becomes
  # '%2B'), so rewriting '+' to '%20' gives a valid path-segment encoding.
  # URI is standard library; it is expected to be loaded already because the
  # HTTP client depends on it.
  segments = [vhost, exchange, queue, key].map do |part|
    URI.encode_www_form_component(part.to_s).gsub('+', '%20')
  end

  mgmt_connection.delete(format('/api/bindings/%s/e/%s/q/%s/%s', *segments)).success?
rescue StandardError => e
  # Best effort: report the failure and signal it with false.
  puts e.message

  false
end