Class: Vx::Common::AMQP::Session
- Inherits:
- Object
- Object
- Vx::Common::AMQP::Session
show all
- Includes:
- Instrument
- Defined in:
- lib/vx/common/amqp/session.rb
Defined Under Namespace
Classes: ConnectionDoesNotExist
Constant Summary
collapse
- CHANNEL_KEY =
:vx_amqp_channel
- @@session_lock =
Mutex.new
Instance Attribute Summary collapse
Class Method Summary
collapse
Instance Method Summary
collapse
Methods included from Instrument
#instrument
Instance Attribute Details
#conn ⇒ Object
Returns the value of attribute conn.
15
16
17
|
# File 'lib/vx/common/amqp/session.rb', line 15
# Reader for the session's connection object.
#
# @return [Object, nil] the current value of @conn (nil when none is set)
def conn
@conn
end
|
Class Method Details
.resume ⇒ Object
26
27
28
|
# File 'lib/vx/common/amqp/session.rb', line 26
# Class-level method: clears the shutdown flag by setting @shutdown to false.
#
# @return [false] the value assigned to the flag
def resume
@shutdown = false
end
|
.shutdown ⇒ Object
18
19
20
|
# File 'lib/vx/common/amqp/session.rb', line 18
# Class-level method: raises the shutdown flag by setting @shutdown to true.
#
# @return [true] the value assigned to the flag
def shutdown
@shutdown = true
end
|
.shutdown? ⇒ Boolean
22
23
24
|
# File 'lib/vx/common/amqp/session.rb', line 22
# Class-level predicate: tells whether the shutdown flag is currently raised.
#
# @return [Boolean] true only when @shutdown is exactly true; an unset (nil)
#   flag yields false
def shutdown?
@shutdown == true
end
|
Instance Method Details
#channel ⇒ Object
97
98
99
100
101
|
# File 'lib/vx/common/amqp/session.rb', line 97
# Picks the channel to use for the calling thread.
#
# After asserting that the connection is open, the channel stored in
# Thread.current under CHANNEL_KEY is preferred; when that slot is empty the
# connection's default channel is used instead.
#
# @return [Object] the thread's channel, or conn.default_channel
def channel
  assert_connection_is_open

  thread_channel = Thread.current[CHANNEL_KEY]
  return thread_channel if thread_channel

  conn.default_channel
end
|
#close ⇒ Object
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
|
# File 'lib/vx/common/amqp/session.rb', line 31
# Closes the connection, waits until it reports :closed, and forgets it.
#
# Nothing happens when the session is not open. The open check is done twice:
# once before taking the session lock (cheap early exit) and once more while
# holding it. Without the second check, a caller that passed the first check
# while another thread was closing would enter the lock after @conn had been
# set to nil and call #close on nil.
#
# A Bunny::ChannelError raised by conn.close is reported via warn and
# otherwise ignored.
#
# NOTE(review): the wait for the :closed status has no upper bound; if the
# connection never reaches that status this method does not return.
#
# @return [nil]
def close
  return unless open?

  @@session_lock.synchronize do
    # Another thread may have closed the session while we waited for the lock.
    next unless open?

    begin
      conn.close
    rescue Bunny::ChannelError => e
      warn e
    end

    sleep 0.01 while conn.status != :closed

    @conn = nil
  end
end
|
#config ⇒ Object
130
131
132
|
# File 'lib/vx/common/amqp/session.rb', line 130
# Shortcut to the AMQP configuration object.
#
# @return [Object] whatever Common::AMQP.config returns
def config
Common::AMQP.config
end
|
#conn_info ⇒ Object
117
118
119
120
121
|
# File 'lib/vx/common/amqp/session.rb', line 117
# Builds a printable amqp:// URL describing the current connection from its
# user, host, port and vhost.
#
# @return [String, nil] the URL string, or nil when there is no connection
def conn_info
  return unless conn

  "amqp://#{conn.user}@#{conn.host}:#{conn.port}/#{conn.vhost}"
end
|
#declare_exchange(name, options = nil) ⇒ Object
78
79
80
81
82
83
84
85
86
|
# File 'lib/vx/common/amqp/session.rb', line 78
# Declares an exchange on a channel of the open connection.
#
# The options hash passed by the caller is copied before the :channel entry is
# removed from it, so the caller's hash is left untouched and can be reused
# for further declarations.
#
# @param name [String, nil] exchange name; config.default_exchange_name is
#   used when nil
# @param options [Hash, nil] exchange options. The :channel entry, when
#   present, selects the channel to declare on (otherwise #channel is used);
#   the remaining entries are handed to get_exchange_type_and_options, which
#   yields the exchange type and the options passed to the channel.
# @return [Object] the result of the channel's #exchange call
def declare_exchange(name, options = nil)
  assert_connection_is_open

  # Work on a copy: deleting :channel must not alter the caller's hash.
  options = options ? options.dup : {}
  name ||= config.default_exchange_name

  ch = options.delete(:channel) || channel
  type, opts = get_exchange_type_and_options(options)

  ch.exchange(name, opts.merge(type: type))
end
|
#declare_queue(name, options = nil) ⇒ Object
88
89
90
91
92
93
94
95
|
# File 'lib/vx/common/amqp/session.rb', line 88
# Declares a queue on a channel of the open connection.
#
# The options hash passed by the caller is copied before the :channel entry is
# removed from it, so the caller's hash is left untouched and can be reused
# for further declarations.
#
# @param name [String, nil] requested queue name; the final name is whatever
#   get_queue_name_and_options returns for it
# @param options [Hash, nil] queue options. The :channel entry, when present,
#   selects the channel to declare on (otherwise #channel is used); the
#   remaining entries are handed to get_queue_name_and_options.
# @return [Object] the result of the channel's #queue call
def declare_queue(name, options = nil)
  assert_connection_is_open

  # Work on a copy: deleting :channel must not alter the caller's hash.
  options = options ? options.dup : {}

  ch = options.delete(:channel) || channel
  name, opts = get_queue_name_and_options(name, options)

  ch.queue(name, opts)
end
|
#open ⇒ Object
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
|
# File 'lib/vx/common/amqp/session.rb', line 47
# Opens the connection unless the session is already open.
#
# While holding the session lock this clears the class-level shutdown flag,
# creates the Bunny connection from config.url on first use, emits the
# "start_connecting.consumer.amqp" event and then, inside the
# "connect.consumer.amqp" instrumentation block, starts the connection and
# polls until it is no longer connecting.
#
# @return [self]
def open
  unless open?
    @@session_lock.synchronize do
      self.class.resume

      @conn ||= Bunny.new(config.url, heartbeat: :server)

      payload = { info: conn_info }

      instrument("start_connecting.consumer.amqp", payload)
      instrument("connect.consumer.amqp", payload) do
        # A connection that is already open needs no start.
        next if conn.open?

        conn.start
        sleep 0.01 while conn.connecting?
      end
    end
  end

  self
end
|
#open? ⇒ Boolean
74
75
76
|
# File 'lib/vx/common/amqp/session.rb', line 74
# Tells whether the session holds a connection that is open and whose status
# is :open.
#
# The connection is read once into a local and every check runs against that
# same object. This predicate is called without the session lock (see the
# guards at the top of #open and #close) while #close assigns nil to @conn;
# re-reading the attribute for each check could therefore see nil halfway
# through and raise NoMethodError.
#
# @return [Boolean, nil] nil when there is no connection, false when the
#   connection is not open, otherwise whether its status equals :open
def open?
  connection = conn
  connection && connection.open? && connection.status == :open
end
|
#server_name ⇒ Object
123
124
125
126
127
128
|
# File 'lib/vx/common/amqp/session.rb', line 123
# Describes the broker as "<product>/<version>", taken from the connection's
# server properties. Missing properties are rendered as empty strings.
#
# @return [String, nil] the description, or nil when there is no connection
def server_name
  return unless conn

  properties = conn.server_properties || {}
  "#{properties["product"]}/#{properties["version"]}"
end
|
#with_channel ⇒ Object
103
104
105
106
107
108
109
110
111
112
113
114
115
|
# File 'lib/vx/common/amqp/session.rb', line 103
# Runs the given block with a freshly created channel installed as the
# calling thread's channel (Thread.current[CHANNEL_KEY]).
#
# When the block finishes, normally or by raising, the thread's previous
# channel is put back and the temporary channel is closed if it is still open.
#
# The previous channel is captured and the new channel is created before the
# protected section starts. If creating the channel fails, the thread's
# current channel is therefore left exactly as it was, instead of being
# overwritten with nil by the cleanup code.
#
# @yield the work to perform while the temporary channel is installed
# @return [Object] the value returned by the block
def with_channel
  assert_connection_is_open

  previous = Thread.current[CHANNEL_KEY]
  created = conn.create_channel

  begin
    Thread.current[CHANNEL_KEY] = created
    yield
  ensure
    Thread.current[CHANNEL_KEY] = previous
    created.close if created && created.open?
  end
end
|