Class: Upperkut::Strategies::PriorityQueue
- Includes:
- Util
- Defined in:
- lib/upperkut/strategies/priority_queue.rb
Overview
Public: Queue that prevent a single tenant from taking over.
Constant Summary collapse
- ONE_DAY_IN_SECONDS =
86400
- ENQUEUE_ITEM =
Logic as follows:
We keep the last score used for each tenant key. One tenant_key is
an tenant unique id. To calculate the next_score we use max(current_tenant_score, current_global_score) + increment we store the queue in a sorted set using the next_score as ordering key if one tenant sends lots of messages, this tenant ends up with lots of messages in the queue spaced by increment if another tenant then sends a message, since it previous_tenant_score is lower than the first tenant, it will be inserted before it in the queue.
In other words, the idea of this queue is to not allowing an tenant
that sends a lot of messages to dominate processing and give a chance for tenants that sends few messages to have a fair share of processing time.
%( local increment = 1 local checkpoint_key = KEYS[1] local counter_key = KEYS[2] local score_key = KEYS[3] local queue_key = KEYS[4] local current_checkpoint = tonumber(redis.call("GET", checkpoint_key)) or 0 local current_counter = tonumber(redis.call("INCR", counter_key)) local current_score = tonumber(redis.call("GET", score_key)) or 0 local next_score = nil if current_score >= current_checkpoint then next_score = current_score + increment else next_score = current_checkpoint + increment end redis.call("SETEX", score_key, #{ONE_DAY_IN_SECONDS}, next_score) redis.call("ZADD", queue_key, next_score + tonumber('0.' .. current_counter), ARGV[1]) return next_score ).freeze
- DEQUEUE_ITEM =
Uses ZPOP* functions available only on redis 5.0.0+
%( local checkpoint_key = KEYS[1] local queue_key = KEYS[2] local batch_size = ARGV[1] local popped_items = redis.call("ZPOPMIN", queue_key, batch_size) local items = {} local last_score = 0 for i, v in ipairs(popped_items) do if i % 2 == 1 then table.insert(items, v) else last_score = v end end redis.call("SETEX", checkpoint_key, 86400, last_score) return items ).freeze
Instance Method Summary collapse
- #ack(_items) ⇒ Object
-
#clear ⇒ Object
Public: Clear all data related to the strategy.
-
#fetch_items ⇒ Object
Public: Retrieve events from Strategy.
-
#initialize(worker, options) ⇒ PriorityQueue
constructor
A new instance of PriorityQueue.
-
#metrics ⇒ Object
Public: Consolidated strategy metrics.
- #nack(items) ⇒ Object
-
#process? ⇒ Boolean
Public: Tells when to execute the event processing, when this condition is met so the events are dispatched to the worker.
-
#push_items(items = []) ⇒ Object
Public: Ingests the event into strategy.
Methods included from Util
#decode_json_items, #encode_json_items, #normalize_items, #retry_block, #to_underscore
Constructor Details
#initialize(worker, options) ⇒ PriorityQueue
Returns a new instance of PriorityQueue.
72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 |
# File 'lib/upperkut/strategies/priority_queue.rb', line 72 def initialize(worker, ) @worker = worker @options = @priority_key = .fetch(:priority_key) @redis_options = .fetch(:redis, {}) @max_wait = .fetch( :max_wait, Integer(ENV['UPPERKUT_MAX_WAIT'] || 20) ) @batch_size = .fetch( :batch_size, Integer(ENV['UPPERKUT_BATCH_SIZE'] || 1000) ) @waiting_time = 0 raise ArgumentError, 'Invalid priority_key. ' \ 'Must be a lambda' unless @priority_key.respond_to?(:call) end |
Instance Method Details
#ack(_items) ⇒ Object
142 |
# File 'lib/upperkut/strategies/priority_queue.rb', line 142 def ack(_items); end |
#clear ⇒ Object
Public: Clear all data related to the strategy.
138 139 140 |
# File 'lib/upperkut/strategies/priority_queue.rb', line 138 def clear redis { |conn| conn.del(queue_key) } end |
#fetch_items ⇒ Object
Public: Retrieve events from Strategy.
Returns an Array containing events as hash.
125 126 127 128 129 130 131 132 133 134 135 |
# File 'lib/upperkut/strategies/priority_queue.rb', line 125 def fetch_items batch_size = [@batch_size, size].min items = redis do |conn| conn.eval(DEQUEUE_ITEM, keys: [checkpoint_key, queue_key], argv: [batch_size]) end decode_json_items(items) end |
#metrics ⇒ Object
Public: Consolidated strategy metrics.
Returns hash containing metric name and values.
164 165 166 167 168 |
# File 'lib/upperkut/strategies/priority_queue.rb', line 164 def metrics { 'size' => size } end |
#nack(items) ⇒ Object
144 145 146 |
# File 'lib/upperkut/strategies/priority_queue.rb', line 144 def nack(items) push_items(items) end |
#process? ⇒ Boolean
Public: Tells when to execute the event processing, when this condition is met so the events are dispatched to the worker.
151 152 153 154 155 156 157 158 159 |
# File 'lib/upperkut/strategies/priority_queue.rb', line 151 def process? if fulfill_condition?(size) @waiting_time = 0 return true end @waiting_time += @worker.setup.polling_interval false end |
#push_items(items = []) ⇒ Object
Public: Ingests the event into strategy.
items - The Array of items do be inserted.
Returns true when success, raise when error.
99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 |
# File 'lib/upperkut/strategies/priority_queue.rb', line 99 def push_items(items = []) items = normalize_items(items) return false if items.empty? redis do |conn| items.each do |item| priority_key = @priority_key.call(item) score_key = "#{queue_key}:#{priority_key}:score" keys = [checkpoint_key, counter_key, score_key, queue_key] conn.eval(ENQUEUE_ITEM, keys: keys, argv: [encode_json_items(item)]) end end true end |