Class: Hutch::Broker

Inherits:
Object
  • Object
show all
Includes:
Logging
Defined in:
lib/hutch/broker.rb

Constant Summary collapse

DEFAULT_AMQP_PORT =
case RUBY_ENGINE
when "jruby" then
  com.rabbitmq.client.ConnectionFactory::DEFAULT_AMQP_PORT
when "ruby" then
  AMQ::Protocol::DEFAULT_PORT
end
DEFAULT_AMQPS_PORT =
case RUBY_ENGINE
when "jruby" then
  com.rabbitmq.client.ConnectionFactory::DEFAULT_AMQP_OVER_SSL_PORT
when "ruby" then
  AMQ::Protocol::TLS_PORT
end

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Logging

#logger, logger, logger=, setup_logger

Constructor Details

#initialize(config = nil) ⇒ Broker

Returns a new instance of Broker.

Parameters:

  • config (nil, Hash) (defaults to: nil)

    Configuration override



34
35
36
# File 'lib/hutch/broker.rb', line 34

def initialize(config = nil)
  @config = config || Hutch::Config
end

Instance Attribute Details

#api_clientObject

Returns the value of attribute api_client.



13
14
15
# File 'lib/hutch/broker.rb', line 13

def api_client
  @api_client
end

#channelObject

Returns the value of attribute channel.



13
14
15
# File 'lib/hutch/broker.rb', line 13

def channel
  @channel
end

#connectionObject

Returns the value of attribute connection.



13
14
15
# File 'lib/hutch/broker.rb', line 13

def connection
  @connection
end

#exchangeObject

Returns the value of attribute exchange.



13
14
15
# File 'lib/hutch/broker.rb', line 13

def exchange
  @exchange
end

Instance Method Details

#ack(delivery_tag) ⇒ Object



245
246
247
# File 'lib/hutch/broker.rb', line 245

def ack(delivery_tag)
  channel.ack(delivery_tag, false)
end

#bind_queue(queue, routing_keys) ⇒ Object

Bind a queue to the broker’s exchange on the routing keys provided. Any existing bindings on the queue that aren’t present in the array of routing keys will be unbound.



214
215
216
217
218
219
220
221
222
# File 'lib/hutch/broker.rb', line 214

def bind_queue(queue, routing_keys)
  unbind_redundant_bindings(queue, routing_keys)

  # Ensure all the desired bindings are present
  routing_keys.each do |routing_key|
    logger.debug "creating binding #{queue.name} <--> #{routing_key}"
    queue.bind(exchange, routing_key: routing_key)
  end
end

#bindingsObject

Return a mapping of queue names to the routing keys they’re bound to.



184
185
186
187
188
189
190
191
192
193
194
195
196
# File 'lib/hutch/broker.rb', line 184

def bindings
  results = Hash.new { |hash, key| hash[key] = [] }

  filtered = api_client.bindings.
    reject { |b| b['destination'] == b['routing_key'] }.
    select { |b| b['source'] == @config[:mq_exchange] && b['vhost'] == @config[:mq_vhost] }

  filtered.each do |binding|
    results[binding['destination']] << binding['routing_key']
  end

  results
end

#confirm_select(*args) ⇒ Object



257
258
259
# File 'lib/hutch/broker.rb', line 257

def confirm_select(*args)
  channel.confirm_select(*args)
end

#connect(options = {}) ⇒ Object

Connect to broker

Examples:

Hutch::Broker.new.connect(enable_http_api_use: true) do
  # will disconnect after this block
end

Parameters:

  • options (Hash) (defaults to: {})

    The options to connect with

Options Hash (options):

  • :enable_http_api_use (Boolean)


47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/hutch/broker.rb', line 47

def connect(options = {})
  @options = options
  set_up_amqp_connection
  if http_api_use_enabled?
    logger.info "HTTP API use is enabled"
    set_up_api_connection
  else
    logger.info "HTTP API use is disabled"
  end

  if tracing_enabled?
    logger.info "tracing is enabled using #{@config[:tracer]}"
  else
    logger.info "tracing is disabled"
  end

  if block_given?
    begin
      yield
    ensure
      disconnect
    end
  end
end

#declare_exchange(ch = channel) ⇒ Object



124
125
126
127
128
129
130
131
132
133
# File 'lib/hutch/broker.rb', line 124

def declare_exchange(ch = channel)
  exchange_name = @config[:mq_exchange]
  exchange_type = @config[:mq_exchange_type]
  exchange_options = { durable: true }.merge(@config[:mq_exchange_options])
  logger.info "using topic exchange '#{exchange_name}'"

  with_bunny_precondition_handler('exchange') do
    Adapter.new_exchange(ch, exchange_type, exchange_name, exchange_options)
  end
end

#declare_exchange!(*args) ⇒ Object



135
136
137
# File 'lib/hutch/broker.rb', line 135

def declare_exchange!(*args)
  @exchange = declare_exchange(*args)
end

#declare_publisher!Object



139
140
141
# File 'lib/hutch/broker.rb', line 139

def declare_publisher!
  @publisher = Hutch::Publisher.new(connection, channel, exchange, @config)
end

#disconnectObject



72
73
74
75
76
77
78
79
# File 'lib/hutch/broker.rb', line 72

def disconnect
  @channel.close    if @channel
  @connection.close if @connection
  @channel = nil
  @connection = nil
  @exchange = nil
  @api_client = nil
end

#http_api_use_enabled?Boolean

Returns:

  • (Boolean)


159
160
161
162
163
164
165
166
167
168
# File 'lib/hutch/broker.rb', line 159

def http_api_use_enabled?
  op = @options.fetch(:enable_http_api_use, true)
  cf = if @config[:enable_http_api_use].nil?
         true
       else
         @config[:enable_http_api_use]
       end

  op && cf
end

#nack(delivery_tag) ⇒ Object



249
250
251
# File 'lib/hutch/broker.rb', line 249

def nack(delivery_tag)
  channel.nack(delivery_tag, false, false)
end

#open_channelObject



109
110
111
112
113
114
115
116
117
118
# File 'lib/hutch/broker.rb', line 109

def open_channel
  logger.info "opening rabbitmq channel with pool size #{consumer_pool_size}, abort on exception #{consumer_pool_abort_on_exception}"
  connection.create_channel(nil, consumer_pool_size, consumer_pool_abort_on_exception).tap do |ch|
    connection.prefetch_channel(ch, @config[:channel_prefetch])
    if @config[:publisher_confirms] || @config[:force_publisher_confirms]
      logger.info 'enabling publisher confirms'
      ch.confirm_select
    end
  end
end

#open_channel!Object



120
121
122
# File 'lib/hutch/broker.rb', line 120

def open_channel!
  @channel = open_channel
end

#open_connectionObject



92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/hutch/broker.rb', line 92

def open_connection
  logger.info "connecting to rabbitmq (#{sanitized_uri})"

  connection = Hutch::Adapter.new(connection_params)

  with_bunny_connection_handler(sanitized_uri) do
    connection.start
  end

  logger.info "connected to RabbitMQ at #{connection_params[:host]} as #{connection_params[:username]}"
  connection
end

#open_connection!Object



105
106
107
# File 'lib/hutch/broker.rb', line 105

def open_connection!
  @connection = open_connection
end

#publish(*args) ⇒ Object



253
254
255
# File 'lib/hutch/broker.rb', line 253

def publish(*args)
  @publisher.publish(*args)
end

#queue(name, options = {}) ⇒ Object

Create / get a durable queue and apply namespace if it exists.



175
176
177
178
179
180
181
# File 'lib/hutch/broker.rb', line 175

def queue(name, options = {})
  with_bunny_precondition_handler('queue') do
    namespace = @config[:namespace].to_s.downcase.gsub(/[^-_:\.\w]/, "")
    queue_name = namespace.present? ? "#{namespace}:#{name}" : name
    channel.queue(queue_name, **options)
  end
end

#reject(delivery_tag, requeue = false) ⇒ Object



241
242
243
# File 'lib/hutch/broker.rb', line 241

def reject(delivery_tag, requeue=false)
  channel.reject(delivery_tag, requeue)
end

#requeue(delivery_tag) ⇒ Object



237
238
239
# File 'lib/hutch/broker.rb', line 237

def requeue(delivery_tag)
  channel.reject(delivery_tag, true)
end

#set_up_amqp_connectionObject

Connect to RabbitMQ via AMQP

This sets up the main connection and channel we use for talking to RabbitMQ. It also ensures the existence of the exchange we’ll be using.



85
86
87
88
89
90
# File 'lib/hutch/broker.rb', line 85

def set_up_amqp_connection
  open_connection!
  open_channel!
  declare_exchange!
  declare_publisher!
end

#set_up_api_connectionObject

Set up the connection to the RabbitMQ management API. Unfortunately, this is necessary to do a few things that are impossible over AMQP. E.g. listing queues and bindings.



146
147
148
149
150
151
152
153
154
155
156
157
# File 'lib/hutch/broker.rb', line 146

def set_up_api_connection
  logger.info "connecting to rabbitmq HTTP API (#{api_config.sanitized_uri})"

  with_authentication_error_handler do
    with_connection_error_handler do
      @api_client = CarrotTop.new(host: api_config.host, port: api_config.port,
                                  user: api_config.username, password: api_config.password,
                                  ssl: api_config.ssl)
      @api_client.exchanges
    end
  end
end

#stopObject



224
225
226
227
228
229
230
231
232
233
234
235
# File 'lib/hutch/broker.rb', line 224

def stop
  if defined?(JRUBY_VERSION)
    channel.close
  else
    # Enqueue a failing job that kills the consumer loop
    channel_work_pool.shutdown
    # Give `timeout` seconds to jobs that are still being processed
    channel_work_pool.join(@config[:graceful_exit_timeout])
    # If after `timeout` they are still running, they are killed
    channel_work_pool.kill
  end
end

#tracing_enabled?Boolean

Returns:

  • (Boolean)


170
171
172
# File 'lib/hutch/broker.rb', line 170

def tracing_enabled?
  @config[:tracer] && @config[:tracer] != Hutch::Tracers::NullTracer
end

#unbind_redundant_bindings(queue, routing_keys) ⇒ Object

Find the existing bindings, and unbind any redundant bindings



199
200
201
202
203
204
205
206
207
208
209
# File 'lib/hutch/broker.rb', line 199

def unbind_redundant_bindings(queue, routing_keys)
  return unless http_api_use_enabled?

  filtered = bindings.select { |dest, keys| dest == queue.name }
  filtered.each do |dest, keys|
    keys.reject { |key| routing_keys.include?(key) }.each do |key|
      logger.debug "removing redundant binding #{queue.name} <--> #{key}"
      queue.unbind(exchange, routing_key: key)
    end
  end
end

#using_publisher_confirmations?Boolean

Returns True if channel is set up to use publisher confirmations.

Returns:

  • (Boolean)

    True if channel is set up to use publisher confirmations.



266
267
268
# File 'lib/hutch/broker.rb', line 266

def using_publisher_confirmations?
  channel.using_publisher_confirmations?
end

#wait_for_confirmsObject



261
262
263
# File 'lib/hutch/broker.rb', line 261

def wait_for_confirms
  channel.wait_for_confirms
end