Class: Istox::BunnyBoot
- Inherits: Object
  - Object
  - Istox::BunnyBoot
- Defined in: lib/istox/helpers/bunny_boot.rb
Class Method Summary
- .add_consumer_interceptor(interceptor) ⇒ Object
  RabbitMQ interceptors.
- .add_publisher_interceptor(interceptor) ⇒ Object
- .binding_exchange_id(id) ⇒ Object
- .binding_routing_key(id) ⇒ Object
- .cacheMsg(suffix, msg) ⇒ Object
- .channel(conn, opts = {}) ⇒ Object
  Create a virtual channel on an established connection. Configure pool_size, prefetch, and confirm mode according to opts.
- .channel_confirm?(ch) ⇒ Boolean
- .channel_pool_size(consumer_key) ⇒ Object
- .channel_prefetch(consumer_key) ⇒ Object
- .confirm_mode(eid) ⇒ Object
- .connection ⇒ Object
  Create a physical connection to RabbitMQ. During failover of a RabbitMQ cluster or a temporary failure, the attempt may raise an error, so it is retried in a loop.
- .consumer_interceptors ⇒ Object
- .create_tracker(channel_id, delivery_tag, eid, options, payload) ⇒ Object
- .del_tracker_on_channel(channel_id) ⇒ Object
- .eid(ex) ⇒ Object
- .etype(ex) ⇒ Object
- .exchange(eid) ⇒ Object
- .exchange_confirm?(exchange_name) ⇒ Boolean
  Defaults to false when the exchange has no confirm attribute.
- .exchange_durable?(exchange_name) ⇒ Boolean
  Defaults to true when the exchange has no durable attribute.
- .exchange_name(consumer_key) ⇒ Object
- .exchange_type(exchange_name) ⇒ Object
- .find_tracker_on_channel(channel_id, delivery_tag, key) ⇒ Object
- .publish(e, message, options = {}) ⇒ Object
- .publisher_interceptors ⇒ Object
- .queue_durable?(consumer_key) ⇒ Boolean
  Defaults to the durable attribute of the exchange the queue belongs to.
- .queue_exclusive(consumer_key) ⇒ Object
- .queue_manual_ack?(consumer_key) ⇒ Boolean
- .queue_name(consumer_key) ⇒ Object
- .queue_ok?(conn, name) ⇒ Boolean
- .queue_priority(consumer_key) ⇒ Object
- .queue_retry_gap(consumer_key) ⇒ Object
  The default retry gap is 3000. A gap of 0 means retry immediately. To protect the system, unlimited retries (retry limit -1) with a zero gap are not allowed; the default gap is used instead.
- .queue_retry_gap_set?(consumer_key) ⇒ Boolean
- .queue_retry_limit(consumer_key) ⇒ Object
  The default retry limit is 5 (the fallback value in the source). A limit of -1 means no limit.
- .queue_routing_key(consumer_key) ⇒ Object
- .queue_single_consumer?(consumer_key) ⇒ Boolean
- .queue_worker_param(consumer_key) ⇒ Object
- .queue_worker_param_format(consumer_key) ⇒ Object
- .queues_keys_for_subscribe ⇒ Object
- .rename_tracker(channel_id, old_delivery_tag, new_delivery_tag) ⇒ Object
- .ruby_class(consumer_key) ⇒ Object
- .search_exchange_of_routing_key!(routing_key) ⇒ Object
- .subscriber_queue_from_key(key) ⇒ Object
Class Method Details
.add_consumer_interceptor(interceptor) ⇒ Object
RabbitMQ interceptors.
# File 'lib/istox/helpers/bunny_boot.rb', line 13
def add_consumer_interceptor(interceptor)
  @consumer_interceptors ||= []
  @consumer_interceptors.push(interceptor)
end
.add_publisher_interceptor(interceptor) ⇒ Object
# File 'lib/istox/helpers/bunny_boot.rb', line 18
def add_publisher_interceptor(interceptor)
  @publisher_interceptors ||= []
  @publisher_interceptors.push(interceptor)
end
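As a usage sketch: an interceptor only needs to respond to `call`, so a lambda can be registered. The snippet below uses a stand-in module, `InterceptorRegistry` (hypothetical, not part of the gem), that mirrors the registry methods so it runs without RabbitMQ. The `(message, options)` argument order and the header value are assumptions made for illustration.

```ruby
# Stand-in for the registry methods on Istox::BunnyBoot (hypothetical module).
module InterceptorRegistry
  class << self
    def add_publisher_interceptor(interceptor)
      @publisher_interceptors ||= []
      @publisher_interceptors.push(interceptor)
    end

    def publisher_interceptors
      @publisher_interceptors || []
    end
  end
end

# Assumed call shape: interceptor.call(message, options).
stamp_source = lambda do |_message, options|
  options[:headers] ||= {}
  options[:headers][:source] = 'billing-service' # illustrative header value
end

InterceptorRegistry.add_publisher_interceptor(stamp_source)

# Run the chain the way a publisher would before sending.
message = { id: 1 }
options = {}
InterceptorRegistry.publisher_interceptors.each do |interceptor|
  interceptor.call(message, options)
end
```

Because the interceptors mutate `options` in place, anything they add to the headers travels with the published message.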
.binding_exchange_id(id) ⇒ Object
# File 'lib/istox/helpers/bunny_boot.rb', line 73
def binding_exchange_id(id)
  data[:binding][id][:exchange] || :default
rescue StandardError
  nil
end
.binding_routing_key(id) ⇒ Object
# File 'lib/istox/helpers/bunny_boot.rb', line 79
def binding_routing_key(id)
  queue = data[:binding][id][:queue]
  if data[:queues][queue].nil?
    queue
  else
    data[:queues][queue][:queue_name]
  end
rescue StandardError
  nil
end
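To make the two binding lookups concrete, here is a minimal sketch that passes the config in explicitly. The `DATA` hash and its entries are invented for illustration; only the `:binding` and `:queues` key layout comes from the methods above.

```ruby
# Illustrative config; the real values come from the application's AMQP config.
DATA = {
  binding: {
    order_events: { exchange: 'orders', queue: :order_worker },
    audit: { queue: 'audit.raw' } # no exchange given, no matching queue entry
  },
  queues: {
    order_worker: { queue_name: 'orders.worker' }
  }
}.freeze

def binding_exchange_id(data, id)
  data[:binding][id][:exchange] || :default
rescue StandardError
  nil
end

def binding_routing_key(data, id)
  queue = data[:binding][id][:queue]
  # Fall back to the raw queue value when no queue entry is defined for it.
  data[:queues][queue].nil? ? queue : data[:queues][queue][:queue_name]
rescue StandardError
  nil
end

binding_exchange_id(DATA, :order_events)  # "orders"
binding_exchange_id(DATA, :audit)         # :default
binding_routing_key(DATA, :order_events)  # "orders.worker"
binding_routing_key(DATA, :audit)         # "audit.raw"
binding_routing_key(DATA, :missing)       # nil (lookup error is rescued)
```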
.cacheMsg(suffix, msg) ⇒ Object
# File 'lib/istox/helpers/bunny_boot.rb', line 363
def cacheMsg(suffix, msg)
  ::Istox::RedisBoot.sets("drop_msg_#{suffix}", JSON.dump(msg), 7)
end
.channel(conn, opts = {}) ⇒ Object
Create a virtual channel on an established connection. Configure pool_size, prefetch, and confirm mode according to opts.
# File 'lib/istox/helpers/bunny_boot.rb', line 49
def channel(conn, opts = {})
  ch = conn.create_channel(nil, opts[:pool_size] || 1)
  ch.prefetch(opts[:prefetch]) unless opts[:prefetch].nil?

  # Put channel in confirmation mode
  ch.confirm_select if opts[:confirm]

  ch
end
.channel_confirm?(ch) ⇒ Boolean
# File 'lib/istox/helpers/bunny_boot.rb', line 59
def channel_confirm?(ch)
  ch.using_publisher_confirmations?
end
.channel_pool_size(consumer_key) ⇒ Object
# File 'lib/istox/helpers/bunny_boot.rb', line 245
def channel_pool_size(consumer_key)
  channel = channel_config(consumer_key)
  return data['channel_pool_size'] || 1 if channel.nil?

  channel['channel_pool_size']
end
.channel_prefetch(consumer_key) ⇒ Object
# File 'lib/istox/helpers/bunny_boot.rb', line 252
def channel_prefetch(consumer_key)
  channel = channel_config(consumer_key)
  return data['channel_prefetch'] || 1 if channel.nil?

  channel['channel_prefetch']
end
.confirm_mode(eid) ⇒ Object
# File 'lib/istox/helpers/bunny_boot.rb', line 90
def confirm_mode(eid)
  data[:exchanges][eid][:confirm] || -1
rescue StandardError
  nil
end
.connection ⇒ Object
Create a physical connection to RabbitMQ. During failover of a RabbitMQ cluster or a temporary failure, the attempt may raise an error, so it is retried in a loop.
# File 'lib/istox/helpers/bunny_boot.rb', line 33
def connection
  conn = nil
  loop do
    conn = Bunny.new(data['connect'].symbolize_keys).tap(&:start)
    break
  rescue Bunny::TCPConnectionFailed, Bunny::TCPConnectionFailedForAllHosts => e
    log.error "Fails to create connection to RabbitMQ with err: #{e}"
    log.info 'Reconnect after 2s ...'
    sleep 2
  end

  conn
end
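The retry-until-connected pattern can be sketched independently of Bunny. The helper below, `connect_with_retry`, is hypothetical and not part of `Istox::BunnyBoot`; `FlakyError` stands in for the Bunny TCP failure classes, and the injectable `sleeper` is an assumption added so the example runs without actually waiting.

```ruby
# Hypothetical helper: keeps calling the block until it succeeds, sleeping
# between attempts. Only the listed error classes trigger a retry.
def connect_with_retry(retry_on:, delay: 2, sleeper: ->(seconds) { sleep(seconds) })
  loop do
    begin
      return yield
    rescue *retry_on => e
      warn "Connection attempt failed: #{e.message}; retrying in #{delay}s"
      sleeper.call(delay)
    end
  end
end

# Stand-in for Bunny::TCPConnectionFailed in this self-contained example.
class FlakyError < StandardError; end

attempts = 0
conn = connect_with_retry(retry_on: [FlakyError], sleeper: ->(_seconds) {}) do
  attempts += 1
  raise FlakyError, 'broker unavailable' if attempts < 3

  :connected
end
```

Like the original loop, this sketch never gives up; a caller that needs a bounded wait would have to add an attempt limit or a deadline.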
.consumer_interceptors ⇒ Object
# File 'lib/istox/helpers/bunny_boot.rb', line 27
def consumer_interceptors
  @consumer_interceptors || []
end
.create_tracker(channel_id, delivery_tag, eid, options, payload) ⇒ Object
# File 'lib/istox/helpers/bunny_boot.rb', line 319
def create_tracker(channel_id, delivery_tag, eid, options, payload)
  log.debug "Create track: channel_id: #{channel_id}, delivery_tag: #{delivery_tag}"

  # combination of channel_id:delivery_tag can uniquely identify a msg
  # For each retry of msg, channel_id and delivery_tag is unchanged
  # But each retry, there is new delivery_tag that should be updated
  id = "#{channel_id}:#{delivery_tag}"
  ::Istox::RedisBoot.sets("#{id}:payload", JSON.dump(payload), 4)
  ::Istox::RedisBoot.sets("#{id}:eid", eid.to_s, 4)
  ::Istox::RedisBoot.sets("#{id}:options", JSON.dump(options), 4)
end
.del_tracker_on_channel(channel_id) ⇒ Object
# File 'lib/istox/helpers/bunny_boot.rb', line 337
def del_tracker_on_channel(channel_id)
  trackers = find_trackers "#{channel_id}*"
  trackers.each do |k|
    ::Istox::RedisBoot.del k, 4
  end
end
.eid(ex) ⇒ Object
# File 'lib/istox/helpers/bunny_boot.rb', line 352
def eid(ex)
  eid = ex.name
  eid = :default if eid.empty?

  eid
end
.etype(ex) ⇒ Object
# File 'lib/istox/helpers/bunny_boot.rb', line 359
def etype(ex)
  ex.type
end
.exchange(eid) ⇒ Object
# File 'lib/istox/helpers/bunny_boot.rb', line 63
def exchange(eid)
  type = data[:exchanges][eid][:type]
  name = eid
  settings = { durable: exchange_durable?(eid) }
  confirm = data[:exchanges][eid][:confirm] || -1
  [type, name, settings, confirm]
rescue StandardError
  nil
end
.exchange_confirm?(exchange_name) ⇒ Boolean
Defaults to false when the exchange has no confirm attribute.
# File 'lib/istox/helpers/bunny_boot.rb', line 124
def exchange_confirm?(exchange_name)
  exchange_config!(exchange_name)['confirm'] || false
end
.exchange_durable?(exchange_name) ⇒ Boolean
Defaults to true when the exchange has no durable attribute.
# File 'lib/istox/helpers/bunny_boot.rb', line 115
def exchange_durable?(exchange_name)
  durable = exchange_config!(exchange_name)['durable']
  durable = true if durable.nil?
  durable
rescue StandardError => e
  raise e
end
.exchange_name(consumer_key) ⇒ Object
# File 'lib/istox/helpers/bunny_boot.rb', line 132
def exchange_name(consumer_key)
  queue_config_from_consumer_key!(consumer_key)['exchange']
rescue StandardError
  nil
end
.exchange_type(exchange_name) ⇒ Object
# File 'lib/istox/helpers/bunny_boot.rb', line 128
def exchange_type(exchange_name)
  exchange_config!(exchange_name)['type']
end
.find_tracker_on_channel(channel_id, delivery_tag, key) ⇒ Object
# File 'lib/istox/helpers/bunny_boot.rb', line 331
def find_tracker_on_channel(channel_id, delivery_tag, key)
  pattern = "#{channel_id}:#{delivery_tag}:#{key}"
  keys = find_trackers pattern
  get_tracker(keys.first)
end
.publish(e, message, options = {}) ⇒ Object
# File 'lib/istox/helpers/bunny_boot.rb', line 259
def publish(e, message, options = {})
  eid = eid e
  # By default:
  # For persistence, if exchange is durable, persistent is enabled
  # For mandatory. if channel is confirmed mode, mandatory is enabled
  persistent = e.durable?
  persistent = true if eid(e) == :default
  mandatory = channel_confirm? e.channel

  options = options.clone

  # Set Mandatory & Persistent flag for non-DLX and non-manual msg
  unless %w[dlx manual].include? options[:type]
    if options[:routing_key].present?
      v1 = data['publish'][eid]
      v1 = v1[options[:routing_key]] unless v1.nil? || v1[options[:routing_key]].nil?
      persistent = v1['persistent'] unless v1.nil? || v1['persistent'].nil?
      mandatory = v1['mandatory'] unless v1.nil? || v1['mandatory'].nil?
    end
  end

  options.merge!(persistent: persistent)
  options.merge!(mandatory: mandatory)
  options[:headers] = {} if options[:headers].nil?
  options[:headers][:sender] = Thread.current.object_id

  # message.merge!(locale: I18n.locale)

  publisher_interceptors.each do |interceptor|
    # argument order assumed to follow the method signature
    interceptor.call(message, options)
  end

  message = JSON.dump message
  log.debug "Publish options are: #{options}"
  log.debug "Publish message payload #{message}"
  e.publish(message, options)
  options[:message_id]
end
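The persistent and mandatory flags are resolved in two steps: defaults derived from the exchange and channel, then optional overrides from the publish config for the routing key. The sketch below isolates that logic in a hypothetical pure function, `resolve_publish_flags`; `publish_config` stands in for `data['publish']`, the `durable:` and `confirm:` keywords stand in for `e.durable?` and `channel_confirm?`, and the sample exchange and routing key names are invented.

```ruby
# Hypothetical helper mirroring the flag resolution in publish.
def resolve_publish_flags(publish_config, eid, options, durable:, confirm:)
  # Defaults: durable exchange (or the default exchange) => persistent,
  # channel in confirm mode => mandatory.
  persistent = durable
  persistent = true if eid == :default
  mandatory = confirm

  key = options[:routing_key]
  routed = !key.to_s.empty? # plain-Ruby stand-in for ActiveSupport's present?

  # DLX and manual messages keep the defaults; others may be overridden.
  if routed && !%w[dlx manual].include?(options[:type])
    v1 = publish_config[eid]
    v1 = v1[key] unless v1.nil? || v1[key].nil?
    persistent = v1['persistent'] unless v1.nil? || v1['persistent'].nil?
    mandatory = v1['mandatory'] unless v1.nil? || v1['mandatory'].nil?
  end

  { persistent: persistent, mandatory: mandatory }
end

publish_config = {
  'orders' => { 'order.created' => { 'persistent' => false } }
}

overridden = resolve_publish_flags(
  publish_config, 'orders', { routing_key: 'order.created' },
  durable: true, confirm: true
)
# overridden == { persistent: false, mandatory: true }

dlx = resolve_publish_flags(
  publish_config, 'orders', { routing_key: 'order.created', type: 'dlx' },
  durable: true, confirm: true
)
# dlx == { persistent: true, mandatory: true }
```

One consequence worth noting: when the routing key has no entry of its own, `v1` stays at the exchange-level hash, so a `'persistent'` or `'mandatory'` key placed directly under the exchange would still apply.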
.publisher_interceptors ⇒ Object
# File 'lib/istox/helpers/bunny_boot.rb', line 23
def publisher_interceptors
  @publisher_interceptors || []
end
.queue_durable?(consumer_key) ⇒ Boolean
Defaults to the durable attribute of the exchange the queue belongs to.
# File 'lib/istox/helpers/bunny_boot.rb', line 197
def queue_durable?(consumer_key)
  durable = queue_config_from_consumer_key!(consumer_key)['durable']
  if durable.nil?
    exchange_name = exchange_name consumer_key
    durable = exchange_durable? exchange_name unless exchange_name.nil?
  end

  durable
rescue StandardError
  nil
end
.queue_exclusive(consumer_key) ⇒ Object
# File 'lib/istox/helpers/bunny_boot.rb', line 177
def queue_exclusive(consumer_key)
  queue_config_from_consumer_key!(consumer_key)['exclusive'] || false
rescue StandardError
  nil
end
.queue_manual_ack?(consumer_key) ⇒ Boolean
# File 'lib/istox/helpers/bunny_boot.rb', line 209
def queue_manual_ack?(consumer_key)
  manual_ack = queue_config_from_consumer_key!(consumer_key)['manual_ack']
  return true if manual_ack.nil?

  manual_ack
end
.queue_name(consumer_key) ⇒ Object
# File 'lib/istox/helpers/bunny_boot.rb', line 138
def queue_name(consumer_key)
  name = queue_config_from_consumer_key!(consumer_key)['queue_name']
  return name if name.nil?

  prefix = queue_config_from_consumer_key!(consumer_key, true)['queue_prefix']
  suffix = queue_config_from_consumer_key!(consumer_key, true)['queue_suffix']
  delimiter = queue_config_from_consumer_key!(consumer_key)['queue_delimiter'] || '' unless prefix.nil? && suffix.nil?
  name = "#{prefix}#{delimiter}#{name}" unless prefix.nil?
  name = "#{name}#{delimiter}#{suffix}" unless suffix.nil?

  name
rescue StandardError
  nil
end
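The prefix, delimiter, and suffix composition is easier to see with the config lookups removed. `compose_queue_name` below is a hypothetical pure-function sketch of the same string building; the queue and environment names are illustrative.

```ruby
# Hypothetical helper: prefix and suffix are each optional, and the
# delimiter is only used when one of them is present.
def compose_queue_name(name, prefix: nil, suffix: nil, delimiter: '')
  return name if name.nil?

  name = "#{prefix}#{delimiter}#{name}" unless prefix.nil?
  name = "#{name}#{delimiter}#{suffix}" unless suffix.nil?
  name
end

compose_queue_name('orders')                                                  # "orders"
compose_queue_name('orders', prefix: 'staging', delimiter: '.')               # "staging.orders"
compose_queue_name('orders', prefix: 'staging', suffix: 'v2', delimiter: '.') # "staging.orders.v2"
compose_queue_name('orders', suffix: 'v2')                                    # "ordersv2"
```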
.queue_ok?(conn, name) ⇒ Boolean
# File 'lib/istox/helpers/bunny_boot.rb', line 298
def queue_ok?(conn, name)
  ch = conn.create_channel
  begin
    q = ch.queue(name, passive: true)
    count = q.consumer_count
    log.debug "Consumer count number is #{count}"
    if count.zero?
      log.debug "queue #{name} has no consumer, ok!"
      true
    else
      log.debug "queue #{name} has consumer, nok!"
      false
    end
  rescue Bunny::NotFound => e
    log.debug "Bunny::NotFound, #{e}"
    false
  ensure
    ch.close if ch.open?
  end
end
.queue_priority(consumer_key) ⇒ Object
# File 'lib/istox/helpers/bunny_boot.rb', line 153
def queue_priority(consumer_key)
  queue_config_from_consumer_key!(consumer_key)['priority']
rescue StandardError
  nil
end
.queue_retry_gap(consumer_key) ⇒ Object
The default retry gap is 3000. A gap of 0 means retry immediately. To protect the system, unlimited retries (retry limit -1) with a zero gap are not allowed; the default gap is used instead.
# File 'lib/istox/helpers/bunny_boot.rb', line 231
def queue_retry_gap(consumer_key)
  retry_gap = queue_config_from_consumer_key!(consumer_key)['retry_gap']

  if retry_gap.nil? || (retry_gap == 0 && queue_retry_limit(consumer_key) == -1)
    3000
  else
    retry_gap.to_i
  end
end
.queue_retry_gap_set?(consumer_key) ⇒ Boolean
# File 'lib/istox/helpers/bunny_boot.rb', line 241
def queue_retry_gap_set?(consumer_key)
  queue_config_from_consumer_key!(consumer_key)['retry_gap'].present?
end
.queue_retry_limit(consumer_key) ⇒ Object
The default retry limit is 5 (the fallback value in the source below). A limit of -1 means no limit.
# File 'lib/istox/helpers/bunny_boot.rb', line 218
def queue_retry_limit(consumer_key)
  retry_limit = queue_config_from_consumer_key!(consumer_key)['retry_limit']

  if retry_limit.nil?
    5
  else
    retry_limit.to_i
  end
end
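The retry gap and retry limit interact, so a small sketch of the combined resolution may help. The functions below are hypothetical; they take a plain queue config hash instead of a consumer key, and the fallback constants are the literal values from the two method bodies above.

```ruby
DEFAULT_RETRY_LIMIT = 5    # fallback in queue_retry_limit
DEFAULT_RETRY_GAP   = 3000 # fallback in queue_retry_gap

def resolve_retry_limit(config)
  limit = config['retry_limit']
  limit.nil? ? DEFAULT_RETRY_LIMIT : limit.to_i
end

def resolve_retry_gap(config)
  gap = config['retry_gap']
  # Unlimited retries with a zero gap would retry in a tight loop forever,
  # so that combination falls back to the default gap.
  unlimited_with_zero_gap = gap == 0 && resolve_retry_limit(config) == -1
  (gap.nil? || unlimited_with_zero_gap) ? DEFAULT_RETRY_GAP : gap.to_i
end

resolve_retry_gap({})                                      # 3000
resolve_retry_gap('retry_gap' => 0, 'retry_limit' => 3)    # 0
resolve_retry_gap('retry_gap' => 0, 'retry_limit' => -1)   # 3000
resolve_retry_gap('retry_gap' => 500, 'retry_limit' => -1) # 500
```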
.queue_routing_key(consumer_key) ⇒ Object
# File 'lib/istox/helpers/bunny_boot.rb', line 171
def queue_routing_key(consumer_key)
  queue_config_from_consumer_key!(consumer_key)['routing_key'] || (queue_name consumer_key)
rescue StandardError
  nil
end
.queue_single_consumer?(consumer_key) ⇒ Boolean
# File 'lib/istox/helpers/bunny_boot.rb', line 189
def queue_single_consumer?(consumer_key)
  single_consumer = queue_config_from_consumer_key!(consumer_key)['single_consumer']
  return false if single_consumer.nil?

  single_consumer
end
.queue_worker_param(consumer_key) ⇒ Object
# File 'lib/istox/helpers/bunny_boot.rb', line 159
def queue_worker_param(consumer_key)
  queue_config_from_consumer_key!(consumer_key, true)['worker_param']
rescue StandardError
  nil
end
.queue_worker_param_format(consumer_key) ⇒ Object
# File 'lib/istox/helpers/bunny_boot.rb', line 165
def queue_worker_param_format(consumer_key)
  queue_config_from_consumer_key!(consumer_key)['worker_param_format'] || 'open_struct'
rescue StandardError
  nil
end
.queues_keys_for_subscribe ⇒ Object
# File 'lib/istox/helpers/bunny_boot.rb', line 96
def queues_keys_for_subscribe
  data['queues'].keys
end
.rename_tracker(channel_id, old_delivery_tag, new_delivery_tag) ⇒ Object
# File 'lib/istox/helpers/bunny_boot.rb', line 344
def rename_tracker(channel_id, old_delivery_tag, new_delivery_tag)
  old_id = "#{channel_id}:#{old_delivery_tag}"
  new_id = "#{channel_id}:#{new_delivery_tag}"
  ::Istox::RedisBoot.rename("#{old_id}:payload", "#{new_id}:payload", 4)
  ::Istox::RedisBoot.rename("#{old_id}:eid", "#{new_id}:eid", 4)
  ::Istox::RedisBoot.rename("#{old_id}:options", "#{new_id}:options", 4)
end
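The tracker methods share one key scheme: `channel_id:delivery_tag` followed by a field name (`payload`, `eid`, or `options`). The sketch below walks through create and rename against an in-memory stand-in. `MemoryStore` and `tracker_id` are hypothetical; the real code goes through `::Istox::RedisBoot`, whose behaviour is not reproduced here.

```ruby
require 'json'

# In-memory stand-in for the Redis calls (hypothetical class).
class MemoryStore
  def initialize
    @data = {}
  end

  def set(key, value)
    @data[key] = value
  end

  def get(key)
    @data[key]
  end

  def rename(old_key, new_key)
    @data[new_key] = @data.delete(old_key)
  end
end

TRACKER_FIELDS = %w[payload eid options].freeze

def tracker_id(channel_id, delivery_tag)
  "#{channel_id}:#{delivery_tag}"
end

store = MemoryStore.new

# Track a message published on channel 7 with delivery tag 1.
old_id = tracker_id(7, 1)
store.set("#{old_id}:payload", JSON.dump({ 'amount' => 10 }))
store.set("#{old_id}:eid", 'orders')
store.set("#{old_id}:options", JSON.dump({ 'routing_key' => 'order.created' }))

# A retry gets a new delivery tag on the same channel, so every field moves.
new_id = tracker_id(7, 2)
TRACKER_FIELDS.each do |field|
  store.rename("#{old_id}:#{field}", "#{new_id}:#{field}")
end
```

Keeping the channel id as the key prefix is what lets `del_tracker_on_channel` clear every tracker for a channel with a single `"#{channel_id}*"` pattern.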
.ruby_class(consumer_key) ⇒ Object
# File 'lib/istox/helpers/bunny_boot.rb', line 183
def ruby_class(consumer_key)
  queue_config_from_consumer_key!(consumer_key)['ruby_class']
rescue StandardError
  nil
end
.search_exchange_of_routing_key!(routing_key) ⇒ Object
# File 'lib/istox/helpers/bunny_boot.rb', line 104
def search_exchange_of_routing_key!(routing_key)
  data['publish'].each do |k, v|
    v.each do |k2, _v2|
      return k if k2 == routing_key
    end
  end

  raise "Cannot find exchange of routing key #{routing_key}, have you forgotten to define it in exchange section in amqp.yml"
end
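The lookup scans a two-level mapping of exchange to routing keys and returns the first exchange that lists the key. A minimal sketch with an invented config follows; `exchange_for` and `PUBLISH_CONFIG` are hypothetical names, and only the nesting (exchange, then routing key) is taken from the method above.

```ruby
# Illustrative stand-in for data['publish'].
PUBLISH_CONFIG = {
  'orders' => { 'order.created' => {}, 'order.cancelled' => {} },
  'payments' => { 'payment.settled' => {} }
}.freeze

def exchange_for(routing_key, publish_config)
  publish_config.each do |exchange, routes|
    return exchange if routes.key?(routing_key)
  end

  raise "Cannot find exchange of routing key #{routing_key}"
end

exchange_for('payment.settled', PUBLISH_CONFIG) # "payments"
exchange_for('order.cancelled', PUBLISH_CONFIG) # "orders"
```

Because the first match wins, a routing key declared under two exchanges resolves to whichever exchange appears first in the config.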
.subscriber_queue_from_key(key) ⇒ Object
# File 'lib/istox/helpers/bunny_boot.rb', line 100
def subscriber_queue_from_key(key)
  data['queues'][key]
end