Class: Beetle::Client
Overview
This class provides the interface through which messaging is configured for both message producers and consumers. It keeps references to an instance of a Beetle::Subscriber, a Beetle::Publisher (both of which are instantiated on demand), and a reference to an instance of Beetle::DeduplicationStore.
Configuration of exchanges, queues, messages, and message handlers is done by calls to corresponding register_ methods. Note that these methods just build up the configuration, they don’t interact with the AMQP servers.
On the publisher side, publishing a message will ensure that the exchange it will be sent to, and each of the queues bound to the exchange, will be created on demand. On the subscriber side, exchanges, queues, bindings and queue subscriptions will be created when the application calls the listen_queues method. An application can decide to subscribe to only a subset of the configured queues by passing a list of queue names to the listen method.
The net effect of this strategy is that producers and consumers can be started in any order, so that no message is lost if message producers are accidentally started before the corresponding consumers.
Defined Under Namespace
Classes: Configurator
Instance Attribute Summary collapse
-
#additional_subscription_servers ⇒ Object
readonly
additional AMQP servers available for subscribing.
-
#bindings ⇒ Object
readonly
an options hash for the configured queue bindings.
-
#config ⇒ Object
readonly
accessor for the beetle configuration.
-
#deduplication_store ⇒ Object
readonly
the deduplication store to use for this client.
-
#exchanges ⇒ Object
readonly
an options hash for the configured exchanges.
-
#messages ⇒ Object
readonly
an options hash for the configured messages.
-
#queues ⇒ Object
readonly
an options hash for the configured queues.
-
#servers ⇒ Object
readonly
the AMQP servers available for publishing.
Instance Method Summary collapse
-
#configure(options = {}, &block) ⇒ Object
this is a convenience method to configure exchanges, queues, messages and handlers with a common set of options.
-
#initialize(config = Beetle.config) ⇒ Client
constructor
create a fresh Client instance from a given configuration object.
-
#listen(_deprecated_messages = nil, &block) ⇒ Object
start listening to all registered queues.
-
#listen_queues(*queues, &block) ⇒ Object
start listening to a list of queues (default to all registered queues).
-
#load(glob) ⇒ Object
evaluate the ruby files matching the given
glob
pattern in the context of the client instance. -
#pause_listening(*queues) ⇒ Object
pause listening on a list of queues.
-
#publish(message_name, data = nil, opts = {}) ⇒ Object
publishes a message.
-
#purge(*queues) ⇒ Object
purges the given queues on all configured servers.
-
#register_binding(queue_name, options = {}) ⇒ Object
register an additional binding for an already configured queue name and an options hash: [
:exchange
] the name of the exchange this queue will be bound to (defaults to the name of the queue) [:key
] the binding key (defaults to the name of the queue) automatically registers the specified exchange if it hasn’t been registered yet. -
#register_exchange(name, options = {}) ⇒ Object
register an exchange with the given name and a set of options: [
:type
] the type option will be overwritten and always be:topic
, beetle does not allow fanout exchanges [:durable
] the durable option will be overwritten and always be true. -
#register_handler(queues, *args, &block) ⇒ Object
registers a handler for a list of queues (which must have been registered previously).
-
#register_message(message_name, options = {}) ⇒ Object
register a persistent message with a given name and an options hash: [
:key
] specifies the routing key for message publishing (defaults to the name of the message) [:ttl
] specifies the time interval after which the message will be silently dropped (seconds). -
#register_queue(name, options = {}) ⇒ Object
register a durable, non passive, non auto_deleted queue with the given name and an options hash: [
:exchange
] the name of the exchange this queue will be bound to (defaults to the name of the queue) [:key
] the binding key (defaults to the name of the queue) [:lazy
] whether the queue should use lazy mode (defaults toconfig.lazy_queues_enabled
) [:dead_lettering
] whether the queue should use dead lettering (defaults toconfig.dead_lettering_enabled
) automatically registers the specified exchange if it hasn’t been registered yet. - #reset ⇒ Object
-
#resume_listening(*queues) ⇒ Object
resume listening on a list of queues.
-
#rpc(message_name, data = nil, opts = {}) ⇒ Object
sends the given message to one of the configured servers and returns the result of running the associated handler.
-
#setup_queues_and_policies ⇒ Object
set up queues and policies for all configured queues.
-
#stop_listening ⇒ Object
stops the subscriber by closing all channels and connections.
-
#stop_publishing ⇒ Object
disconnects the publisher from all servers it’s currently connected to.
-
#throttle(throttling_options) ⇒ Object
throttle publishing of messages based on queue lengths.
-
#throttled? ⇒ Boolean
is the publisher currently throttled?.
-
#trace(queue_names = self.queues.keys, tracer = nil, &block) ⇒ Object
traces queues without consuming them.
- #update_queue_properties!(message_payload) ⇒ Object
Methods included from Logging
Constructor Details
#initialize(config = Beetle.config) ⇒ Client
create a fresh Client instance from a given configuration object
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 |
# File 'lib/beetle/client.rb', line 51 def initialize(config = Beetle.config) @config = config @exchanges = {} @queues = {} @messages = {} @bindings = {} @deduplication_store = DeduplicationStore.new(config) @queue_properties = QueueProperties.new(config) load_brokers_from_config register_exchange(config.beetle_policy_exchange_name) # make sure dead lettering is false for the policy update queue register_queue( config.beetle_policy_updates_queue_name, :exchange => config.beetle_policy_exchange_name, :key => config.beetle_policy_updates_routing_key, :dead_lettering => false, :lazy => false, ) end |
Instance Attribute Details
#additional_subscription_servers ⇒ Object (readonly)
additional AMQP servers available for subscribing. useful for migration scenarios.
30 31 32 |
# File 'lib/beetle/client.rb', line 30 def additional_subscription_servers @additional_subscription_servers end |
#bindings ⇒ Object (readonly)
an options hash for the configured queue bindings
39 40 41 |
# File 'lib/beetle/client.rb', line 39 def bindings @bindings end |
#config ⇒ Object (readonly)
accessor for the beetle configuration
48 49 50 |
# File 'lib/beetle/client.rb', line 48 def config @config end |
#deduplication_store ⇒ Object (readonly)
the deduplication store to use for this client
45 46 47 |
# File 'lib/beetle/client.rb', line 45 def deduplication_store @deduplication_store end |
#exchanges ⇒ Object (readonly)
an options hash for the configured exchanges
33 34 35 |
# File 'lib/beetle/client.rb', line 33 def exchanges @exchanges end |
#messages ⇒ Object (readonly)
an options hash for the configured messages
42 43 44 |
# File 'lib/beetle/client.rb', line 42 def @messages end |
#queues ⇒ Object (readonly)
an options hash for the configured queues
36 37 38 |
# File 'lib/beetle/client.rb', line 36 def queues @queues end |
#servers ⇒ Object (readonly)
the AMQP servers available for publishing
27 28 29 |
# File 'lib/beetle/client.rb', line 27 def servers @servers end |
Instance Method Details
#configure(options = {}, &block) ⇒ Object
this is a convenience method to configure exchanges, queues, messages and handlers with a common set of options. allows one to call all register methods without the register_ prefix. returns self. if the passed in block has no parameters, the block will be evaluated in the context of the client configurator.
Example: (block with config argument)
client = Beetle.client.new.configure :exchange => :foobar do |config|
config.queue :q1, :key => "foo"
config.queue :q2, :key => "bar"
config.message :foo
config.message :bar
config.handler :q1 { puts "got foo"}
config.handler :q2 { puts "got bar"}
end
Example: (block without config argument)
client = Beetle.client.new.configure :exchange => :foobar do
queue :q1, :key => "foo"
queue :q2, :key => "bar"
message :foo
message :bar
handler :q1 { puts "got foo"}
handler :q2 { puts "got bar"}
end
194 195 196 197 198 199 200 201 202 |
# File 'lib/beetle/client.rb', line 194 def configure(={}, &block) configurator = Configurator.new(self, ) if block.arity == 1 yield configurator else configurator.instance_eval(&block) end self end |
#listen(_deprecated_messages = nil, &block) ⇒ Object
start listening to all registered queues. Calls #listen_queues internally runs the given block before entering the eventmachine loop.
227 228 229 230 |
# File 'lib/beetle/client.rb', line 227 def listen(=nil, &block) raise Error.new("Beetle::Client#listen no longer works with arguments. Please use #listen_queues(['queue1', 'queue2']) instead") if listen_queues(&block) end |
#listen_queues(*queues, &block) ⇒ Object
start listening to a list of queues (default to all registered queues). runs the given block before entering the eventmachine loop.
234 235 236 237 |
# File 'lib/beetle/client.rb', line 234 def listen_queues(*queues, &block) queues = determine_queue_names(queues) subscriber.listen_queues(queues, &block) end |
#load(glob) ⇒ Object
evaluate the ruby files matching the given glob
pattern in the context of the client instance.
313 314 315 316 317 318 |
# File 'lib/beetle/client.rb', line 313 def load(glob) b = binding Dir[glob].each do |f| eval(File.read(f), b, f) end end |
#pause_listening(*queues) ⇒ Object
pause listening on a list of queues
251 252 253 254 |
# File 'lib/beetle/client.rb', line 251 def pause_listening(*queues) queues = determine_queue_names(queues) subscriber.pause_listening(queues) end |
#publish(message_name, data = nil, opts = {}) ⇒ Object
publishes a message. the given options hash is merged with options given on message registration. WARNING: empty message bodies can lead to problems.
206 207 208 209 |
# File 'lib/beetle/client.rb', line 206 def publish(, data=nil, opts={}) = () publisher.publish(, data, opts) end |
#purge(*queues) ⇒ Object
purges the given queues on all configured servers
220 221 222 223 |
# File 'lib/beetle/client.rb', line 220 def purge(*queues) queues = determine_queue_names(queues) publisher.purge(queues) end |
#register_binding(queue_name, options = {}) ⇒ Object
register an additional binding for an already configured queue name and an options hash:
:exchange
-
the name of the exchange this queue will be bound to (defaults to the name of the queue)
:key
-
the binding key (defaults to the name of the queue)
automatically registers the specified exchange if it hasn’t been registered yet
116 117 118 119 120 121 122 123 124 125 |
# File 'lib/beetle/client.rb', line 116 def register_binding(queue_name, ={}) name = queue_name.to_s opts = .symbolize_keys exchange = (opts[:exchange] || name).to_s key = (opts[:key] || name).to_s (bindings[name] ||= []) << {:exchange => exchange, :key => key} register_exchange(exchange) unless exchanges.include?(exchange) queues = exchanges[exchange][:queues] queues << name unless queues.include?(name) end |
#register_exchange(name, options = {}) ⇒ Object
register an exchange with the given name and a set of options:
:type
-
the type option will be overwritten and always be
:topic
, beetle does not allow fanout exchanges :durable
-
the durable option will be overwritten and always be true. this is done to ensure that exchanges are never deleted
77 78 79 80 81 |
# File 'lib/beetle/client.rb', line 77 def register_exchange(name, ={}) name = name.to_s raise ConfigurationError.new("exchange #{name} already configured") if exchanges.include?(name) exchanges[name] = .symbolize_keys.merge(:type => :topic, :durable => true, :queues => []) end |
#register_handler(queues, *args, &block) ⇒ Object
registers a handler for a list of queues (which must have been registered previously). The handler will be invoked when any messages arrive on the queue.
Examples:
register_handler([:foo, :bar], :timeout => 10.seconds) { || puts "received #{}" }
on_error = lambda{ puts "something went wrong with baz" }
on_failure = lambda{ puts "baz has finally failed" }
register_handler(:baz, :exceptions => 1, :errback => on_error, :failback => on_failure) { puts "received baz" }
register_handler(:bar, BarHandler)
For details on handler classes see class Beetle::Handler
161 162 163 164 165 166 167 |
# File 'lib/beetle/client.rb', line 161 def register_handler(queues, *args, &block) queues = determine_queue_names(Array(queues)) opts = args.last.is_a?(Hash) ? args.pop : {} handler = args.shift raise ArgumentError.new("too many arguments for handler registration") unless args.empty? subscriber.register_handler(queues, opts, handler, &block) end |
#register_message(message_name, options = {}) ⇒ Object
register a persistent message with a given name and an options hash:
:key
-
specifies the routing key for message publishing (defaults to the name of the message)
:ttl
-
specifies the time interval after which the message will be silently dropped (seconds). defaults to Message::DEFAULT_TTL.
:redundant
-
specifies whether the message should be published redundantly (defaults to false)
136 137 138 139 140 141 142 143 144 |
# File 'lib/beetle/client.rb', line 136 def (, ={}) name = .to_s raise ConfigurationError.new("message #{name} already configured") if .include?(name) opts = {:exchange => name, :key => name}.merge!(.symbolize_keys) opts.merge! :persistent => true exchange = opts[:exchange] = opts[:exchange].to_s register_exchange(exchange) unless exchanges.include?(exchange) [name] = opts end |
#register_queue(name, options = {}) ⇒ Object
register a durable, non passive, non auto_deleted queue with the given name and an options hash:
:exchange
-
the name of the exchange this queue will be bound to (defaults to the name of the queue)
:key
-
the binding key (defaults to the name of the queue)
:lazy
-
whether the queue should use lazy mode (defaults to
config.lazy_queues_enabled
) :dead_lettering
-
whether the queue should use dead lettering (defaults to
config.dead_lettering_enabled
)
automatically registers the specified exchange if it hasn’t been registered yet
94 95 96 97 98 99 100 101 102 103 104 105 106 107 |
# File 'lib/beetle/client.rb', line 94 def register_queue(name, ={}) name = name.to_s raise ConfigurationError.new("queue #{name} already configured") if queues.include?(name) opts = { :exchange => name, :key => name, :auto_delete => false, :amqp_name => name, :lazy => config.lazy_queues_enabled, :dead_lettering => config.dead_lettering_enabled, :dead_lettering_msg_ttl => config.dead_lettering_msg_ttl }.merge!(.symbolize_keys) opts.merge! :durable => true, :passive => false, :exclusive => false exchange = opts.delete(:exchange).to_s key = opts.delete(:key) queues[name] = opts register_binding(name, :exchange => exchange, :key => key) end |
#reset ⇒ Object
320 321 322 323 324 325 326 327 328 329 330 331 |
# File 'lib/beetle/client.rb', line 320 def reset stop_publishing stop_listening config.reload load_brokers_from_config rescue Exception => e logger.warn("Error resetting client") logger.warn(e) ensure @publisher = nil @subscriber = nil end |
#resume_listening(*queues) ⇒ Object
resume listening on a list of queues
257 258 259 260 |
# File 'lib/beetle/client.rb', line 257 def resume_listening(*queues) queues = determine_queue_names(queues) subscriber.resume_listening(queues) end |
#rpc(message_name, data = nil, opts = {}) ⇒ Object
sends the given message to one of the configured servers and returns the result of running the associated handler.
unexpected behavior can ensue if the message gets routed to more than one recipient, so be careful.
214 215 216 217 |
# File 'lib/beetle/client.rb', line 214 def rpc(, data=nil, opts={}) = () publisher.rpc(, data, opts) end |
#setup_queues_and_policies ⇒ Object
set up queues and policies for all configured queues. Otherwise this will happen on first use of an exchange, which can be undesired for latency sensitive endpoints. Only needs to be called once.
286 287 288 |
# File 'lib/beetle/client.rb', line 286 def setup_queues_and_policies publisher.setup_queues_and_policies end |
#stop_listening ⇒ Object
stops the subscriber by closing all channels and connections. note this an asynchronous operation due to the underlying eventmachine mechanism.
241 242 243 |
# File 'lib/beetle/client.rb', line 241 def stop_listening @subscriber.stop! if @subscriber end |
#stop_publishing ⇒ Object
disconnects the publisher from all servers it’s currently connected to
246 247 248 |
# File 'lib/beetle/client.rb', line 246 def stop_publishing @publisher.stop if @publisher end |
#throttle(throttling_options) ⇒ Object
throttle publishing of messages based on queue lengths.
client.throttle(:foo => 10000, :bar => 10_000)
throttle publisher to 1 message per second as long as queue foo has more than 10000 entries or queue bar has more than 100000 entries across all servers. Queue lenghts are periodically determined during publishing. You only want to use this feature if you plan to publish a huge amount of messages to slow consumers so as to not overload the broker or the redis deduplication store. You’ll want to use this feature when running background jobs that publish huge amounts of messages to avoid overloading brokers and the message deduplication store.
274 275 276 |
# File 'lib/beetle/client.rb', line 274 def throttle() publisher.throttle(.stringify_keys) end |
#throttled? ⇒ Boolean
is the publisher currently throttled?
279 280 281 |
# File 'lib/beetle/client.rb', line 279 def throttled? publisher.throttled? end |
#trace(queue_names = self.queues.keys, tracer = nil, &block) ⇒ Object
traces queues without consuming them. useful for debugging message flow.
291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 |
# File 'lib/beetle/client.rb', line 291 def trace(queue_names=self.queues.keys, tracer=nil, &block) queues_to_trace = self.queues.slice(*queue_names) queues_to_trace.each do |name, opts| opts.merge! :durable => false, :auto_delete => true, :amqp_name => queue_name_for_tracing(opts[:amqp_name]) end tracer ||= lambda do |msg| puts "-----===== new message =====-----" puts "SERVER: #{msg.server}" puts "HEADER: #{msg.header.attributes[:headers].inspect}" puts "EXCHANGE: #{msg.header.method.exchange}" puts "KEY: #{msg.header.method.routing_key}" puts "MSGID: #{msg.msg_id}" puts "DATA: #{msg.data}" end register_handler(queue_names){|msg| tracer.call msg } @subscriber.tracing = true listen_queues(queue_names, &block) @subscriber.tracing = false end |
#update_queue_properties!(message_payload) ⇒ Object
333 334 335 |
# File 'lib/beetle/client.rb', line 333 def update_queue_properties!() @queue_properties.update_queue_properties!() end |