Class: Hutch::Broker
- Inherits:
-
Object
show all
- Includes:
- Logging
- Defined in:
- lib/hutch/broker.rb
Constant Summary
collapse
- DEFAULT_AMQP_PORT =
case RUBY_ENGINE
when "jruby" then
com.rabbitmq.client.ConnectionFactory::DEFAULT_AMQP_PORT
when "ruby" then
AMQ::Protocol::DEFAULT_PORT
end
- DEFAULT_AMQPS_PORT =
case RUBY_ENGINE
when "jruby" then
com.rabbitmq.client.ConnectionFactory::DEFAULT_AMQP_OVER_SSL_PORT
when "ruby" then
AMQ::Protocol::TLS_PORT
end
Instance Attribute Summary collapse
Instance Method Summary
collapse
Methods included from Logging
#logger, logger, logger=, setup_logger
Constructor Details
#initialize(config = nil) ⇒ Broker
Returns a new instance of Broker.
33
34
35
|
# File 'lib/hutch/broker.rb', line 33
def initialize(config = nil)
@config = config || Hutch::Config
end
|
Instance Attribute Details
#api_client ⇒ Object
Returns the value of attribute api_client.
12
13
14
|
# File 'lib/hutch/broker.rb', line 12
def api_client
@api_client
end
|
#channel ⇒ Object
Returns the value of attribute channel.
12
13
14
|
# File 'lib/hutch/broker.rb', line 12
def channel
@channel
end
|
#connection ⇒ Object
Returns the value of attribute connection.
12
13
14
|
# File 'lib/hutch/broker.rb', line 12
def connection
@connection
end
|
#exchange ⇒ Object
Returns the value of attribute exchange.
12
13
14
|
# File 'lib/hutch/broker.rb', line 12
def exchange
@exchange
end
|
Instance Method Details
#ack(delivery_tag) ⇒ Object
244
245
246
|
# File 'lib/hutch/broker.rb', line 244
def ack(delivery_tag)
channel.ack(delivery_tag, false)
end
|
#bind_queue(queue, routing_keys) ⇒ Object
Bind a queue to the broker’s exchange on the routing keys provided. Any existing bindings on the queue that aren’t present in the array of routing keys will be unbound.
213
214
215
216
217
218
219
220
221
|
# File 'lib/hutch/broker.rb', line 213
def bind_queue(queue, routing_keys)
unbind_redundant_bindings(queue, routing_keys)
routing_keys.each do |routing_key|
logger.debug "creating binding #{queue.name} <--> #{routing_key}"
queue.bind(exchange, routing_key: routing_key)
end
end
|
#bindings ⇒ Object
Return a mapping of queue names to the routing keys they’re bound to.
183
184
185
186
187
188
189
190
191
192
193
194
195
|
# File 'lib/hutch/broker.rb', line 183
def bindings
results = Hash.new { |hash, key| hash[key] = [] }
filtered = api_client.bindings.
reject { |b| b['destination'] == b['routing_key'] }.
select { |b| b['source'] == @config[:mq_exchange] && b['vhost'] == @config[:mq_vhost] }
filtered.each do |binding|
results[binding['destination']] << binding['routing_key']
end
results
end
|
#confirm_select(*args) ⇒ Object
256
257
258
|
# File 'lib/hutch/broker.rb', line 256
def confirm_select(*args)
channel.confirm_select(*args)
end
|
#connect(options = {}) ⇒ Object
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
|
# File 'lib/hutch/broker.rb', line 46
def connect(options = {})
@options = options
set_up_amqp_connection
if http_api_use_enabled?
logger.info "HTTP API use is enabled"
set_up_api_connection
else
logger.info "HTTP API use is disabled"
end
if tracing_enabled?
logger.info "tracing is enabled using #{@config[:tracer]}"
else
logger.info "tracing is disabled"
end
if block_given?
begin
yield
ensure
disconnect
end
end
end
|
#declare_exchange(ch = channel) ⇒ Object
123
124
125
126
127
128
129
130
131
132
|
# File 'lib/hutch/broker.rb', line 123
def declare_exchange(ch = channel)
exchange_name = @config[:mq_exchange]
exchange_type = @config[:mq_exchange_type]
exchange_options = { durable: true }.merge(@config[:mq_exchange_options])
logger.info "using topic exchange '#{exchange_name}'"
with_bunny_precondition_handler('exchange') do
Adapter.new_exchange(ch, exchange_type, exchange_name, exchange_options)
end
end
|
#declare_exchange!(*args) ⇒ Object
134
135
136
|
# File 'lib/hutch/broker.rb', line 134
def declare_exchange!(*args)
@exchange = declare_exchange(*args)
end
|
#declare_publisher! ⇒ Object
138
139
140
|
# File 'lib/hutch/broker.rb', line 138
def declare_publisher!
@publisher = Hutch::Publisher.new(connection, channel, exchange, @config)
end
|
#disconnect ⇒ Object
71
72
73
74
75
76
77
78
|
# File 'lib/hutch/broker.rb', line 71
def disconnect
@channel.close if @channel
@connection.close if @connection
@channel = nil
@connection = nil
@exchange = nil
@api_client = nil
end
|
#http_api_use_enabled? ⇒ Boolean
158
159
160
161
162
163
164
165
166
167
|
# File 'lib/hutch/broker.rb', line 158
def http_api_use_enabled?
op = @options.fetch(:enable_http_api_use, true)
cf = if @config[:enable_http_api_use].nil?
true
else
@config[:enable_http_api_use]
end
op && cf
end
|
#nack(delivery_tag) ⇒ Object
248
249
250
|
# File 'lib/hutch/broker.rb', line 248
def nack(delivery_tag)
channel.nack(delivery_tag, false, false)
end
|
#open_channel ⇒ Object
108
109
110
111
112
113
114
115
116
117
|
# File 'lib/hutch/broker.rb', line 108
def open_channel
logger.info "opening rabbitmq channel with pool size #{consumer_pool_size}, abort on exception #{consumer_pool_abort_on_exception}"
connection.create_channel(nil, consumer_pool_size, consumer_pool_abort_on_exception).tap do |ch|
connection.prefetch_channel(ch, @config[:channel_prefetch])
if @config[:publisher_confirms] || @config[:force_publisher_confirms]
logger.info 'enabling publisher confirms'
ch.confirm_select
end
end
end
|
#open_channel! ⇒ Object
119
120
121
|
# File 'lib/hutch/broker.rb', line 119
def open_channel!
@channel = open_channel
end
|
#open_connection ⇒ Object
91
92
93
94
95
96
97
98
99
100
101
102
|
# File 'lib/hutch/broker.rb', line 91
def open_connection
logger.info "connecting to rabbitmq (#{sanitized_uri})"
connection = Hutch::Adapter.new(connection_params)
with_bunny_connection_handler(sanitized_uri) do
connection.start
end
logger.info "connected to RabbitMQ at #{connection_params[:host]} as #{connection_params[:username]}"
connection
end
|
#open_connection! ⇒ Object
104
105
106
|
# File 'lib/hutch/broker.rb', line 104
def open_connection!
@connection = open_connection
end
|
#publish(*args) ⇒ Object
252
253
254
|
# File 'lib/hutch/broker.rb', line 252
def publish(*args)
@publisher.publish(*args)
end
|
#queue(name, options = {}) ⇒ Object
Create / get a durable queue and apply namespace if it exists.
174
175
176
177
178
179
180
|
# File 'lib/hutch/broker.rb', line 174
def queue(name, options = {})
with_bunny_precondition_handler('queue') do
namespace = @config[:namespace].to_s.downcase.gsub(/[^-_:\.\w]/, "")
queue_name = namespace.present? ? "#{namespace}:#{name}" : name
channel.queue(queue_name, **options)
end
end
|
#reject(delivery_tag, requeue = false) ⇒ Object
240
241
242
|
# File 'lib/hutch/broker.rb', line 240
def reject(delivery_tag, requeue=false)
channel.reject(delivery_tag, requeue)
end
|
#requeue(delivery_tag) ⇒ Object
236
237
238
|
# File 'lib/hutch/broker.rb', line 236
def requeue(delivery_tag)
channel.reject(delivery_tag, true)
end
|
#set_up_amqp_connection ⇒ Object
Connect to RabbitMQ via AMQP
This sets up the main connection and channel we use for talking to RabbitMQ. It also ensures the existence of the exchange we’ll be using.
84
85
86
87
88
89
|
# File 'lib/hutch/broker.rb', line 84
def set_up_amqp_connection
open_connection!
open_channel!
declare_exchange!
declare_publisher!
end
|
#set_up_api_connection ⇒ Object
Set up the connection to the RabbitMQ management API. Unfortunately, this is necessary to do a few things that are impossible over AMQP. E.g. listing queues and bindings.
145
146
147
148
149
150
151
152
153
154
155
156
|
# File 'lib/hutch/broker.rb', line 145
def set_up_api_connection
logger.info "connecting to rabbitmq HTTP API (#{api_config.sanitized_uri})"
with_authentication_error_handler do
with_connection_error_handler do
@api_client = CarrotTop.new(host: api_config.host, port: api_config.port,
user: api_config.username, password: api_config.password,
ssl: api_config.ssl)
@api_client.exchanges
end
end
end
|
#stop ⇒ Object
223
224
225
226
227
228
229
230
231
232
233
234
|
# File 'lib/hutch/broker.rb', line 223
def stop
if defined?(JRUBY_VERSION)
channel.close
else
channel_work_pool.shutdown
channel_work_pool.join(@config[:graceful_exit_timeout])
channel_work_pool.kill
end
end
|
#tracing_enabled? ⇒ Boolean
169
170
171
|
# File 'lib/hutch/broker.rb', line 169
def tracing_enabled?
@config[:tracer] && @config[:tracer] != Hutch::Tracers::NullTracer
end
|
#unbind_redundant_bindings(queue, routing_keys) ⇒ Object
Find the existing bindings, and unbind any redundant bindings
198
199
200
201
202
203
204
205
206
207
208
|
# File 'lib/hutch/broker.rb', line 198
def unbind_redundant_bindings(queue, routing_keys)
return unless http_api_use_enabled?
filtered = bindings.select { |dest, keys| dest == queue.name }
filtered.each do |dest, keys|
keys.reject { |key| routing_keys.include?(key) }.each do |key|
logger.debug "removing redundant binding #{queue.name} <--> #{key}"
queue.unbind(exchange, routing_key: key)
end
end
end
|
#using_publisher_confirmations? ⇒ Boolean
Returns True if channel is set up to use publisher confirmations.
265
266
267
|
# File 'lib/hutch/broker.rb', line 265
def using_publisher_confirmations?
channel.using_publisher_confirmations?
end
|
#wait_for_confirms ⇒ Object
260
261
262
|
# File 'lib/hutch/broker.rb', line 260
def wait_for_confirms
channel.wait_for_confirms
end
|