Class: Telemetry::AMQP::Base
Instance Method Summary
collapse
Methods included from Management
#ex_q_bindings, #headers, #mgmt_connection, #mgmt_node, #mgmt_password, #mgmt_port, #mgmt_url, #mgmt_user, #remove_binding
Methods included from Defaults
#app_version, #application, #connection_name, #env_prefix, #hostname, #nodes, #opts, #password, #port, #socket_hostname, #use_ssl?, #username, #vhost
Constructor Details
#initialize(auto_start: false, **opts) ⇒ Base
Returns a new instance of Base.
13
14
15
16
|
# File 'lib/telemetry/amqp/base.rb', line 13
# Stores the supplied options and optionally opens the connection right away.
#
# @param auto_start [Boolean] when truthy, call #connect! during construction
#   (only if #nodes is not nil)
# @param opts [Hash] remaining keyword options, kept in @opts
def initialize(auto_start: false, **opts)
  @opts = opts
  return unless auto_start
  # No nodes configured: skip the eager connection.
  return if nodes.nil?

  connect!
end
|
Instance Method Details
#channel ⇒ Object
58
59
60
61
62
63
64
65
|
# File 'lib/telemetry/amqp/base.rb', line 58
# Returns the channel held in the @channel_thread variable, creating a new one
# through #create_channel when none is stored or the stored one is not open.
#
# @return [Object] the stored open channel, or the freshly created one
def channel
  # Make sure the holder exists; a fresh holder starts out with a nil value.
  @channel_thread = Concurrent::ThreadLocalVar.new(nil) if @channel_thread.nil?

  existing = @channel_thread.value
  return existing if !existing.nil? && existing.open?

  @channel_thread.value = create_channel
end
|
#connect! ⇒ Object
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
|
# File 'lib/telemetry/amqp/base.rb', line 36
# Builds a new Bunny session from the configured defaults, starts it, and
# stores it in @session wrapped in a Concurrent::AtomicReference.
#
# Also resets @channel_thread to a fresh Concurrent::ThreadLocalVar and
# registers the block/unblock/recovery callbacks via #set_amqp_block_helpers.
#
# Recognised keys in #opts:
# * :automatically_recover - passed to Bunny; defaults to true when nil/absent
# * :verify_peer           - passed to Bunny; defaults to true when nil/absent
#
# @return [Object] the started session (the value held by @session)
def connect!
  settings = opts

  # `value || true` is always true, which made it impossible to pass `false`.
  # Only fall back to the default when the option was not provided (nil).
  auto_recover = settings[:automatically_recover]
  auto_recover = true if auto_recover.nil?
  verify_peer = settings[:verify_peer]
  verify_peer = true if verify_peer.nil?

  @session = Concurrent::AtomicReference.new(
    ::Bunny.new(
      hosts: nodes || ['localhost:5672'],
      username: username,
      password: password,
      vhost: vhost,
      port: port,
      connection_name: connection_name,
      log_level: ::Logger::WARN,
      logger: Telemetry::Logger,
      automatically_recover: auto_recover,
      verify_peer: verify_peer,
      tls: use_ssl?
    )
  )
  @session.value.start
  # Replace the channel holder so channels from an earlier session are not reused.
  @channel_thread = Concurrent::ThreadLocalVar.new(nil)
  set_amqp_block_helpers
  @session.value
end
|
#create_channel(consumer_pool_size: 1, abort_on_exception: false, timeout: 30) ⇒ Object
67
68
69
|
# File 'lib/telemetry/amqp/base.rb', line 67
# Opens a new channel on the current session.
#
# The leading nil is forwarded as the first positional argument of the
# session's create_channel (presumably the channel id, letting the session
# pick one - confirm against the Bunny API).
#
# @param consumer_pool_size [Integer] forwarded as the second argument
# @param abort_on_exception [Boolean] forwarded as the third argument
# @param timeout [Integer] forwarded as the fourth argument
# @return [Object] whatever the session's create_channel returns
def create_channel(consumer_pool_size: 1, abort_on_exception: false, timeout: 30)
  channel_args = [nil, consumer_pool_size, abort_on_exception, timeout]
  session.create_channel(*channel_args)
end
|
#session ⇒ Object
18
19
20
21
22
|
# File 'lib/telemetry/amqp/base.rb', line 18
# Returns the current session, connecting first when none is usable.
#
# @return [Object] the value held by @session
def session
  # nil does not respond to :value, so this one check also covers the
  # "not connected yet" case.
  connect! unless @session.respond_to?(:value)
  @session.value
end
|
#set_amqp_block_helpers ⇒ Object
24
25
26
27
28
29
30
31
32
33
34
|
# File 'lib/telemetry/amqp/base.rb', line 24
# Registers informational callbacks on the session for the blocked, unblocked
# and recovery-completed events. Each callback is only registered when the
# session responds to the corresponding hook method; every callback simply
# prints a message with puts.
def set_amqp_block_helpers
  amqp = session

  amqp.on_blocked { puts 'Telemetry::AMQP is being blocked by RabbitMQ!' } if amqp.respond_to? :on_blocked

  if amqp.respond_to? :on_unblocked
    amqp.on_unblocked { puts 'Telemetry::AMQP is no longer being blocked by RabbitMQ' }
  end

  # The condition and the body had been fused onto a single line, which does
  # not parse; they are separate statements.
  if amqp.respond_to? :after_recovery_completed
    amqp.after_recovery_completed { puts 'Telemetry::AMQP has completed recovery' }
  end
end
|