Class: Hutch::Broker
Instance Attribute Summary collapse
-
#api_client ⇒ Object
Returns the value of attribute api_client.
-
#channel ⇒ Object
Returns the value of attribute channel.
-
#connection ⇒ Object
Returns the value of attribute connection.
-
#exchange ⇒ Object
Returns the value of attribute exchange.
Instance Method Summary collapse
- #ack(delivery_tag) ⇒ Object
-
#bind_queue(queue, routing_keys) ⇒ Object
Bind a queue to the broker’s exchange on the routing keys provided.
-
#bindings ⇒ Object
Return a mapping of queue names to the routing keys they’re bound to.
- #connect(options = {}) ⇒ Object
- #disconnect ⇒ Object
-
#initialize(config = nil) ⇒ Broker
constructor
A new instance of Broker.
- #nack(delivery_tag) ⇒ Object
- #open_channel! ⇒ Object
- #open_connection! ⇒ Object
- #publish(routing_key, message, properties = {}) ⇒ Object
-
#queue(name) ⇒ Object
Create / get a durable queue and apply namespace if it exists.
-
#set_up_amqp_connection ⇒ Object
Connect to RabbitMQ via AMQP.
-
#set_up_api_connection ⇒ Object
Set up the connection to the RabbitMQ management API.
- #stop ⇒ Object
-
#wait_on_threads(timeout) ⇒ Object
Each subscriber is run in a thread.
Methods included from Logging
#logger, logger, logger=, setup_logger
Constructor Details
Instance Attribute Details
#api_client ⇒ Object
Returns the value of attribute api_client.
11 12 13 |
# File 'lib/hutch/broker.rb', line 11
#
# Reader for the api_client attribute.
#
# @return [Object] whatever is currently stored in @api_client
def api_client
  @api_client
end
#channel ⇒ Object
Returns the value of attribute channel.
11 12 13 |
# File 'lib/hutch/broker.rb', line 11
#
# Reader for the channel attribute.
#
# @return [Object] whatever is currently stored in @channel
def channel
  @channel
end
#connection ⇒ Object
Returns the value of attribute connection.
11 12 13 |
# File 'lib/hutch/broker.rb', line 11
#
# Reader for the connection attribute.
#
# @return [Object] whatever is currently stored in @connection
def connection
  @connection
end
#exchange ⇒ Object
Returns the value of attribute exchange.
11 12 13 |
# File 'lib/hutch/broker.rb', line 11
#
# Reader for the exchange attribute.
#
# @return [Object] whatever is currently stored in @exchange
def exchange
  @exchange
end
Instance Method Details
#ack(delivery_tag) ⇒ Object
151 152 153 |
# File 'lib/hutch/broker.rb', line 151
#
# Acknowledge a delivery on the broker's channel.
#
# @param delivery_tag [Object] tag of the delivery to acknowledge
# @return [Object] whatever @channel.ack returns
def ack(delivery_tag)
  # Second positional argument of the channel's ack; presumably the AMQP
  # "multiple" flag -- confirm against the Bunny version in use.
  multiple = false
  @channel.ack(delivery_tag, multiple)
end
#bind_queue(queue, routing_keys) ⇒ Object
Bind a queue to the broker’s exchange on the routing keys provided. Any existing bindings on the queue that aren’t present in the array of routing keys will be unbound.
121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 |
# File 'lib/hutch/broker.rb', line 121
#
# Make the queue's bindings on @exchange match +routing_keys+: bindings that
# are reported for the queue but not requested are removed, then every
# requested routing key is bound.
#
# @param queue [#name, #bind, #unbind] the queue to (re)bind
# @param routing_keys [Array] the routing keys the queue should end up with
# @return [Array] +routing_keys+
def bind_queue(queue, routing_keys)
  # Pass 1: drop stale bindings. Only the entry whose destination is this
  # queue's name is relevant; everything else is skipped.
  bindings.each do |destination, bound_keys|
    next unless destination == queue.name

    stale_keys = bound_keys.reject { |key| routing_keys.include?(key) }
    stale_keys.each do |key|
      logger.debug "removing redundant binding #{queue.name} <--> #{key}"
      queue.unbind(@exchange, routing_key: key)
    end
  end

  # Pass 2: bind every requested key, whether or not it was already bound.
  routing_keys.each do |routing_key|
    logger.debug "creating binding #{queue.name} <--> #{routing_key}"
    queue.bind(@exchange, routing_key: routing_key)
  end
end
#bindings ⇒ Object
Return a mapping of queue names to the routing keys they’re bound to.
107 108 109 110 111 112 113 114 115 116 |
# File 'lib/hutch/broker.rb', line 107
#
# Build a mapping of queue names to the routing keys they are bound with,
# from the binding records returned by @api_client.bindings.
#
# A record is skipped when its destination equals its routing key, or when
# its source / vhost differ from @config[:mq_exchange] / @config[:mq_vhost].
#
# @return [Hash{String => Array<String>}] destination => routing keys; the
#   hash creates an empty array for any key that is looked up but missing
def bindings
  keys_by_queue = Hash.new { |hash, key| hash[key] = [] }

  @api_client.bindings.each do |entry|
    destination = entry['destination']
    routing_key = entry['routing_key']

    # Presumably the implicit default binding (queue bound under its own
    # name) -- not one we manage.
    next if destination == routing_key
    next unless entry['source'] == @config[:mq_exchange] &&
                entry['vhost'] == @config[:mq_vhost]

    keys_by_queue[destination] << routing_key
  end

  keys_by_queue
end
#connect(options = {}) ⇒ Object
17 18 19 20 21 22 23 24 25 |
# File 'lib/hutch/broker.rb', line 17
#
# Set up the AMQP connection and, unless disabled, the HTTP API connection.
#
# When a block is given it is run once the connections are set up and
# #disconnect is called afterwards -- also when the block raises, so a
# failing block cannot leave the connection open.
#
# @param options [Hash] connection options
# @option options [Boolean] :enable_http_api_use (true) whether to call
#   #set_up_api_connection
# @yield runs with the broker connected
# @return [Object, nil] the block's value, or nil when no block is given
def connect(options = {})
  set_up_amqp_connection
  set_up_api_connection if options.fetch(:enable_http_api_use, true)

  return unless block_given?

  begin
    yield
  ensure
    disconnect
  end
end
#disconnect ⇒ Object
27 28 29 30 31 |
# File 'lib/hutch/broker.rb', line 27
#
# Close the channel and the connection (whichever of them are set) and
# clear all four broker attributes.
#
# @return [Array<nil>] the value of the final multiple assignment
def disconnect
  # Channel first, then connection; unset (nil) entries are skipped.
  [@channel, @connection].each do |closable|
    closable.close if closable
  end

  @channel, @connection, @exchange, @api_client = nil, nil, nil, nil
end
#nack(delivery_tag) ⇒ Object
155 156 157 |
# File 'lib/hutch/broker.rb', line 155
#
# Negatively acknowledge a delivery on the broker's channel.
#
# @param delivery_tag [Object] tag of the delivery to reject
# @return [Object] whatever @channel.nack returns
def nack(delivery_tag)
  # Second and third positional arguments of the channel's nack; presumably
  # the AMQP "multiple" and "requeue" flags -- confirm against the Bunny
  # version in use.
  multiple = false
  requeue = false
  @channel.nack(delivery_tag, multiple, requeue)
end
#open_channel! ⇒ Object
74 75 76 77 78 79 |
# File 'lib/hutch/broker.rb', line 74
#
# Create a channel on the current connection, apply the configured prefetch
# (if any) and store the channel in @channel.
#
# @return [Object] the newly created channel
def open_channel!
  logger.info 'opening rabbitmq channel'

  new_channel = connection.create_channel
  # Prefetch is only set when @config[:channel_prefetch] is truthy.
  new_channel.prefetch(@config[:channel_prefetch]) if @config[:channel_prefetch]

  # Assigned last, so @channel is left untouched if anything above raises.
  @channel = new_channel
end
#open_connection! ⇒ Object
48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 |
# File 'lib/hutch/broker.rb', line 48
#
# Create a Bunny connection from the mq_* settings in @config, start it
# inside #with_bunny_connection_handler and keep it in @connection.
#
# @return [Object] the connection stored in @connection
def open_connection!
  host     = @config[:mq_host]
  port     = @config[:mq_port]
  vhost    = @config[:mq_vhost]
  username = @config[:mq_username]
  password = @config[:mq_password]
  tls      = @config[:mq_tls]
  tls_key  = @config[:mq_tls_key]
  tls_cert = @config[:mq_tls_cert]

  # URI for log output and for the connection handler: it deliberately
  # leaves out the password, and drops one leading slash from the vhost.
  scheme = tls ? "amqps://" : "amqp://"
  vhost_path = vhost.sub(/^\//, '')
  sanitized_uri = "#{scheme}#{username}@#{host}:#{port}/#{vhost_path}"
  logger.info "connecting to rabbitmq (#{sanitized_uri})"

  @connection = Bunny.new(host: host, port: port, vhost: vhost,
                          tls: tls, tls_key: tls_key, tls_cert: tls_cert,
                          username: username, password: password,
                          heartbeat: 30, automatically_recover: true,
                          network_recovery_interval: 1)

  with_bunny_connection_handler(sanitized_uri) { @connection.start }

  @connection
end
#publish(routing_key, message, properties = {}) ⇒ Object
159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 |
# File 'lib/hutch/broker.rb', line 159
#
# Publish +message+, serialised as JSON, to @exchange under +routing_key+.
#
# Publish options are merged in increasing precedence: {persistent: true},
# the caller's +properties+, #global_properties, and finally the fixed
# routing_key / timestamp / content_type, which therefore cannot be
# overridden.
#
# @param routing_key [String] routing key to publish under
# @param message [Object] payload; must be serialisable by JSON.dump
# @param properties [Hash] extra publish options; note that a generated
#   :message_id is written into this hash when it has none
# @return [Object] whatever @exchange.publish returns
def publish(routing_key, message, properties = {})
  ensure_connection!(routing_key, message)

  non_overridable_properties = {
    routing_key: routing_key,
    timestamp: Time.now.to_i,
    content_type: 'application/json'
  }
  # Kept as an in-place default: callers can read the generated id back
  # from the hash they passed in.
  properties[:message_id] ||= generate_id

  logger.info("publishing message '#{message.inspect}' to #{routing_key}")

  publish_options = { persistent: true }
                    .merge(properties)
                    .merge(global_properties)
                    .merge(non_overridable_properties)
  @exchange.publish(JSON.dump(message), publish_options)
end
#queue(name) ⇒ Object
Create / get a durable queue and apply namespace if it exists.
98 99 100 101 102 103 104 |
# File 'lib/hutch/broker.rb', line 98
#
# Declare (or fetch) a durable queue on the channel, prefixing its name
# with the configured namespace when there is one.
#
# The namespace is @config[:namespace], lower-cased and stripped of every
# character other than word characters, '-', '_', ':' and '.'.
#
# @param name [String] un-namespaced queue name; it is never modified
# @return [Object] whatever #with_bunny_precondition_handler returns for the
#   channel.queue call
def queue(name)
  with_bunny_precondition_handler('queue') do
    namespace = @config[:namespace].to_s.downcase.gsub(/[^-_:\.\w]/, "")
    # Build a new string rather than String#prepend, which would mutate the
    # caller's object: passing the same string twice would double-prefix it,
    # and a frozen string would raise.
    name = "#{namespace}:#{name}" unless namespace.empty?
    channel.queue(name, durable: true)
  end
end
#set_up_amqp_connection ⇒ Object
Connect to RabbitMQ via AMQP. This sets up the main connection and channel we use for talking to RabbitMQ. It also ensures the existence of the exchange we’ll be using.
36 37 38 39 40 41 42 43 44 45 46 |
# File 'lib/hutch/broker.rb', line 36
#
# Open the connection and channel, then declare the durable topic exchange
# named by @config[:mq_exchange] and keep it in @exchange.
#
# @return [Object] whatever #with_bunny_precondition_handler returns
def set_up_amqp_connection
  open_connection!
  open_channel!

  topic_name = @config[:mq_exchange]
  logger.info "using topic exchange '#{topic_name}'"

  with_bunny_precondition_handler('exchange') { @exchange = @channel.topic(topic_name, durable: true) }
end
#set_up_api_connection ⇒ Object
Set up the connection to the RabbitMQ management API. Unfortunately, this is necessary to do a few things that are impossible over AMQP. E.g. listing queues and bindings.
84 85 86 87 88 89 90 91 92 93 94 95 |
# File 'lib/hutch/broker.rb', line 84
#
# Create the RabbitMQ management API client from #api_config, store it in
# @api_client and request the exchange list once, all inside the
# authentication and connection error handlers.
#
# @return [Object] whatever #with_authentication_error_handler returns
def set_up_api_connection
  logger.info "connecting to rabbitmq HTTP API (#{api_config.sanitized_uri})"

  with_authentication_error_handler do
    with_connection_error_handler do
      @api_client = CarrotTop.new(
        host: api_config.host,
        port: api_config.port,
        user: api_config.username,
        password: api_config.password,
        ssl: api_config.ssl
      )
      # Result is unused here; presumably issued so that bad credentials or
      # an unreachable API surface now, inside the handlers.
      @api_client.exchanges
    end
  end
end
#stop ⇒ Object
147 148 149 |
# File 'lib/hutch/broker.rb', line 147
#
# Kill the channel's work pool.
#
# @return [Object] whatever the work pool's #kill returns
def stop
  pool = @channel.work_pool
  pool.kill
end
#wait_on_threads(timeout) ⇒ Object
Each subscriber is run in a thread. This calls Thread#join on each of the subscriber threads.
140 141 142 143 144 145 |
# File 'lib/hutch/broker.rb', line 140
#
# Join each of the work pool threads, giving every thread an equal share of
# +timeout+.
#
# @param timeout [Numeric, #to_f] total time budget, split evenly per thread
# @return [Boolean] true when every thread joined within its share, false
#   as soon as one join times out
def wait_on_threads(timeout)
  share = timeout.to_f / work_pool_threads.length

  # Thread#join returns nil when its limit is hit, so a nil result means
  # that thread did not finish in time.
  work_pool_threads.all? { |thread| !thread.join(share).nil? }
end