Class: Hutch::Broker

Inherits:
Object
  • Object
show all
Includes:
Logging
Defined in:
lib/hutch/broker.rb

Constant Summary collapse

# Default port for plain (non-TLS) AMQP connections, read from the client
# library that backs the current Ruby engine. JRuby uses the RabbitMQ Java
# client's constant; every other engine falls back to AMQ::Protocol so the
# constant is never silently nil on engines other than "ruby".
DEFAULT_AMQP_PORT =
case RUBY_ENGINE
when "jruby"
  com.rabbitmq.client.ConnectionFactory::DEFAULT_AMQP_PORT
else
  AMQ::Protocol::DEFAULT_PORT
end
# Default port for AMQP over TLS, read from the client library that backs
# the current Ruby engine. JRuby uses the RabbitMQ Java client's constant;
# every other engine falls back to AMQ::Protocol so the constant is never
# silently nil on engines other than "ruby".
DEFAULT_AMQPS_PORT =
case RUBY_ENGINE
when "jruby"
  com.rabbitmq.client.ConnectionFactory::DEFAULT_AMQP_OVER_SSL_PORT
else
  AMQ::Protocol::TLS_PORT
end

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Logging

#logger, logger, logger=, setup_logger

Constructor Details

#initialize(config = nil) ⇒ Broker

Returns a new instance of Broker.

Parameters:

  • config (nil, Hash) (defaults to: nil)

    Configuration override



33
34
35
# File 'lib/hutch/broker.rb', line 33

# Build a broker and remember the configuration it should read from.
#
# @param config [nil, Hash] configuration override; when nil (or false) the
#   global Hutch::Config is used instead
def initialize(config = nil)
  @config = config || Hutch::Config
end

Instance Attribute Details

#api_client ⇒ Object

Returns the value of attribute api_client.



12
13
14
# File 'lib/hutch/broker.rb', line 12

# Reader for @api_client, which #set_up_api_connection assigns and
# #disconnect resets to nil.
#
# @return the current API client, or nil when none is set
def api_client
  @api_client
end

#channel ⇒ Object

Returns the value of attribute channel.



12
13
14
# File 'lib/hutch/broker.rb', line 12

# Reader for @channel, which #open_channel! assigns and #disconnect resets
# to nil.
#
# @return the current channel, or nil when none is open
def channel
  @channel
end

#connection ⇒ Object

Returns the value of attribute connection.



12
13
14
# File 'lib/hutch/broker.rb', line 12

# Reader for @connection, which #open_connection! assigns and #disconnect
# resets to nil.
#
# @return the current connection, or nil when not connected
def connection
  @connection
end

#exchange ⇒ Object

Returns the value of attribute exchange.



12
13
14
# File 'lib/hutch/broker.rb', line 12

# Reader for @exchange, which #declare_exchange! assigns and #disconnect
# resets to nil.
#
# @return the declared exchange, or nil when none has been declared
def exchange
  @exchange
end

Instance Method Details

#ack(delivery_tag) ⇒ Object



244
245
246
# File 'lib/hutch/broker.rb', line 244

# Acknowledge the delivery identified by +delivery_tag+ on the broker's
# channel.
#
# @param delivery_tag the tag of the delivery to acknowledge
def ack(delivery_tag)
  # NOTE(review): the trailing `false` is presumably the client's "multiple"
  # flag (acknowledge only this one delivery) — confirm against the channel API.
  channel.ack(delivery_tag, false)
end

#bind_queue(queue, routing_keys) ⇒ Object

Bind a queue to the broker’s exchange on the routing keys provided. Any existing bindings on the queue that aren’t present in the array of routing keys will be unbound.



213
214
215
216
217
218
219
220
221
# File 'lib/hutch/broker.rb', line 213

# Bind +queue+ to the broker's exchange once for every given routing key.
#
# Bindings that are no longer wanted are handled first by delegating to
# #unbind_redundant_bindings; after that a binding is declared for each
# routing key in turn.
#
# @param queue the queue to bind (must respond to +name+ and +bind+)
# @param routing_keys the routing keys the queue should be bound on
# @return whatever +routing_keys.each+ returns
def bind_queue(queue, routing_keys)
  unbind_redundant_bindings(queue, routing_keys)

  # Declare every requested binding.
  routing_keys.each do |key|
    logger.debug "creating binding #{queue.name} <--> #{key}"
    queue.bind(exchange, routing_key: key)
  end
end

#bindings ⇒ Object

Return a mapping of queue names to the routing keys they’re bound to.



183
184
185
186
187
188
189
190
191
192
193
194
195
# File 'lib/hutch/broker.rb', line 183

# Build a mapping of queue name => routing keys from the bindings reported
# by the API client.
#
# Only bindings whose source is the configured exchange (+:mq_exchange+)
# and whose vhost is the configured vhost (+:mq_vhost+) are kept.
#
# @return [Hash] destination name => array of routing keys; looking up an
#   unknown name yields (and stores) an empty array
def bindings
  by_queue = Hash.new { |hash, key| hash[key] = [] }

  api_client.bindings.each_with_object(by_queue) do |entry, grouped|
    # Skip bindings whose routing key equals the destination name.
    # NOTE(review): presumably these are the implicit default-exchange
    # bindings — confirm.
    next if entry['destination'] == entry['routing_key']
    next unless entry['source'] == @config[:mq_exchange] && entry['vhost'] == @config[:mq_vhost]

    grouped[entry['destination']] << entry['routing_key']
  end
end

#confirm_select(*args) ⇒ Object



256
257
258
# File 'lib/hutch/broker.rb', line 256

# Forward all arguments to the channel's +confirm_select+.
#
# @param args arguments passed through unchanged
# @return whatever the channel's +confirm_select+ returns
def confirm_select(*args)
  channel.confirm_select(*args)
end

#connect(options = {}) ⇒ Object

Connect to broker

Examples:

Hutch::Broker.new.connect(enable_http_api_use: true) do
  # will disconnect after this block
end

Parameters:

  • options (Hash) (defaults to: {})

    The options to connect with

Options Hash (options):

  • :enable_http_api_use (Boolean)


46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/hutch/broker.rb', line 46

# Connect to the broker.
#
# Sets up the AMQP connection, then the HTTP API connection when
# #http_api_use_enabled? says so, and logs whether HTTP API use and tracing
# are enabled. When a block is given it is run and #disconnect is always
# called afterwards, even if the block raises.
#
# @param options [Hash] the options to connect with (stored in @options)
# @option options [Boolean] :enable_http_api_use
# @return the block's value when a block is given, otherwise nil
def connect(options = {})
  @options = options
  set_up_amqp_connection

  api_enabled = http_api_use_enabled?
  logger.info(api_enabled ? "HTTP API use is enabled" : "HTTP API use is disabled")
  set_up_api_connection if api_enabled

  tracing_message =
    if tracing_enabled?
      "tracing is enabled using #{@config[:tracer]}"
    else
      "tracing is disabled"
    end
  logger.info tracing_message

  return unless block_given?

  begin
    yield
  ensure
    disconnect
  end
end

#declare_exchange(ch = channel) ⇒ Object



123
124
125
126
127
128
129
130
131
132
# File 'lib/hutch/broker.rb', line 123

# Declare the configured exchange on the given channel.
#
# Name, type and extra options come from the +:mq_exchange+,
# +:mq_exchange_type+ and +:mq_exchange_options+ config keys. The exchange
# is durable unless the configured options override that. The log message
# reports the configured exchange type rather than assuming "topic", and a
# missing (nil) options entry is treated as an empty hash.
#
# @param ch the channel to declare the exchange on (defaults to #channel)
# @return the value returned by Adapter.new_exchange
def declare_exchange(ch = channel)
  exchange_name = @config[:mq_exchange]
  exchange_type = @config[:mq_exchange_type]
  exchange_options = { durable: true }.merge(@config[:mq_exchange_options] || {})
  logger.info "using #{exchange_type} exchange '#{exchange_name}'"

  with_bunny_precondition_handler('exchange') do
    Adapter.new_exchange(ch, exchange_type, exchange_name, exchange_options)
  end
end

#declare_exchange!(*args) ⇒ Object



134
135
136
# File 'lib/hutch/broker.rb', line 134

# Declare the exchange via #declare_exchange and remember it in @exchange.
#
# @param args arguments forwarded unchanged to #declare_exchange
# @return the declared exchange
def declare_exchange!(*args)
  @exchange = declare_exchange(*args)
end

#declare_publisher! ⇒ Object



138
139
140
# File 'lib/hutch/broker.rb', line 138

# Create the Hutch::Publisher used by #publish, built from the current
# connection, channel, exchange and config, and store it in @publisher.
#
# @return [Hutch::Publisher] the newly created publisher
def declare_publisher!
  @publisher = Hutch::Publisher.new(connection, channel, exchange, @config)
end

#disconnect ⇒ Object



71
72
73
74
75
76
77
78
# File 'lib/hutch/broker.rb', line 71

# Close the channel and the connection (when present) and forget the
# channel, connection, exchange and API client held by this instance.
#
# The connection is still closed and the state is still cleared when
# closing the channel raises, so a failed channel close cannot leak the
# underlying connection. An exception raised while closing still propagates
# to the caller.
#
# @return [nil]
def disconnect
  begin
    @channel.close if @channel
  ensure
    # Runs even if the channel close above raised.
    @connection.close if @connection
  end
  nil
ensure
  @channel = nil
  @connection = nil
  @exchange = nil
  @api_client = nil
end

#http_api_use_enabled? ⇒ Boolean

Returns:

  • (Boolean)


158
159
160
161
162
163
164
165
166
167
# File 'lib/hutch/broker.rb', line 158

# Whether the RabbitMQ HTTP API should be used.
#
# Both the +:enable_http_api_use+ connect option and the config entry of the
# same name must allow it; each defaults to true when not set. This is safe
# to call before #connect has stored any options (@options is then nil and
# treated as empty).
#
# @return [Boolean]
def http_api_use_enabled?
  option_flag = (@options || {}).fetch(:enable_http_api_use, true)

  config_flag = @config[:enable_http_api_use]
  config_flag = true if config_flag.nil?

  option_flag && config_flag
end

#nack(delivery_tag) ⇒ Object



248
249
250
# File 'lib/hutch/broker.rb', line 248

# Negatively acknowledge the delivery identified by +delivery_tag+ on the
# broker's channel.
#
# @param delivery_tag the tag of the delivery to nack
def nack(delivery_tag)
  # NOTE(review): the two `false` arguments are presumably the client's
  # "multiple" and "requeue" flags — confirm against the channel API.
  channel.nack(delivery_tag, false, false)
end

#open_channel ⇒ Object



108
109
110
111
112
113
114
115
116
117
# File 'lib/hutch/broker.rb', line 108

# Open a new channel on the current connection and configure it.
#
# The channel is created with the consumer pool size and abort-on-exception
# settings, prefetch is applied from the +:channel_prefetch+ config entry,
# and publisher confirms are switched on when either +:publisher_confirms+
# or +:force_publisher_confirms+ is set.
#
# @return the newly created channel
def open_channel
  logger.info "opening rabbitmq channel with pool size #{consumer_pool_size}, abort on exception #{consumer_pool_abort_on_exception}"

  new_channel = connection.create_channel(nil, consumer_pool_size, consumer_pool_abort_on_exception)
  connection.prefetch_channel(new_channel, @config[:channel_prefetch])

  if @config[:publisher_confirms] || @config[:force_publisher_confirms]
    logger.info 'enabling publisher confirms'
    new_channel.confirm_select
  end

  new_channel
end

#open_channel! ⇒ Object



119
120
121
# File 'lib/hutch/broker.rb', line 119

# Open a channel via #open_channel and remember it in @channel.
#
# @return the newly opened channel
def open_channel!
  @channel = open_channel
end

#open_connection ⇒ Object



91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/hutch/broker.rb', line 91

# Create and start a connection to RabbitMQ.
#
# A Hutch::Adapter is built from +connection_params+ and started inside
# +with_bunny_connection_handler+; the attempt and the successful connection
# are both logged.
#
# @return the started adapter connection
def open_connection
  logger.info "connecting to rabbitmq (#{sanitized_uri})"

  adapter = Hutch::Adapter.new(connection_params)
  with_bunny_connection_handler(sanitized_uri) { adapter.start }

  logger.info "connected to RabbitMQ at #{connection_params[:host]} as #{connection_params[:username]}"
  adapter
end

#open_connection! ⇒ Object



104
105
106
# File 'lib/hutch/broker.rb', line 104

# Open a connection via #open_connection and remember it in @connection.
#
# @return the newly opened connection
def open_connection!
  @connection = open_connection
end

#publish(*args) ⇒ Object



252
253
254
# File 'lib/hutch/broker.rb', line 252

# Forward all arguments to the publisher stored in @publisher (assigned by
# #declare_publisher!).
#
# @param args arguments passed through unchanged to the publisher
# @return whatever the publisher's +publish+ returns
def publish(*args)
  @publisher.publish(*args)
end

#queue(name, options = {}) ⇒ Object

Create / get a durable queue and apply namespace if it exists.



174
175
176
177
178
179
180
# File 'lib/hutch/broker.rb', line 174

# Create / get a queue on the channel, prefixing its name with the
# configured namespace when there is one.
#
# The +:namespace+ config value is lower-cased and stripped of every
# character other than word characters, "-", "_", ":" and "."; when anything
# remains, the queue is named "<namespace>:<name>".
#
# @param name the base queue name
# @param options [Hash] options passed as keywords to the channel's +queue+
# @return whatever the channel's +queue+ call returns
def queue(name, options = {})
  with_bunny_precondition_handler('queue') do
    prefix = @config[:namespace].to_s.downcase.gsub(/[^-_:\.\w]/, "")
    # The sanitised prefix cannot contain whitespace, so an emptiness check
    # is all that is needed to decide whether a namespace applies.
    full_name = prefix.empty? ? name : "#{prefix}:#{name}"
    channel.queue(full_name, **options)
  end
end

#reject(delivery_tag, requeue = false) ⇒ Object



240
241
242
# File 'lib/hutch/broker.rb', line 240

# Reject the delivery identified by +delivery_tag+ on the broker's channel.
#
# @param delivery_tag the tag of the delivery to reject
# @param requeue passed straight through as the second argument of the
#   channel's +reject+ (defaults to false)
def reject(delivery_tag, requeue=false)
  channel.reject(delivery_tag, requeue)
end

#requeue(delivery_tag) ⇒ Object



236
237
238
# File 'lib/hutch/broker.rb', line 236

# Reject the delivery identified by +delivery_tag+, passing +true+ as the
# second argument of the channel's +reject+.
#
# @param delivery_tag the tag of the delivery to requeue
def requeue(delivery_tag)
  # NOTE(review): `true` is presumably the client's "requeue" flag — confirm.
  channel.reject(delivery_tag, true)
end

#set_up_amqp_connection ⇒ Object

Connect to RabbitMQ via AMQP

This sets up the main connection and channel we use for talking to RabbitMQ. It also ensures the existence of the exchange we’ll be using.



84
85
86
87
88
89
# File 'lib/hutch/broker.rb', line 84

# Connect to RabbitMQ via AMQP.
#
# Runs the four set-up steps in order: open the connection, open a channel
# on it, declare the exchange, and build the publisher. Each later step uses
# state stored by the earlier ones (@connection, @channel, @exchange).
#
# @return the publisher created by #declare_publisher!
def set_up_amqp_connection
  open_connection!
  open_channel!
  declare_exchange!
  declare_publisher!
end

#set_up_api_connection ⇒ Object

Set up the connection to the RabbitMQ management API. Unfortunately, this is necessary to do a few things that are impossible over AMQP. E.g. listing queues and bindings.



145
146
147
148
149
150
151
152
153
154
155
156
# File 'lib/hutch/broker.rb', line 145

# Set up the client for the RabbitMQ management (HTTP) API.
#
# A CarrotTop client is built from the +api_config+ values and stored in
# @api_client, all inside the authentication and connection error handlers.
#
# @return whatever the nested error-handler blocks return
def set_up_api_connection
  logger.info "connecting to rabbitmq HTTP API (#{api_config.sanitized_uri})"

  with_authentication_error_handler do
    with_connection_error_handler do
      @api_client = CarrotTop.new(
        host: api_config.host,
        port: api_config.port,
        user: api_config.username,
        password: api_config.password,
        ssl: api_config.ssl
      )
      # NOTE(review): presumably issued only to exercise the connection so the
      # surrounding handlers can catch failures early — confirm.
      @api_client.exchanges
    end
  end
end

#stop ⇒ Object



223
224
225
226
227
228
229
230
231
232
233
234
# File 'lib/hutch/broker.rb', line 223

# Stop consuming.
#
# On JRuby the channel is simply closed. Otherwise the channel's work pool
# is shut down, given up to +:graceful_exit_timeout+ (from config) to
# finish, and then killed.
#
# @return the result of +channel.close+ on JRuby, otherwise the result of
#   killing the work pool
def stop
  return channel.close if defined?(JRUBY_VERSION)

  # Enqueue a failing job that kills the consumer loop
  channel_work_pool.shutdown
  # Give jobs that are still being processed `timeout` seconds to finish
  channel_work_pool.join(@config[:graceful_exit_timeout])
  # Anything still running after `timeout` is killed
  channel_work_pool.kill
end

#tracing_enabled? ⇒ Boolean

Returns:

  • (Boolean)


169
170
171
# File 'lib/hutch/broker.rb', line 169

# Whether a tracer other than Hutch::Tracers::NullTracer is configured.
#
# @return [Boolean, nil] the falsy +:tracer+ config value itself when no
#   tracer is set, otherwise whether the tracer differs from NullTracer
def tracing_enabled?
  tracer = @config[:tracer]
  tracer && tracer != Hutch::Tracers::NullTracer
end

#unbind_redundant_bindings(queue, routing_keys) ⇒ Object

Find the existing bindings, and unbind any redundant bindings



198
199
200
201
202
203
204
205
206
207
208
# File 'lib/hutch/broker.rb', line 198

# Remove bindings of +queue+ that are not among the wanted routing keys.
#
# Does nothing (returns nil) when HTTP API use is disabled, because the
# existing bindings are read through #bindings.
#
# @param queue the queue whose bindings are checked (must respond to +name+
#   and +unbind+)
# @param routing_keys the routing keys that should stay bound; only needs to
#   respond to +include?+
# @return [Hash, nil] the existing bindings found for the queue, or nil when
#   HTTP API use is disabled
def unbind_redundant_bindings(queue, routing_keys)
  return unless http_api_use_enabled?

  existing = bindings.select { |destination, _keys| destination == queue.name }
  existing.each_value do |bound_keys|
    stale_keys = bound_keys.reject { |key| routing_keys.include?(key) }
    stale_keys.each do |key|
      logger.debug "removing redundant binding #{queue.name} <--> #{key}"
      queue.unbind(exchange, routing_key: key)
    end
  end
end

#using_publisher_confirmations? ⇒ Boolean

Returns True if channel is set up to use publisher confirmations.

Returns:

  • (Boolean)

    True if channel is set up to use publisher confirmations.



265
266
267
# File 'lib/hutch/broker.rb', line 265

# Ask the channel whether it is using publisher confirmations.
#
# @return [Boolean] the channel's own answer, passed through unchanged
def using_publisher_confirmations?
  channel.using_publisher_confirmations?
end

#wait_for_confirms ⇒ Object



260
261
262
# File 'lib/hutch/broker.rb', line 260

# Delegate to the channel's +wait_for_confirms+.
#
# @return whatever the channel's +wait_for_confirms+ returns
def wait_for_confirms
  channel.wait_for_confirms
end