Class: Beetle::Publisher
Overview
Provides the publishing logic implementation.
Constant Summary collapse
- RPC_DEFAULT_TIMEOUT =
:nodoc:
10
Instance Attribute Summary collapse
-
#dead_servers ⇒ Object
readonly
Returns the value of attribute dead_servers.
Attributes inherited from Base
Instance Method Summary collapse
-
#bunny_exceptions ⇒ Object
List of exceptions potentially raised by bunny. These need to be lazy, because qrack exceptions are only defined after a connection has been established.
-
#initialize(client, options = {}) ⇒ Publisher
constructor
:nodoc:.
-
#publish(message_name, data, opts = {}) ⇒ Object
:nodoc:.
-
#publish_with_failover(exchange_name, message_name, data, opts) ⇒ Object
:nodoc:.
-
#publish_with_redundancy(exchange_name, message_name, data, opts) ⇒ Object
:nodoc:.
-
#purge(queue_names) ⇒ Object
:nodoc:.
-
#rpc(message_name, data, opts = {}) ⇒ Object
:nodoc:.
- #setup_queues_and_policies ⇒ Object
-
#stop ⇒ Object
:nodoc:.
- #throttle(queue_options) ⇒ Object
- #throttle! ⇒ Object
- #throttled? ⇒ Boolean
- #throttling? ⇒ Boolean
- #throttling_status ⇒ Object
Methods included from Logging
Constructor Details
#initialize(client, options = {}) ⇒ Publisher
:nodoc:
7 8 9 10 11 12 13 14 15 16 |
# File 'lib/beetle/publisher.rb', line 7
#
# Builds a publisher with empty bookkeeping state and registers an at_exit
# hook that calls #stop.
#
# client  - passed on to the superclass constructor by the bare `super`.
# options - passed on to the superclass constructor as well (default: {}).
def initialize(client, options = {}) #:nodoc:
  super
  @exchanges_with_bound_queues = {}
  @dead_servers = {}
  @bunnies = {}
  @throttling_options = {}
  @next_throttle_refresh = Time.now
  @throttled = false
  at_exit { stop }
end
Instance Attribute Details
#dead_servers ⇒ Object (readonly)
Returns the value of attribute dead_servers.
5 6 7 |
# File 'lib/beetle/publisher.rb', line 5
#
# Returns the value of the @dead_servers instance variable (read-only
# accessor).
def dead_servers
  @dead_servers
end
Instance Method Details
#bunny_exceptions ⇒ Object
List of exceptions potentially raised by bunny. These need to be lazy, because qrack exceptions are only defined after a connection has been established.
32 33 34 35 36 37 38 39 |
# File 'lib/beetle/publisher.rb', line 32
#
# List of exceptions potentially raised by bunny. The array is built on every
# call (i.e. lazily) because the Qrack exception constants are only defined
# after a connection has been established.
#
# Returns an Array of exception classes, suitable for `rescue *bunny_exceptions`.
def bunny_exceptions
  [
    Bunny::ConnectionError,
    Bunny::ForcedChannelCloseError,
    Bunny::ForcedConnectionCloseError,
    Bunny::MessageError,
    Bunny::ProtocolError,
    Bunny::ServerDownError,
    Bunny::UnsubscribeError,
    Bunny::AcknowledgementError,
    Qrack::BufferOverflowError,
    Qrack::InvalidTypeError,
    Errno::EHOSTUNREACH,
    Errno::ECONNRESET,
    Errno::ETIMEDOUT,
    Timeout::Error
  ]
end
#publish(message_name, data, opts = {}) ⇒ Object
:nodoc:
41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
# File 'lib/beetle/publisher.rb', line 41
#
# Publishes +data+ for the message +message_name+, instrumented as
# 'publish.beetle' via ActiveSupport::Notifications.
#
# message_name - key used to look up the message's configured options.
# data         - payload handed through to the publishing strategy.
# opts         - per-call overrides; keys are symbolized and merged over the
#                configured options. :exchange is removed and used as the
#                exchange name, :queue is removed and discarded.
#
# Dispatches to #publish_with_redundancy when opts[:redundant] is truthy and
# to #publish_with_failover otherwise; that call is the last expression of
# the instrumented block.
def publish(message_name, data, opts = {}) #:nodoc:
  ActiveSupport::Notifications.instrument('publish.beetle') do
    # NOTE(review): the accessor name `messages` was missing from the
    # extracted listing (`@client.[]`); confirm against the library source.
    opts = @client.messages[message_name].merge(opts.symbolize_keys)
    exchange_name = opts.delete(:exchange)
    opts.delete(:queue)
    recycle_dead_servers unless @dead_servers.empty?
    throttle!
    if opts[:redundant]
      publish_with_redundancy(exchange_name, message_name, data, opts)
    else
      publish_with_failover(exchange_name, message_name, data, opts)
    end
  end
end
#publish_with_failover(exchange_name, message_name, data, opts) ⇒ Object
:nodoc:
56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 |
# File 'lib/beetle/publisher.rb', line 56
#
# Publishes +data+ to a single server, moving on to the next server when one
# keeps failing.
#
# The attempt budget is twice the number of entries in @servers: a new server
# is selected only when the remaining budget is even, so every server gets
# two attempts before #mark_server_dead is called for it.
#
# exchange_name - exchange to bind queues for and publish to.
# message_name  - used for log output only.
# data          - payload passed to the exchange's publish call.
# opts          - options passed through Message before publishing.
#
# Returns 1 once the message has been published.
# Raises NoMessageSent when the attempt budget is exhausted.
def publish_with_failover(exchange_name, message_name, data, opts) #:nodoc:
  tries = @servers.size * 2
  logger.debug "Beetle: sending #{message_name}"
  published = 0
  # NOTE(review): the method name `publishing_options` was missing from the
  # extracted listing (`Message.(opts)`); confirm against the library source.
  opts = Message.publishing_options(opts)
  begin
    select_next_server if tries.even?
    bind_queues_for_exchange(exchange_name)
    logger.debug "Beetle: trying to send message #{message_name}:#{opts[:message_id]} to #{@server}"
    exchange(exchange_name).publish(data, opts)
    logger.debug "Beetle: message sent!"
    published = 1
  rescue *bunny_exceptions => e
    logger.warn("Beetle: publishing exception #{e} #{e.backtrace[0..4].join("\n")}")
    stop!(e)
    tries -= 1
    # retry same server on receiving the first exception for it (might have been a normal restart)
    # in this case you'll see either a broken pipe or a forced connection shutdown error
    retry if tries.odd?
    mark_server_dead
    retry if tries > 0
    logger.error "Beetle: message could not be delivered: #{message_name}"
    raise NoMessageSent.new
  end
  published
end
#publish_with_redundancy(exchange_name, message_name, data, opts) ⇒ Object
:nodoc:
83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 |
# File 'lib/beetle/publisher.rb', line 83
#
# Publishes +data+ to two different servers.
#
# With fewer than two entries in @servers this delegates to
# #publish_with_failover (warning first if any dead servers are recorded).
# Otherwise it cycles through servers until the message was sent to two of
# them, no servers are left, or every remaining server has been published to.
# A failing server is retried once and then marked dead.
#
# exchange_name - exchange to bind queues for and publish to.
# message_name  - used for log output only.
# data          - payload passed to the exchange's publish call.
# opts          - options passed through Message before publishing.
#
# Returns the number of servers the message was published to (1 or 2), or
# the result of #publish_with_failover in the fallback case.
# Raises NoMessageSent when no server accepted the message.
def publish_with_redundancy(exchange_name, message_name, data, opts) #:nodoc:
  if @servers.size < 2
    logger.warn "Beetle: at least two active servers are required for redundant publishing" if @dead_servers.size > 0
    return publish_with_failover(exchange_name, message_name, data, opts)
  end
  published = []
  # NOTE(review): the method name `publishing_options` was missing from the
  # extracted listing (`Message.(opts)`); confirm against the library source.
  opts = Message.publishing_options(opts)
  loop do
    break if published.size == 2 || @servers.empty? || published == @servers
    tries = 0
    select_next_server
    begin
      # skip servers which already received the message
      next if published.include? @server
      bind_queues_for_exchange(exchange_name)
      logger.debug "Beetle: trying to send #{message_name}:#{opts[:message_id]} to #{@server}"
      exchange(exchange_name).publish(data, opts)
      published << @server
      logger.debug "Beetle: message sent (#{published})!"
    rescue *bunny_exceptions => e
      logger.warn("Beetle: publishing exception #{e} #{e.backtrace[0..4].join("\n")}")
      stop!(e)
      # one retry per server selection; the second failure marks it dead
      retry if (tries += 1) == 1
      mark_server_dead
    end
  end
  case published.size
  when 0
    logger.error "Beetle: message could not be delivered: #{message_name}"
    raise NoMessageSent.new
  when 1
    logger.warn "Beetle: failed to send message redundantly"
  end
  published.size
end
#purge(queue_names) ⇒ Object
:nodoc:
166 167 168 169 170 171 172 |
# File 'lib/beetle/publisher.rb', line 166
#
# Purges every queue named in +queue_names+ on each server.
#
# queue_names - Enumerable of queue names, each passed to #queue.
#
# A StandardError raised while purging one queue is swallowed so the
# remaining queues and servers are still processed (same semantics as the
# `rescue nil` modifier).
def purge(queue_names) #:nodoc:
  each_server do
    queue_names.each do |name|
      begin
        queue(name).purge
      rescue StandardError
        nil
      end
    end
  end
end
#rpc(message_name, data, opts = {}) ⇒ Object
:nodoc:
121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 |
# File 'lib/beetle/publisher.rb', line 121
#
# Sends +data+ as an RPC request and waits for a single reply on a temporary
# reply queue.
#
# message_name - key used to look up the message's configured options; also
#                used in log output.
# data         - payload passed to the exchange's publish call.
# opts         - per-call overrides; keys are symbolized and merged over the
#                configured options. :exchange is removed and used as the
#                exchange name, :queue is removed and discarded. :timeout
#                (default RPC_DEFAULT_TIMEOUT) is passed to the subscription.
#
# Returns a two element Array [status, result]. status stays "TIMEOUT" and
# result stays nil unless the subscription block receives a reply. A bunny
# exception marks the current server dead and retries on the next one, at
# most @servers.size times in total; after the last failure an error is
# logged and [status, result] is still returned.
def rpc(message_name, data, opts = {}) #:nodoc:
  # NOTE(review): the accessor name `messages` was missing from the
  # extracted listing (`@client.[]`); confirm against the library source.
  opts = @client.messages[message_name].merge(opts.symbolize_keys)
  exchange_name = opts.delete(:exchange)
  opts.delete(:queue)
  recycle_dead_servers unless @dead_servers.empty?
  tries = @servers.size
  logger.debug "Beetle: performing rpc with message #{message_name}"
  result = nil
  status = "TIMEOUT"
  begin
    select_next_server
    bind_queues_for_exchange(exchange_name)
    # create non durable, autodeleted temporary queue with a server assigned name
    queue = bunny.queue
    # NOTE(review): the method name `publishing_options` was missing from the
    # extracted listing (`Message.(...)`); confirm against the library source.
    opts = Message.publishing_options(opts.merge(:reply_to => queue.name))
    logger.debug "Beetle: trying to send #{message_name}:#{opts[:message_id]} to #{@server}"
    exchange(exchange_name).publish(data, opts)
    logger.debug "Beetle: message sent!"
    logger.debug "Beetle: listening on reply queue #{queue.name}"
    queue.subscribe(:message_max => 1, :timeout => opts[:timeout] || RPC_DEFAULT_TIMEOUT) do |msg|
      logger.debug "Beetle: received reply!"
      result = msg[:payload]
      status = msg[:header].properties[:headers][:status]
    end
    logger.debug "Beetle: rpc complete!"
  rescue *bunny_exceptions => e
    stop!(e)
    mark_server_dead
    tries -= 1
    retry if tries > 0
    logger.error "Beetle: message could not be delivered: #{message_name}"
  end
  [status, result]
end
#setup_queues_and_policies ⇒ Object
174 175 176 177 178 179 180 181 182 183 184 |
# File 'lib/beetle/publisher.rb', line 174
#
# Calls #queue for every queue name known to the client, once per server.
#
# A StandardError raised for one server is logged as a warning (including
# the server and the error) and does not stop processing of the remaining
# servers; the remaining queues of the failing server are skipped.
def setup_queues_and_policies
  each_server do
    begin
      @client.queues.keys.each do |name|
        queue(name)
      end
    rescue => e
      logger.warn "Beetle: failed setting up queues and policies on #{@server}: #{e}"
    end
  end
end
#stop ⇒ Object
:nodoc:
186 187 188 |
# File 'lib/beetle/publisher.rb', line 186
#
# Calls #stop! once for each server, via #each_server.
def stop #:nodoc:
  each_server { stop! }
end
#throttle(queue_options) ⇒ Object
156 157 158 |
# File 'lib/beetle/publisher.rb', line 156
#
# Stores +queue_options+ as the publisher's throttling options. Throttling
# counts as enabled (see #throttling?) while the stored value is non-empty.
#
# queue_options - object stored as-is in @throttling_options.
#
# Returns the assigned value.
def throttle(queue_options)
  @throttling_options = queue_options
end
#throttle! ⇒ Object
160 161 162 163 164 |
# File 'lib/beetle/publisher.rb', line 160
#
# Applies throttling before a publish: does nothing unless throttling is
# enabled; otherwise refreshes the throttling state and sleeps for one
# second if the publisher is currently throttled.
def throttle!
  return unless throttling?
  refresh_throttling!
  sleep 1 if throttled?
end
#throttled? ⇒ Boolean
18 19 20 |
# File 'lib/beetle/publisher.rb', line 18
#
# Returns the current value of @throttled (set to false by the constructor).
def throttled?
  @throttled
end
#throttling? ⇒ Boolean
22 23 24 |
# File 'lib/beetle/publisher.rb', line 22
#
# Returns true when throttling options have been configured, i.e. when
# @throttling_options is not empty.
def throttling?
  !@throttling_options.empty?
end
#throttling_status ⇒ Object
26 27 28 |
# File 'lib/beetle/publisher.rb', line 26
#
# Returns 'throttled' when @throttled is truthy and 'unthrottled' otherwise.
def throttling_status
  @throttled ? 'throttled' : 'unthrottled'
end