Class: Telemetry::AMQP::Base

Inherits:
Object
  • Object
show all
Includes:
Defaults, Management
Defined in:
lib/telemetry/amqp/base.rb

Instance Method Summary collapse

Methods included from Management

#ex_q_bindings, #headers, #mgmt_connection, #mgmt_node, #mgmt_password, #mgmt_port, #mgmt_url, #mgmt_user, #remove_binding

Methods included from Defaults

#app_version, #application, #connection_name, #env_prefix, #hostname, #nodes, #opts, #password, #port, #socket_hostname, #use_ssl?, #username, #vhost

Constructor Details

#initialize(auto_start: false, **opts) ⇒ Base

Returns a new instance of Base.



13
14
15
16
# File 'lib/telemetry/amqp/base.rb', line 13

# Stores the given options and, when requested, connects immediately.
#
# @param auto_start [Boolean] when truthy, call `connect!` during construction,
#   but only if `nodes` is not nil
# @param opts [Hash] any remaining keyword options; kept as-is in @opts
def initialize(auto_start: false, **opts)
  @opts = opts
  return unless auto_start
  return if nodes.nil?

  connect!
end

Instance Method Details

#channel ⇒ Object



58
59
60
61
62
63
64
65
# File 'lib/telemetry/amqp/base.rb', line 58

# Returns the channel held in the per-thread holder (@channel_thread), opening
# a new one via `create_channel` when none is stored or the stored one is not open.
#
# @return [Object] the reused channel, or the freshly created one
def channel
  current = @channel_thread&.value
  return current if current&.open?

  # Create the channel BEFORE resolving the holder: `create_channel` may lazily
  # run `connect!`, which replaces @channel_thread. Writing
  # `@channel_thread.value = create_channel` evaluates the receiver first, so the
  # new channel would land on the stale holder and the next call would open
  # another channel.
  fresh = create_channel
  @channel_thread ||= Concurrent::ThreadLocalVar.new(nil)
  @channel_thread.value = fresh
end

#connect! ⇒ Object



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/telemetry/amqp/base.rb', line 36

# Builds a new Bunny session from this object's settings, starts it, and
# stores it (wrapped in a Concurrent::AtomicReference) in @session.
#
# Also resets the per-thread channel holder and registers the blocked /
# unblocked / recovery callbacks via `set_amqp_block_helpers`.
#
# Options read from `opts`:
# - :automatically_recover — passed to Bunny; defaults to true when absent
# - :verify_peer           — passed to Bunny; defaults to true when absent
#
# When `nodes` is nil the host list falls back to ['localhost:5672'].
#
# @return [Object] the started session (the object returned by ::Bunny.new)
def connect!
  # `opts[...] || true` is always true, which made it impossible to pass an
  # explicit `false`. Fall back to true only when the option was not supplied.
  auto_recover = opts[:automatically_recover]
  auto_recover = true if auto_recover.nil?
  peer_check = opts[:verify_peer]
  peer_check = true if peer_check.nil?

  @session = Concurrent::AtomicReference.new(
    ::Bunny.new(
      hosts: nodes || ['localhost:5672'],
      username: username,
      password: password,
      vhost: vhost,
      port: port,
      connection_name: connection_name,
      log_level: ::Logger::WARN,
      logger: Telemetry::Logger,
      automatically_recover: auto_recover,
      verify_peer: peer_check,
      tls: use_ssl?
    )
  )
  @session.value.start
  # Channels belong to the previous session; start with an empty holder.
  @channel_thread = Concurrent::ThreadLocalVar.new(nil)
  set_amqp_block_helpers
  @session.value
end

#create_channel(consumer_pool_size: 1, abort_on_exception: false, timeout: 30) ⇒ Object



67
68
69
# File 'lib/telemetry/amqp/base.rb', line 67

# Asks the current session for a new channel.
#
# The keyword arguments are forwarded positionally to the session's
# `create_channel`, after a leading nil (presumably the channel id, letting
# the client library pick one — confirm against Bunny's documentation).
#
# @param consumer_pool_size [Integer] forwarded as the second argument
# @param abort_on_exception [Boolean] forwarded as the third argument
# @param timeout [Integer] forwarded as the fourth argument
# @return [Object] whatever the session's `create_channel` returns
def create_channel(consumer_pool_size: 1, abort_on_exception: false, timeout: 30)
  channel_args = [nil, consumer_pool_size, abort_on_exception, timeout]
  session.create_channel(*channel_args)
end

#session ⇒ Object



18
19
20
21
22
# File 'lib/telemetry/amqp/base.rb', line 18

# Returns the current session, connecting first when there is none yet.
#
# A connection is (re)established via `connect!` whenever @session is nil or
# does not respond to `value`.
#
# @return [Object] the value held by @session
def session
  usable = !@session.nil? && @session.respond_to?(:value)
  connect! unless usable

  @session.value
end

#set_amqp_block_helpers ⇒ Object



24
25
26
27
28
29
30
31
32
33
34
# File 'lib/telemetry/amqp/base.rb', line 24

# Registers callbacks on the session that print a message when the connection
# is blocked, unblocked, or has finished recovering.
#
# Each callback is only registered if the session responds to the
# corresponding hook method.
#
# @return [Object, nil] the result of `after_recovery_completed` when that
#   hook exists, otherwise nil
def set_amqp_block_helpers
  conn = session

  if conn.respond_to?(:on_blocked)
    conn.on_blocked { puts 'Telemetry::AMQP is being blocked by RabbitMQ!' }
  end

  if conn.respond_to?(:on_unblocked)
    conn.on_unblocked { puts 'Telemetry::AMQP is no longer being blocked by RabbitMQ' }
  end

  return unless conn.respond_to?(:after_recovery_completed)

  conn.after_recovery_completed { puts 'Telemetry::AMQP has completed recovery' }
end