Class: Beetle::Subscriber
Overview
Manages subscriptions and message processing on the receiver side of things.
Instance Attribute Summary collapse
-
#tracing ⇒ Object
Returns the value of attribute tracing.
Attributes inherited from Base
Instance Method Summary collapse
-
#initialize(client, options = {}) ⇒ Subscriber
constructor
create a new subscriber instance.
-
#listen_queues(queues) ⇒ Object
the client calls this method to subscribe to a list of queues.
- #pause_listening(queues) ⇒ Object
-
#register_handler(queues, opts = {}, handler = nil, &block) ⇒ Object
register handler for the given queues (see Client#register_handler).
- #resume_listening(queues) ⇒ Object
-
#stop! ⇒ Object
closes all AMQP connections and stop the eventmachine loop.
- #tracing? ⇒ Boolean
Methods included from Logging
Constructor Details
#initialize(client, options = {}) ⇒ Subscriber
create a new subscriber instance
13 14 15 16 17 18 19 20 21 22 23 |
# File 'lib/beetle/subscriber.rb', line 13 def initialize(client, = {}) #:nodoc: super @servers.concat @client.additional_subscription_servers @handlers = {} @connections = {} @channels = {} @subscriptions = {} @listened_queues = [] @channels_closed = false @tracing = false end |
Instance Attribute Details
#tracing ⇒ Object
Returns the value of attribute tracing.
7 8 9 |
# File 'lib/beetle/subscriber.rb', line 7 def tracing @tracing end |
Instance Method Details
#listen_queues(queues) ⇒ Object
the client calls this method to subscribe to a list of queues. this method does the following things:
-
creates all exchanges which have been registered for the given queues
-
creates and binds each listed queue queues
-
subscribes the handlers for all these queues
yields before entering the eventmachine loop (if a block was given)
33 34 35 36 37 38 39 40 41 42 |
# File 'lib/beetle/subscriber.rb', line 33 def listen_queues(queues) #:nodoc: @listened_queues = queues @exchanges_for_queues = exchanges_for_queues(queues) EM.run do each_server_sorted_randomly do connect_server connection_settings end yield if block_given? end end |
#pause_listening(queues) ⇒ Object
44 45 46 47 48 |
# File 'lib/beetle/subscriber.rb', line 44 def pause_listening(queues) each_server do queues.each { |name| pause(name) if has_subscription?(name) } end end |
#register_handler(queues, opts = {}, handler = nil, &block) ⇒ Object
register handler for the given queues (see Client#register_handler)
73 74 75 76 77 |
# File 'lib/beetle/subscriber.rb', line 73 def register_handler(queues, opts={}, handler=nil, &block) #:nodoc: Array(queues).each do |queue| @handlers[queue] = [opts.symbolize_keys, handler || block] end end |
#resume_listening(queues) ⇒ Object
50 51 52 53 54 |
# File 'lib/beetle/subscriber.rb', line 50 def resume_listening(queues) each_server do queues.each { |name| resume(name) if has_subscription?(name) } end end |
#stop! ⇒ Object
closes all AMQP connections and stop the eventmachine loop. note that the shutdown process is asynchronous. must not be called while a message handler is running. typically one would use EM.add_timer(0) { stop! }
to ensure this.
59 60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/beetle/subscriber.rb', line 59 def stop! #:nodoc: if EM.reactor_running? EM.add_timer(0) do close_all_channels close_all_connections end else # try to clean up as much a possible under the circumstances, by closing all connections # this should a least close the sockets close_connections_with_reactor_not_running end end |
#tracing? ⇒ Boolean
8 9 10 |
# File 'lib/beetle/subscriber.rb', line 8 def tracing? @tracing end |