Class: Hutch::Broker
- Inherits:
-
Object
show all
- Includes:
- Logging
- Defined in:
- lib/hutch/broker.rb
Constant Summary
collapse
- DEFAULT_AMQP_PORT =
case RUBY_ENGINE
when "jruby" then
# On JRuby the default port is read from the Java client's ConnectionFactory.
com.rabbitmq.client.ConnectionFactory::DEFAULT_AMQP_PORT
when "ruby" then
# On CRuby (RUBY_ENGINE == "ruby") the default port is read from AMQ::Protocol.
AMQ::Protocol::DEFAULT_PORT
# NOTE(review): there is no `else` branch, so on any other engine this
# expression evaluates to nil — confirm that is intended.
end
- DEFAULT_AMQPS_PORT =
case RUBY_ENGINE
when "jruby" then
# On JRuby the default TLS port is read from the Java client's ConnectionFactory.
com.rabbitmq.client.ConnectionFactory::DEFAULT_AMQP_OVER_SSL_PORT
when "ruby" then
# On CRuby (RUBY_ENGINE == "ruby") the default TLS port is read from AMQ::Protocol.
AMQ::Protocol::TLS_PORT
# NOTE(review): there is no `else` branch, so on any other engine this
# expression evaluates to nil — confirm that is intended.
end
Instance Attribute Summary collapse
Instance Method Summary
collapse
Methods included from Logging
#logger, logger, logger=, setup_logger
Constructor Details
#initialize(config = nil) ⇒ Broker
Returns a new instance of Broker.
34
35
36
|
# File 'lib/hutch/broker.rb', line 34
# Build a broker around the given configuration.
#
# @param config [Object, nil] configuration object read with `[]`; any falsy
#   value (nil or false) selects Hutch::Config instead
def initialize(config = nil)
  @config = config ? config : Hutch::Config
end
|
Instance Attribute Details
#api_client ⇒ Object
Returns the value of attribute api_client.
13
14
15
|
# File 'lib/hutch/broker.rb', line 13
# Reader for @api_client, the management-API client object.
# It is assigned in #set_up_api_connection and reset to nil by #disconnect.
#
# @return [Object, nil] the current value of @api_client
def api_client
@api_client
end
|
#channel ⇒ Object
Returns the value of attribute channel.
13
14
15
|
# File 'lib/hutch/broker.rb', line 13
# Reader for @channel.
# It is assigned by #open_channel! and reset to nil by #disconnect.
#
# @return [Object, nil] the current value of @channel
def channel
@channel
end
|
#connection ⇒ Object
Returns the value of attribute connection.
13
14
15
|
# File 'lib/hutch/broker.rb', line 13
# Reader for @connection.
# It is assigned by #open_connection! and reset to nil by #disconnect.
#
# @return [Object, nil] the current value of @connection
def connection
@connection
end
|
#exchange ⇒ Object
Returns the value of attribute exchange.
13
14
15
|
# File 'lib/hutch/broker.rb', line 13
# Reader for @exchange.
# It is assigned by #declare_exchange! and reset to nil by #disconnect.
#
# @return [Object, nil] the current value of @exchange
def exchange
@exchange
end
|
Instance Method Details
#ack(delivery_tag) ⇒ Object
245
246
247
|
# File 'lib/hutch/broker.rb', line 245
# Acknowledge a delivery on the broker's channel.
#
# @param delivery_tag [Object] tag identifying the delivery
# @return [Object] whatever `channel.ack` returns
def ack(delivery_tag)
  # Second positional argument of `channel.ack`; in Bunny's Channel#ack this is
  # the `multiple` flag. NOTE(review): confirm the meaning for other adapters.
  multiple = false
  channel.ack(delivery_tag, multiple)
end
|
#bind_queue(queue, routing_keys) ⇒ Object
Bind a queue to the broker’s exchange on the routing keys provided. Any existing bindings on the queue that aren’t present in the array of routing keys will be unbound.
214
215
216
217
218
219
220
221
222
|
# File 'lib/hutch/broker.rb', line 214
# Make the queue's bindings on the broker's exchange match `routing_keys`.
#
# Bindings that are no longer wanted are removed first (via
# #unbind_redundant_bindings), then one binding per routing key is created.
#
# @param queue [Object] queue responding to `name` and `bind`
# @param routing_keys [Enumerable] routing keys the queue should be bound on
# @return [Enumerable] `routing_keys`, as returned by `each`
def bind_queue(queue, routing_keys)
  unbind_redundant_bindings(queue, routing_keys)

  routing_keys.each do |key|
    logger.debug "creating binding #{queue.name} <--> #{key}"
    queue.bind(exchange, routing_key: key)
  end
end
|
#bindings ⇒ Object
Return a mapping of queue names to the routing keys they’re bound to.
184
185
186
187
188
189
190
191
192
193
194
195
196
|
# File 'lib/hutch/broker.rb', line 184
# Map queue names to the routing keys they are bound on.
#
# Reads every binding reported by `api_client.bindings` and keeps those whose
# source is the configured exchange and whose vhost is the configured vhost.
# Entries whose destination equals their routing key are skipped.
#
# @return [Hash{Object => Array}] destination => routing keys; looking up a
#   missing key stores and returns a new empty array
def bindings
  by_queue = Hash.new { |hash, key| hash[key] = [] }

  api_client.bindings.each_with_object(by_queue) do |entry, acc|
    next if entry['destination'] == entry['routing_key']
    next unless entry['source'] == @config[:mq_exchange] && entry['vhost'] == @config[:mq_vhost]

    acc[entry['destination']] << entry['routing_key']
  end
end
|
#confirm_select(*args) ⇒ Object
257
258
259
|
# File 'lib/hutch/broker.rb', line 257
# Forward `confirm_select` to the broker's channel.
#
# @param args positional arguments passed through unchanged
# @return [Object] whatever `channel.confirm_select` returns
def confirm_select(*args)
channel.confirm_select(*args)
end
|
#connect(options = {}) ⇒ Object
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
|
# File 'lib/hutch/broker.rb', line 47
# Connect to RabbitMQ and, when enabled, to the HTTP management API.
#
# Stores `options` in @options, sets up the AMQP side, logs whether the HTTP
# API and tracing are in use, and sets up the API connection when it is
# enabled. When a block is given it is run and #disconnect is always called
# afterwards, even if the block raises.
#
# @param options [Hash] connection options (`:enable_http_api_use` is read by
#   #http_api_use_enabled?)
# @yield optional work to perform while connected
# @return [Object, nil] the block's value, or nil when no block is given
def connect(options = {})
  @options = options
  set_up_amqp_connection

  api_enabled = http_api_use_enabled?
  logger.info(api_enabled ? "HTTP API use is enabled" : "HTTP API use is disabled")
  set_up_api_connection if api_enabled

  logger.info(tracing_enabled? ? "tracing is enabled using #{@config[:tracer]}" : "tracing is disabled")

  return unless block_given?

  begin
    yield
  ensure
    disconnect
  end
end
|
#declare_exchange(ch = channel) ⇒ Object
124
125
126
127
128
129
130
131
132
133
|
# File 'lib/hutch/broker.rb', line 124
# Declare the configured exchange on a channel.
#
# Name and type come from `@config[:mq_exchange]` and
# `@config[:mq_exchange_type]`. Options default to `{ durable: true }` and are
# overridden by `@config[:mq_exchange_options]` when present.
#
# @param ch [Object] channel to declare on (defaults to the broker's channel)
# @return [Object] whatever `Adapter.new_exchange` returns
def declare_exchange(ch = channel)
  exchange_name = @config[:mq_exchange]
  exchange_type = @config[:mq_exchange_type]
  # A missing options entry is treated as "no overrides" rather than raising
  # a TypeError from Hash#merge(nil).
  exchange_options = { durable: true }.merge(@config[:mq_exchange_options] || {})

  # Log the configured type; it is not necessarily a topic exchange.
  logger.info "using #{exchange_type} exchange '#{exchange_name}'"

  with_bunny_precondition_handler('exchange') do
    Adapter.new_exchange(ch, exchange_type, exchange_name, exchange_options)
  end
end
|
#declare_exchange!(*args) ⇒ Object
135
136
137
|
# File 'lib/hutch/broker.rb', line 135
# Declare the exchange via #declare_exchange and store the result in @exchange.
#
# @param args positional arguments passed through to #declare_exchange
# @return [Object] the value returned by #declare_exchange
def declare_exchange!(*args)
@exchange = declare_exchange(*args)
end
|
#declare_publisher! ⇒ Object
139
140
141
|
# File 'lib/hutch/broker.rb', line 139
# Build a Hutch::Publisher from the current connection, channel, exchange and
# config, and store it in @publisher (the object #publish delegates to).
#
# @return [Hutch::Publisher] the newly created publisher
def declare_publisher!
@publisher = Hutch::Publisher.new(connection, channel, exchange, @config)
end
|
#disconnect ⇒ Object
72
73
74
75
76
77
78
79
|
# File 'lib/hutch/broker.rb', line 72
# Close the channel and the connection (when present) and forget the cached
# channel, connection, exchange and API client.
#
# Cleanup is exhaustive: an error raised while closing the channel no longer
# prevents the connection from being closed or the cached state from being
# reset. The error still propagates to the caller afterwards.
#
# NOTE(review): @publisher is not reset here, matching the previous behaviour.
#
# @return [nil]
def disconnect
  begin
    @channel.close if @channel
  ensure
    begin
      @connection.close if @connection
    ensure
      @channel = nil
      @connection = nil
      @exchange = nil
      @api_client = nil
    end
  end

  # Keep the original nil return value; an `ensure` clause does not supply one.
  nil
end
|
#http_api_use_enabled? ⇒ Boolean
159
160
161
162
163
164
165
166
167
168
|
# File 'lib/hutch/broker.rb', line 159
# Whether the RabbitMQ HTTP management API should be used.
#
# Both the per-connection option (`@options[:enable_http_api_use]`) and the
# configuration value (`@config[:enable_http_api_use]`) must allow it. Each
# defaults to true when absent.
#
# @return [Boolean, nil] truthy when HTTP API use is allowed
def http_api_use_enabled?
  # @options is only assigned in #connect; treat "never connected" as
  # "no overrides" instead of raising NoMethodError on nil.
  op = (@options || {}).fetch(:enable_http_api_use, true)

  cf = @config[:enable_http_api_use]
  cf = true if cf.nil?

  op && cf
end
|
#nack(delivery_tag) ⇒ Object
249
250
251
|
# File 'lib/hutch/broker.rb', line 249
# Negatively acknowledge a delivery on the broker's channel.
#
# @param delivery_tag [Object] tag identifying the delivery
# @return [Object] whatever `channel.nack` returns
def nack(delivery_tag)
  # Second and third positional arguments of `channel.nack`; in Bunny's
  # Channel#nack these are the `multiple` and `requeue` flags.
  # NOTE(review): confirm the meaning for other adapters.
  multiple = false
  requeue = false
  channel.nack(delivery_tag, multiple, requeue)
end
|
#open_channel ⇒ Object
109
110
111
112
113
114
115
116
117
118
|
# File 'lib/hutch/broker.rb', line 109
# Open a new channel on the current connection.
#
# The channel is created with the consumer pool size and abort-on-exception
# settings, then given the configured prefetch value. If either
# `@config[:publisher_confirms]` or `@config[:force_publisher_confirms]` is
# truthy, `confirm_select` is called on it.
#
# @return [Object] the newly created channel
def open_channel
  logger.info "opening rabbitmq channel with pool size #{consumer_pool_size}, abort on exception #{consumer_pool_abort_on_exception}"

  new_channel = connection.create_channel(nil, consumer_pool_size, consumer_pool_abort_on_exception)
  connection.prefetch_channel(new_channel, @config[:channel_prefetch])

  confirms_wanted = @config[:publisher_confirms] || @config[:force_publisher_confirms]
  if confirms_wanted
    logger.info 'enabling publisher confirms'
    new_channel.confirm_select
  end

  new_channel
end
|
#open_channel! ⇒ Object
120
121
122
|
# File 'lib/hutch/broker.rb', line 120
# Open a channel via #open_channel and store it in @channel.
#
# @return [Object] the channel returned by #open_channel
def open_channel!
@channel = open_channel
end
|
#open_connection ⇒ Object
92
93
94
95
96
97
98
99
100
101
102
103
|
# File 'lib/hutch/broker.rb', line 92
# Create and start a new connection to RabbitMQ.
#
# The connection is built by Hutch::Adapter from `connection_params` and
# started inside `with_bunny_connection_handler`. Both the attempt and the
# successful connection are logged.
#
# @return [Object] the started connection
def open_connection
  logger.info "connecting to rabbitmq (#{sanitized_uri})"

  Hutch::Adapter.new(connection_params).tap do |conn|
    with_bunny_connection_handler(sanitized_uri) { conn.start }
    logger.info "connected to RabbitMQ at #{connection_params[:host]} as #{connection_params[:username]}"
  end
end
|
#open_connection! ⇒ Object
105
106
107
|
# File 'lib/hutch/broker.rb', line 105
# Open a connection via #open_connection and store it in @connection.
#
# @return [Object] the connection returned by #open_connection
def open_connection!
@connection = open_connection
end
|
#publish(*args) ⇒ Object
253
254
255
|
# File 'lib/hutch/broker.rb', line 253
# Forward all positional arguments to `@publisher.publish`.
#
# NOTE(review): in the code visible here @publisher is only assigned by
# #declare_publisher!; calling this earlier would send #publish to nil.
#
# @param args positional arguments passed through unchanged
# @return [Object] whatever `@publisher.publish` returns
def publish(*args)
@publisher.publish(*args)
end
|
#queue(name, options = {}) ⇒ Object
Create / get a queue, prefixing its name with the configured namespace when one is set. Queue settings such as durability are taken from `options`.
175
176
177
178
179
180
181
|
# File 'lib/hutch/broker.rb', line 175
# Create or fetch a queue on the broker's channel.
#
# The configured namespace is lower-cased and stripped of every character
# other than word characters, `-`, `_`, `:` and `.`. When something remains,
# the queue is named "<namespace>:<name>"; otherwise `name` is used as-is.
#
# @param name [Object] base queue name
# @param options [Hash] passed to `channel.queue` as keyword arguments
# @return [Object] whatever `channel.queue` returns
def queue(name, options = {})
  with_bunny_precondition_handler('queue') do
    prefix = @config[:namespace].to_s.downcase.gsub(/[^-_:\.\w]/, "")
    full_name = if prefix.present?
                  "#{prefix}:#{name}"
                else
                  name
                end
    channel.queue(full_name, **options)
  end
end
|
#reject(delivery_tag, requeue = false) ⇒ Object
241
242
243
|
# File 'lib/hutch/broker.rb', line 241
# Reject a delivery on the broker's channel.
#
# @param delivery_tag [Object] tag identifying the delivery
# @param requeue [Boolean] passed through as the second argument of
#   `channel.reject`
# @return [Object] whatever `channel.reject` returns
def reject(delivery_tag, requeue = false)
  channel.reject(delivery_tag, requeue)
end
|
#requeue(delivery_tag) ⇒ Object
237
238
239
|
# File 'lib/hutch/broker.rb', line 237
# Reject a delivery with the requeue argument set to true.
#
# @param delivery_tag [Object] tag identifying the delivery
# @return [Object] whatever `channel.reject` returns
def requeue(delivery_tag)
  put_back = true
  channel.reject(delivery_tag, put_back)
end
|
#set_up_amqp_connection ⇒ Object
Connect to RabbitMQ via AMQP
This sets up the main connection and channel we use for talking to RabbitMQ. It also ensures the existence of the exchange we’ll be using.
85
86
87
88
89
90
|
# File 'lib/hutch/broker.rb', line 85
# Set up the AMQP side of the broker: connection, channel, exchange, publisher.
#
# The steps run in dependency order: #open_channel uses the connection,
# #declare_exchange defaults to the broker's channel, and #declare_publisher!
# is given the connection, channel and exchange.
def set_up_amqp_connection
open_connection!
open_channel!
declare_exchange!
declare_publisher!
end
|
#set_up_api_connection ⇒ Object
Set up the connection to the RabbitMQ management API. Unfortunately, this is necessary to do a few things that are impossible over AMQP, e.g. listing queues and bindings.
146
147
148
149
150
151
152
153
154
155
156
157
|
# File 'lib/hutch/broker.rb', line 146
# Connect to the RabbitMQ HTTP management API.
#
# Builds a CarrotTop client from `api_config`, stores it in @api_client and
# immediately calls `exchanges` on it. Both steps run inside the
# authentication and connection error handlers.
#
# @return [Object] whatever the error-handler wrappers return
def set_up_api_connection
  logger.info "connecting to rabbitmq HTTP API (#{api_config.sanitized_uri})"

  with_authentication_error_handler do
    with_connection_error_handler do
      client_options = {
        host: api_config.host,
        port: api_config.port,
        user: api_config.username,
        password: api_config.password,
        ssl: api_config.ssl
      }
      @api_client = CarrotTop.new(**client_options)
      @api_client.exchanges
    end
  end
end
|
#stop ⇒ Object
224
225
226
227
228
229
230
231
232
233
234
235
|
# File 'lib/hutch/broker.rb', line 224
# Stop consuming.
#
# When JRUBY_VERSION is defined the channel is simply closed. Otherwise the
# channel work pool is shut down, joined for up to
# `@config[:graceful_exit_timeout]`, and then killed.
#
# @return [Object] the result of `channel.close` or of the pool's `kill`
def stop
  return channel.close if defined?(JRUBY_VERSION)

  channel_work_pool.shutdown
  channel_work_pool.join(@config[:graceful_exit_timeout])
  channel_work_pool.kill
end
|
#tracing_enabled? ⇒ Boolean
170
171
172
|
# File 'lib/hutch/broker.rb', line 170
# Whether a tracer other than Hutch::Tracers::NullTracer is configured.
#
# @return [Boolean, nil] the falsy configured value itself when no tracer is
#   set, otherwise whether the tracer differs from NullTracer
def tracing_enabled?
  tracer = @config[:tracer]
  tracer && tracer != Hutch::Tracers::NullTracer
end
|
#unbind_redundant_bindings(queue, routing_keys) ⇒ Object
Find the existing bindings, and unbind any redundant bindings
199
200
201
202
203
204
205
206
207
208
209
|
# File 'lib/hutch/broker.rb', line 199
# Remove bindings on `queue` whose routing key is not in `routing_keys`.
#
# Does nothing when HTTP API use is disabled, because the existing bindings
# are obtained through #bindings.
#
# @param queue [Object] queue responding to `name` and `unbind`
# @param routing_keys [#include?] routing keys that should remain bound
# @return [Hash, nil] the bindings found for this queue, or nil when HTTP API
#   use is disabled
def unbind_redundant_bindings(queue, routing_keys)
  return unless http_api_use_enabled?

  own_bindings = bindings.select { |destination, _bound| destination == queue.name }
  own_bindings.each_value do |bound_keys|
    bound_keys.each do |bound_key|
      next if routing_keys.include?(bound_key)

      logger.debug "removing redundant binding #{queue.name} <--> #{bound_key}"
      queue.unbind(exchange, routing_key: bound_key)
    end
  end
end
|
#using_publisher_confirmations? ⇒ Boolean
Returns true if the channel is set up to use publisher confirmations.
266
267
268
|
# File 'lib/hutch/broker.rb', line 266
# Ask the broker's channel whether it is using publisher confirmations.
#
# @return [Object] whatever `channel.using_publisher_confirmations?` returns
def using_publisher_confirmations?
channel.using_publisher_confirmations?
end
|
#wait_for_confirms ⇒ Object
261
262
263
|
# File 'lib/hutch/broker.rb', line 261
# Forward `wait_for_confirms` to the broker's channel.
#
# @return [Object] whatever `channel.wait_for_confirms` returns
def wait_for_confirms
channel.wait_for_confirms
end
|