Class: BugBunny::Adapter
- Inherits:
-
Object
- Object
- BugBunny::Adapter
- Defined in:
- lib/bug_bunny/adapter.rb
Constant Summary collapse
- PERSIST_MESSAGE =
true
- SERVICE_HEALTH_CHECK =
:health_check
- TIMEOUT =
3
- BOMBA =
:bomba
- PUBLISH_TIMEOUT =
:publish_timeout
- CONSUMER_TIMEOUT =
:consumer_timeout
- COMUNICATION_ERROR =
:comunication_error
- CONSUMER_COUNT_ZERO =
:consumer_count_zero
- PG_EXCEPTIONS_TO_EXIT =
%w[PG::ConnectionBad PG::UnableToSend].freeze
Instance Attribute Summary collapse
-
#communication_response ⇒ Object
Returns the value of attribute communication_response.
-
#consumer ⇒ Object
Returns the value of attribute consumer.
-
#logger ⇒ Object
Returns the value of attribute logger.
-
#rabbit ⇒ Object
Returns the value of attribute rabbit.
-
#service_message ⇒ Object
Returns the value of attribute service_message.
-
#time_to_wait ⇒ Object
Returns the value of attribute time_to_wait.
Class Method Summary collapse
- .make_response(comunication_result, consume_result = nil) ⇒ Object
Instance Method Summary collapse
- #build_queue(name, opts = {}) ⇒ Object
- #check_pg_exception!(exception) ⇒ Object
- #close_connection! ⇒ Object
- #consume!(queue, thread: false, manual_ack: true, exclusive: false, block: true, opts: {}) ⇒ Object
-
#initialize(attrs = {}) ⇒ Adapter
constructor
A new instance of Adapter.
- #make_response ⇒ Object
- #publish!(message, publish_queue, opts = {}) ⇒ Object
- #publish_and_consume!(publish_message, sync_queue, opts = {}) ⇒ Object
- #status ⇒ Object
Constructor Details
#initialize(attrs = {}) ⇒ Adapter
Returns a new instance of Adapter.
22 23 24 25 26 27 |
# File 'lib/bug_bunny/adapter.rb', line 22

# Builds a new Adapter.
#
# Sets up a file logger at ./log/bug_bunny.log with 'monthly' rotation, seeds
# +communication_response+ with an error-status message, sets +time_to_wait+
# to 2 and finally calls +create_adapter_with_rabbit+.
#
# @param attrs [Hash] accepted for interface compatibility; not read in this method
def initialize(attrs = {})
  @logger = Logger.new('./log/bug_bunny.log', 'monthly')
  @communication_response = ::BugBunny::Message.new(status: :error)
  @time_to_wait = 2
  create_adapter_with_rabbit
end
Instance Attribute Details
#communication_response ⇒ Object
Returns the value of attribute communication_response.
15 16 17 |
# File 'lib/bug_bunny/adapter.rb', line 15

# Returns the value of attribute communication_response.
#
# @return [Object] whatever is currently stored in @communication_response
def communication_response
  @communication_response
end
#consumer ⇒ Object
Returns the value of attribute consumer.
15 16 17 |
# File 'lib/bug_bunny/adapter.rb', line 15

# Returns the value of attribute consumer.
#
# @return [Object] whatever is currently stored in @consumer
def consumer
  @consumer
end
#logger ⇒ Object
Returns the value of attribute logger.
15 16 17 |
# File 'lib/bug_bunny/adapter.rb', line 15

# Returns the value of attribute logger.
#
# @return [Object] whatever is currently stored in @logger
def logger
  @logger
end
#rabbit ⇒ Object
Returns the value of attribute rabbit.
15 16 17 |
# File 'lib/bug_bunny/adapter.rb', line 15

# Returns the value of attribute rabbit.
#
# @return [Object] whatever is currently stored in @rabbit
def rabbit
  @rabbit
end
#service_message ⇒ Object
Returns the value of attribute service_message.
15 16 17 |
# File 'lib/bug_bunny/adapter.rb', line 15

# Returns the value of attribute service_message.
#
# @return [Object] whatever is currently stored in @service_message
def service_message
  @service_message
end
#time_to_wait ⇒ Object
Returns the value of attribute time_to_wait.
15 16 17 |
# File 'lib/bug_bunny/adapter.rb', line 15

# Returns the value of attribute time_to_wait.
#
# @return [Object] whatever is currently stored in @time_to_wait
def time_to_wait
  @time_to_wait
end
Class Method Details
.make_response(comunication_result, consume_result = nil) ⇒ Object
269 270 271 272 273 274 275 276 |
# File 'lib/bug_bunny/adapter.rb', line 269

# Picks the object to hand back to the caller after a publish/consume cycle.
#
# When the communication result reports success, the consume result wins if
# one was given; otherwise the communication result itself is returned. On
# failure the communication result's +response+ is replaced by its string
# form (mutating the object) before it is returned.
#
# @param comunication_result [#success?, #response, #response=]
# @param consume_result [Object, nil] optional result obtained from the consumer
# @return [Object] +consume_result+ or +comunication_result+
def self.make_response(comunication_result, consume_result = nil)
  return consume_result || comunication_result if comunication_result.success?

  comunication_result.response = comunication_result.response.to_s
  comunication_result
end
Instance Method Details
#build_queue(name, opts = {}) ⇒ Object
237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 |
# File 'lib/bug_bunny/adapter.rb', line 237

# Builds a ::BugBunny::Queue and, unless opts[:initialize] is false, declares
# it on the rabbit channel.
#
# Declaring the queue is retried up to 3 times with a 0.5s pause between
# attempts. After a successful declaration the wrapper's +rabbit_queue+ is set
# and its +name+ is overwritten with the name reported by the declared queue.
#
# Any StandardError raised along the way is logged, the connection is closed
# and a ComunicationRabbitError carrying COMUNICATION_ERROR and the original
# backtrace is raised instead.
#
# @param name [String] queue name
# @param opts [Hash] passed (merged with +name+) to ::BugBunny::Queue.new;
#   +:initialize+ (default true) controls whether the queue is declared
# @return [::BugBunny::Queue]
# @raise [Exception::ComunicationRabbitError]
def build_queue(name, opts = {})
  init = opts.fetch(:initialize, true)
  new_queue = ::BugBunny::Queue.new(opts.merge(name: name))

  if init
    # NOTE(review): the attribute read from new_queue here was lost in the
    # extracted listing; `options` is inferred from the "queue_options:" log
    # label and the channel.queue(name, opts) call shape -- confirm against
    # ::BugBunny::Queue.
    logger.debug("Building rabbit new_queue: #{name} status: #{rabbit.status} queue_options: #{new_queue.options}")

    retries = 0
    begin
      built_queue = rabbit.channel.queue(new_queue.name.to_s, new_queue.options)
    rescue StandardError
      if (retries += 1) <= 3
        sleep 0.5
        retry
      end
      raise
    end

    new_queue.rabbit_queue = built_queue
    new_queue.name = new_queue.rabbit_queue.name
  end

  new_queue
rescue StandardError => e # Timeout::Error is a StandardError, so it is covered here
  logger.debug("Rabbit Identifier: #{rabbit.try(:identifier)}")
  # try(:status) so that a nil rabbit cannot raise here and hide the original error
  logger.debug("Status adapter created: #{rabbit.try(:status)}")
  logger.error(e)
  close_connection!
  raise Exception::ComunicationRabbitError.new(COMUNICATION_ERROR, e.backtrace)
end
#check_pg_exception!(exception) ⇒ Object
295 296 297 298 299 300 |
# File 'lib/bug_bunny/adapter.rb', line 295

# Terminates the process with exit status 7 when the exception's message
# starts with one of the PG_EXCEPTIONS_TO_EXIT prefixes. Does nothing (and
# returns nil) for any other exception, for nil, or for objects that do not
# respond to +message+.
#
# @param exception [#message, nil]
# @return [nil]
def check_pg_exception!(exception)
  message = exception.message if exception.respond_to?(:message)
  return unless message

  # The consumer does not reconnect (rails tasks), so we bail out of the process.
  # Exit with a dedicated integer code so this case can be identified.
  exit 7 if message.to_s.start_with?(*PG_EXCEPTIONS_TO_EXIT)
end
#close_connection! ⇒ Object
291 292 293 |
# File 'lib/bug_bunny/adapter.rb', line 291

# Asks the rabbit object to close, via +try+.
#
# @return [Object, nil] whatever rabbit.try(:close) returns
def close_connection!
  rabbit.try(:close)
end
#consume!(queue, thread: false, manual_ack: true, exclusive: false, block: true, opts: {}) ⇒ Object
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 |
# File 'lib/bug_bunny/adapter.rb', line 66

# Subscribes to +queue+ and processes each delivery.
#
# For every delivery the JSON payload is decoded (ActiveSupport::JSON first,
# plain JSON as fallback), wrapped in a ::BugBunny::Message together with the
# delivery's correlation_id and reply_to, and yielded to the caller's block.
# The delivery is then acked when the consumer uses manual acknowledgement,
# and +service_message+ / +communication_response+ are updated.
#
# When +thread+ is given (synchronous request/reply use), the consumer is
# cancelled after the first delivery, the connection is closed and the thread
# exits. Without +thread+, once subscribe returns the method logs the rabbit
# status and calls +exit+.
#
# @param queue [#name, #rabbit_queue] queue wrapper to subscribe to
# @param thread [Thread, false] thread to terminate after one delivery
# @param manual_ack [Boolean] forwarded to subscribe
# @param exclusive [Boolean] forwarded to subscribe
# @param block [Boolean] forwarded to subscribe
# @param opts [Hash] accepted but not read in this method
# @yield [message] the decoded ::BugBunny::Message
# @return [::BugBunny::Message] an error message when a Timeout::Error or
#   StandardError escapes to method level
def consume!(queue, thread: false, manual_ack: true, exclusive: false, block: true, opts: {})
  # NOTE: installs a process-wide INT handler every time this method is called.
  Signal.trap('INT') { exit }

  logger.debug("Suscribe consumer to: #{queue.name}")
  logger.debug("ENTRO AL CONSUMER #{rabbit.try(:identifier)}")
  self.consumer = queue.rabbit_queue.subscribe(manual_ack: manual_ack, exclusive: exclusive, block: block) do |delivery_info, metadata, json_payload|
    # Session depends on thread info, subscribe block cleans thread info
    # ::Session.init unless Session.tags_context

    begin
      payload = ActiveSupport::JSON.decode(json_payload).deep_symbolize_keys # time zones handled properly
    rescue StandardError
      payload = JSON.parse(json_payload).deep_symbolize_keys
    end

    # Session for Sentry logger
    # locale, version, service_name
    # payload.except(:body, :service_name).each do |k, v|
    #   Session.assign(k, v)
    # end
    # Session.from_service = payload[:service_name]
    # Session.correlation_id = metadata.correlation_id
    # Session.queue_name = queue.name

    # unless defined?(ActiveRecord) && ActiveRecord::Base.connection_pool.with_connection(&:active?)
    #   logger.error('[PG] PG connection down')
    #   exit 7
    # end

    begin
      message = ::BugBunny::Message.new(correlation_id: metadata.correlation_id, reply_to: metadata.reply_to, **payload)

      # Default sentry info
      # ::Session.request_id = message.correlation_id rescue nil
      # ::Session.tags_context.merge!(
      #   server_version: message.version,
      #   service_action: message.service_action,
      #   service_name: message.service_name,
      #   isp_id: (message.body&.fetch(:isp_id, nil) rescue nil)
      # )
      # ::Session.extra_context[:message] = message.body

      logger.info("#{queue.name}-Received Request: (#{message.service_action})")
      logger.debug("#{queue.name}-Received Request: (#{message})")
      logger.debug("Message will be yield")
      logger.debug("Block given? #{block_given?}")

      yield(message) if block_given?
      logger.debug('Message processed')

      # Acking is best effort: a failure here is only logged.
      begin
        Timeout.timeout(5) do
          rabbit.channel.ack delivery_info.delivery_tag if delivery_info[:consumer].manual_acknowledgement?
        end
      rescue Timeout::Error => e
        logger.debug("Rabbit Identifier: #{rabbit.try(:identifier)} can not check manual_ack #{e.to_s}")
      rescue ::StandardError => e
        logger.debug("Rabbit Identifier: #{rabbit.try(:identifier)} can not check manual_ack #{e.to_s}")
      end

      self.service_message = message
      self.communication_response = ::BugBunny::Message.new(status: :success)
    rescue ::SystemExit => e # Ensure exit code
      raise e
    rescue => e
      logger.debug("Rabbit Identifier: #{rabbit.try(:identifier)}")
      logger.error(e)
      close_connection!

      # Session.clean!
      self.communication_response = ::BugBunny::Message.new(status: :error, body: BOMBA, exception: e)
    end

    if thread # sync consumer flag :D
      begin
        Timeout::timeout(1) do
          delivery_info[:consumer].cancel
        end
      rescue Timeout::Error => e
        close_connection!
        thread.exit
      end
      close_connection!
      thread.exit
    end
  end

  if thread
    close_connection!
    thread.exit
  else
    while true
      begin
        logger.debug("SALIO DEL CONSUMER #{rabbit.try(:identifier)}")
        logger.debug(rabbit.status)
        # exit raises SystemExit, which none of the rescues below catch.
        exit
        # consumer.cancel
      rescue Bunny::NotAllowedError => e
        logger.debug("NOT ALLOWED #{e.to_s}")
        break
      rescue Timeout::Error => e
        if queue.rabbit_queue.channel.status == :closed || queue.rabbit_queue.channel.connection.status == :closed
          logger.debug("Channel or connection closed")
          break
        end

        sleep time_to_wait
        logger.debug("Rabbit Identifier: #{rabbit.try(:identifier)}")
        logger.error(e)
        retry
      rescue StandardError => e
        if queue.rabbit_queue.channel.status == :closed || queue.rabbit_queue.channel.connection.status == :closed
          logger.debug("Channel or connection closed")
          break
        end

        sleep time_to_wait
        logger.debug("Rabbit Identifier: #{rabbit.try(:identifier)}")
        logger.error(e)
        retry
      end
    end
  end
rescue Timeout::Error => e
  logger.debug("Rabbit Identifier: #{rabbit.try(:identifier)}")
  logger.error(e)
  close_connection!
  ::BugBunny::Message.new(status: :error, body: CONSUMER_TIMEOUT, exception: e)
rescue StandardError => e
  logger.debug("Rabbit Identifier: #{rabbit.try(:identifier)}")
  logger.error(e)
  close_connection!
  ::BugBunny::Message.new(status: :error, body: BOMBA, exception: e)
end
::BugBunny::Message.new(status: :error, body: BOMBA, exception: e) end |
#make_response ⇒ Object
278 279 280 281 282 283 284 285 |
# File 'lib/bug_bunny/adapter.rb', line 278

# Picks the object to hand back after a publish/consume cycle, using this
# adapter's own state.
#
# When +communication_response+ reports success, +service_message+ is
# returned if it is set, otherwise +communication_response+. On failure the
# response's +response+ is replaced by its string form (mutating the object)
# and +communication_response+ is returned.
#
# @return [Object] +service_message+ or +communication_response+
def make_response
  if communication_response.success?
    service_message || communication_response
  else
    communication_response.response = communication_response.response.to_s
    communication_response
  end
end
#publish!(message, publish_queue, opts = {}) ⇒ Object
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 |
# File 'lib/bug_bunny/adapter.rb', line 29

# Publishes +message+ (as JSON) to the rabbit exchange, routed to
# +publish_queue+, within TIMEOUT seconds.
#
# The outcome is stored in +communication_response+:
# * consumer check requested and the queue reports zero consumers -> error
#   with body CONSUMER_COUNT_ZERO (nothing is published, method returns nil)
# * published -> a message built with status: true
# * Timeout::Error -> error with body PUBLISH_TIMEOUT (connection closed)
# * any other StandardError -> error with body BOMBA (connection closed)
#
# @param message [#correlation_id, #to_json] message to publish
# @param publish_queue [#name, #check_consumers] destination queue wrapper
# @param opts [Hash] +:check_consumers_count+, +:persistent+, +:reply_to+
# @return [::BugBunny::Message, nil] the stored communication response, or
#   nil on the zero-consumers early return
def publish!(message, publish_queue, opts = {})
  Timeout.timeout(TIMEOUT) do
    if opts[:check_consumers_count] && publish_queue.check_consumers.zero?
      self.communication_response = ::BugBunny::Message.new(status: :error, body: CONSUMER_COUNT_ZERO)
      return
    end

    publish_opts = { routing_key: publish_queue.name,
                     persistent: opts[:persistent],
                     correlation_id: message.correlation_id }

    publish_opts[:reply_to] = opts[:reply_to] if opts[:reply_to]

    # This is the idea in case we get fancier and want to switch exchanges at publish time.
    # _exchange = if opts.has_key?(:exchange_type)
    #               channel.exchange(opts[:exchange_type].to_s, { type: opts[:exchange_type] })
    #             else
    #               exchange
    #             end
    # _exchange.publish(message.to_json, publish_opts)

    logger.debug("#{publish_queue.name}-Send Request: (#{message})")

    rabbit.exchange.publish(message.to_json, publish_opts)
    rabbit.channel.wait_for_confirms if rabbit.confirm_select

    # NOTE(review): other success paths in this class use status: :success;
    # confirm that ::BugBunny::Message#success? treats `true` the same way.
    self.communication_response = ::BugBunny::Message.new(status: true)
  end
rescue Timeout::Error => e
  logger.error(e)
  close_connection!
  self.communication_response = ::BugBunny::Message.new(status: :error, body: PUBLISH_TIMEOUT, exception: e)
rescue StandardError => e
  logger.error(e)
  close_connection!
  self.communication_response = ::BugBunny::Message.new(status: :error, body: BOMBA, exception: e)
end
#publish_and_consume!(publish_message, sync_queue, opts = {}) ⇒ Object
201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 |
# File 'lib/bug_bunny/adapter.rb', line 201

# Synchronous request/reply: publishes +publish_message+ to +sync_queue+ with
# a freshly built exclusive, non-durable, auto-delete reply queue as
# +reply_to+, then consumes one reply from that queue in a separate thread
# and waits for the thread to finish.
#
# Both the publish and the consume step are retried up to 3 times (0.5s
# apart) when they raise. If publishing leaves +communication_response+
# unsuccessful, it is returned immediately without consuming.
#
# @param publish_message [Object] message handed to #publish!
# @param sync_queue [Object] destination queue handed to #publish!
# @param opts [Hash] extra publish options; +:reply_to+ is overwritten
# @yield [msg] the reply message received on the reply queue
# @return [Object] the current +communication_response+
def publish_and_consume!(publish_message, sync_queue, opts = {})
  reply_queue = build_queue('', initialize: true, exclusive: true, durable: false, auto_delete: true)

  publish_retries = 0
  begin
    publish!(publish_message, sync_queue, opts.merge(reply_to: reply_queue.name))
  rescue
    if (publish_retries += 1) <= 3
      sleep 0.5
      retry
    end
    close_connection!
    raise
  end

  return communication_response unless communication_response.success?

  t = Thread.new do
    # Separate counter: a block-level `retries` would rebind the method-level local.
    consume_retries = 0
    begin
      consume!(reply_queue, thread: Thread.current, exclusive: true) do |msg|
        yield(msg) if block_given?
      end
    rescue
      if (consume_retries += 1) <= 3
        sleep 0.5
        retry
      end
      raise
    end
  end
  t.join

  communication_response
end
#status ⇒ Object
287 288 289 |
# File 'lib/bug_bunny/adapter.rb', line 287

# Reports the rabbit object's status, via +try+.
#
# @return [Object, nil] whatever rabbit.try(:status) returns
def status
  rabbit.try(:status)
end