Class: Upperkut::Strategies::PriorityQueue

Inherits:
Base
  • Object
Includes:
Util
Defined in:
lib/upperkut/strategies/priority_queue.rb

Overview

Public: Queue that prevents a single tenant from taking over.

Constant Summary

ONE_DAY_IN_SECONDS =
86400
ENQUEUE_ITEM =

Logic as follows:

We keep the last score used for each tenant key. A tenant_key is a
unique tenant id. To calculate the next_score we use
max(current_tenant_score, current_global_score) + increment. We store
the queue in a sorted set, using the next_score as the ordering key. If
one tenant sends lots of messages, that tenant ends up with lots of
messages in the queue, spaced by increment. If another tenant then
sends a message, its previous_tenant_score is lower than the first
tenant's, so the message is inserted ahead of the first tenant's later
messages in the queue.

In other words, the idea of this queue is to prevent a tenant that
sends a lot of messages from dominating processing, and to give
tenants that send few messages a fair share of processing time.
%(
  local increment = 1
  local checkpoint_key = KEYS[1]
  local counter_key = KEYS[2]
  local score_key = KEYS[3]
  local queue_key = KEYS[4]
  local current_checkpoint = tonumber(redis.call("GET", checkpoint_key)) or 0
  local current_counter = tonumber(redis.call("INCR", counter_key))
  local current_score = tonumber(redis.call("GET", score_key)) or 0
  local next_score = nil

  if current_score >= current_checkpoint then
    next_score = current_score + increment
  else
    next_score = current_checkpoint + increment
  end

  redis.call("SETEX", score_key, #{ONE_DAY_IN_SECONDS}, next_score)
  redis.call("ZADD", queue_key, next_score + tonumber('0.' .. current_counter), ARGV[1])

  return next_score
).freeze
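
The scoring rule described for ENQUEUE_ITEM can be tried out without Redis. The following is a minimal in-memory sketch, not the library's implementation: the class name FairQueueModel and its methods are hypothetical, and plain Ruby hashes and arrays stand in for the Redis keys and the sorted set.

```ruby
# In-memory model of the ENQUEUE_ITEM scoring rule (no Redis involved).
# FairQueueModel is a hypothetical name used only for this illustration.
class FairQueueModel
  INCREMENT = 1

  def initialize
    @checkpoint = 0                # stands in for checkpoint_key
    @counter = 0                   # stands in for counter_key
    @tenant_scores = Hash.new(0)   # stands in for the per-tenant score keys
    @queue = []                    # [score, item] pairs, stands in for the sorted set
  end

  def enqueue(tenant, item)
    @counter += 1
    next_score = [@tenant_scores[tenant], @checkpoint].max + INCREMENT
    @tenant_scores[tenant] = next_score
    # As in the script, the counter becomes the fractional part of the score.
    @queue << [next_score + "0.#{@counter}".to_f, item]
    next_score
  end

  def ordered_items
    @queue.sort_by(&:first).map(&:last)
  end
end

model = FairQueueModel.new
3.times { |i| model.enqueue('tenant_a', "a#{i + 1}") }
model.enqueue('tenant_b', 'b1')
p model.ordered_items # => ["a1", "b1", "a2", "a3"]
```

tenant_b's single message receives score 1.4 and lands ahead of tenant_a's second and third messages (2.2 and 3.3), even though it was enqueued last.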
DEQUEUE_ITEM =

Uses ZPOP* functions, available only on Redis 5.0.0+.

%(
  local checkpoint_key = KEYS[1]
  local queue_key = KEYS[2]
  local batch_size = ARGV[1]
  local popped_items = redis.call("ZPOPMIN", queue_key, batch_size)
  local items = {}
  local last_score = 0

  for i, v in ipairs(popped_items) do
    if i % 2 == 1 then
      table.insert(items, v)
    else
      last_score = v
    end
  end

  redis.call("SETEX", checkpoint_key, 86400, last_score)
  return items
).freeze
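
ZPOPMIN returns a flat list that alternates member and score, which is why the script separates odd and even positions. A rough Ruby equivalent of that loop, with the hypothetical helper name split_popped and made-up sample data, might look like this:

```ruby
# Splits a flat [member, score, member, score, ...] reply into the members
# and the score of the last popped member. split_popped is a hypothetical
# helper written for this illustration only.
def split_popped(popped_items)
  items = []
  last_score = 0

  popped_items.each_slice(2) do |member, score|
    items << member
    last_score = score
  end

  [items, last_score]
end

items, checkpoint = split_popped(['{"id":1}', '1.1', '{"id":2}', '1.4'])
p items      # => ["{\"id\":1}", "{\"id\":2}"]
p checkpoint # => "1.4"
```

As in the script, an empty reply leaves last_score at 0, so the value stored as the checkpoint in that case is 0.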

Instance Method Summary

Methods included from Util

#decode_json_items, #encode_json_items, #normalize_items, #retry_block, #to_underscore

Constructor Details

#initialize(worker, options) ⇒ PriorityQueue

Returns a new instance of PriorityQueue.

Raises:

  • (ArgumentError)


# File 'lib/upperkut/strategies/priority_queue.rb', line 72

def initialize(worker, options)
  @worker = worker
  @options = options
  @priority_key = options.fetch(:priority_key)
  @redis_options = options.fetch(:redis, {})

  @max_wait = options.fetch(
    :max_wait,
    Integer(ENV['UPPERKUT_MAX_WAIT'] || 20)
  )

  @batch_size = options.fetch(
    :batch_size,
    Integer(ENV['UPPERKUT_BATCH_SIZE'] || 1000)
  )

  @waiting_time = 0

  raise ArgumentError, 'Invalid priority_key. ' \
    'Must be a lambda' unless @priority_key.respond_to?(:call)
end
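
The constructor resolves each numeric setting from the options hash first, then from an environment variable, then from a built-in default. A small sketch of that lookup order follows; resolve_setting is a hypothetical helper, the environment is passed in as a plain hash so the example does not depend on the real ENV, and the tenant_id field is an assumed item attribute.

```ruby
# Hypothetical helper mirroring the constructor's lookup order:
# explicit option, then environment variable, then default.
def resolve_setting(options, key, env, env_name, default)
  options.fetch(key, Integer(env[env_name] || default))
end

options = {
  priority_key: ->(item) { item['tenant_id'] }, # assumed item field
  batch_size: 500
}

# The option is present, so the default of 1000 is ignored.
batch_size = resolve_setting(options, :batch_size, {}, 'UPPERKUT_BATCH_SIZE', 1000)

# No option is given, so the (simulated) environment value is used.
max_wait = resolve_setting(options, :max_wait,
                           { 'UPPERKUT_MAX_WAIT' => '5' }, 'UPPERKUT_MAX_WAIT', 20)

# The constructor raises ArgumentError when this check fails.
callable = options.fetch(:priority_key).respond_to?(:call)
```

Anything that responds to call passes the check, so a Proc or a Method object works as well as a lambda.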

Instance Method Details

#ack(_items) ⇒ Object



# File 'lib/upperkut/strategies/priority_queue.rb', line 142

def ack(_items); end

#clear ⇒ Object

Public: Clear all data related to the strategy.



# File 'lib/upperkut/strategies/priority_queue.rb', line 138

def clear
  redis { |conn| conn.del(queue_key) }
end

#fetch_items ⇒ Object

Public: Retrieve events from Strategy.

Returns an Array of events, each as a Hash.



# File 'lib/upperkut/strategies/priority_queue.rb', line 125

def fetch_items
  batch_size = [@batch_size, size].min

  items = redis do |conn|
    conn.eval(DEQUEUE_ITEM,
              keys: [checkpoint_key, queue_key],
              argv: [batch_size])
  end

  decode_json_items(items)
end

#metrics ⇒ Object

Public: Consolidated strategy metrics.

Returns a Hash mapping metric names to their values.



# File 'lib/upperkut/strategies/priority_queue.rb', line 164

def metrics
  {
    'size' => size
  }
end

#nack(items) ⇒ Object



# File 'lib/upperkut/strategies/priority_queue.rb', line 144

def nack(items)
  push_items(items)
end

#process? ⇒ Boolean

Public: Tells when to execute the event processing. When this condition is met, the events are dispatched to the worker.

Returns:

  • (Boolean)


# File 'lib/upperkut/strategies/priority_queue.rb', line 151

def process?
  if fulfill_condition?(size)
    @waiting_time = 0
    return true
  end

  @waiting_time += @worker.setup.polling_interval
  false
end
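
The body of fulfill_condition? is not shown on this page. The sketch below assumes it returns true once the queue holds at least batch_size items, or once the accumulated waiting time reaches max_wait while the queue is non-empty. Under that assumption, the interplay with @waiting_time looks roughly like this (ProcessGate is a hypothetical stand-in class, and the size is passed in instead of being read from Redis):

```ruby
# Hypothetical stand-in for the strategy's process? logic.
class ProcessGate
  def initialize(batch_size:, max_wait:, polling_interval:)
    @batch_size = batch_size
    @max_wait = max_wait
    @polling_interval = polling_interval
    @waiting_time = 0
  end

  def process?(size)
    if fulfill_condition?(size)
      @waiting_time = 0
      return true
    end

    @waiting_time += @polling_interval
    false
  end

  private

  # ASSUMPTION: the real condition is not shown in this document.
  def fulfill_condition?(size)
    return false if size.zero?

    size >= @batch_size || @waiting_time >= @max_wait
  end
end

gate = ProcessGate.new(batch_size: 3, max_wait: 10, polling_interval: 5)
results = [gate.process?(1), gate.process?(1), gate.process?(1), gate.process?(3)]
p results # => [false, false, true, true]
```

The third call succeeds because two polling intervals have accumulated to max_wait; the fourth succeeds immediately because the queue size reaches batch_size.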

#push_items(items = []) ⇒ Object

Public: Ingests the events into the strategy.

items - The Array of items to be inserted.

Returns true on success and false when items is empty; raises on error.



# File 'lib/upperkut/strategies/priority_queue.rb', line 99

def push_items(items = [])
  items = normalize_items(items)
  return false if items.empty?

  redis do |conn|
    items.each do |item|
      priority_key = @priority_key.call(item)
      score_key = "#{queue_key}:#{priority_key}:score"

      keys = [checkpoint_key,
              counter_key,
              score_key,
              queue_key]

      conn.eval(ENQUEUE_ITEM,
                keys: keys,
                argv: [encode_json_items(item)])
    end
  end

  true
end
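
Each item gets its own per-tenant score key, derived from the queue key and the value returned by the priority_key lambda. The sketch below shows only that key construction. The queue_key value and the tenant_id field are made up for illustration, and items are assumed to be plain hashes; what the lambda actually receives depends on normalize_items, which is not shown here.

```ruby
# Hypothetical values; the real queue_key comes from a method not shown here.
queue_key = 'upperkut:priority_queue:my_worker'
priority_key = ->(item) { item['tenant_id'] }

items = [
  { 'tenant_id' => 'acme',   'n' => 1 },
  { 'tenant_id' => 'globex', 'n' => 2 },
  { 'tenant_id' => 'acme',   'n' => 3 }
]

# Same interpolation as in push_items: one score key per tenant.
score_keys = items.map { |item| "#{queue_key}:#{priority_key.call(item)}:score" }

p score_keys.uniq
# => ["upperkut:priority_queue:my_worker:acme:score",
#     "upperkut:priority_queue:my_worker:globex:score"]
```

Items from the same tenant share a score key, which is what lets the enqueue script space that tenant's messages apart by increment.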