Class: Redis::Ick
- Inherits:
-
Object
- Object
- Redis::Ick
- Defined in:
- lib/redis/ick.rb,
lib/redis/ick/version.rb
Overview
Accessor for Ick data structures in Redis.
Defined Under Namespace
Classes: FutureContinuation
Constant Summary
- Skip0ThenFloatifyPairs =
Postprocessing done on the LUA_ICKEXCHANGE results for both ickreserve and ickexchange.
The results are num_committed followed by N message-and-score pairs.
We slice the results to skip the first element, num_committed.
On the rest, we floatify the scores to convert from Redis's number-as-string limitation to Ruby Floats.
This is similar to Redis::FloatifyPairs:
github.com/redis/redis-rb/blob/master/lib/redis.rb#L2887-L2896
lambda do |results|
  results[1..-1].each_slice(2).map do |m_and_s|
    [ m_and_s[0], ::Redis::Ick._floatify(m_and_s[1]) ]
  end
end
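As a rough illustration of the postprocessing described above, the following sketch applies the same slicing and float conversion to a made-up raw reply. The `floatify` lambda is a local stand-in modeled on `._floatify` as listed on this page, and the `raw` array is a hypothetical reply, not captured output.

```ruby
# Local stand-in for Redis::Ick._floatify, modeled on the listing on this page.
floatify = lambda do |str|
  raise ArgumentError, "not String: #{str}" unless str.is_a?(String)
  if (inf = str.match(/^(-)?inf/i))
    (inf[1] ? -1.0 : 1.0) / 0.0
  else
    Float(str)
  end
end

# Same shape as Skip0ThenFloatifyPairs, but using the local floatify.
skip0_then_floatify_pairs = lambda do |results|
  results[1..-1].each_slice(2).map do |member, score|
    [member, floatify.call(score)]
  end
end

# Hypothetical raw reply: num_committed first, then member/score-string pairs.
raw = [2, 'a', '1.5', 'b', 'inf']
pairs = skip0_then_floatify_pairs.call(raw)
```

The leading `2` (num_committed) is dropped, and each score string becomes a Ruby Float, including the infinite one.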
- LUA_ICK_PREFIX =
LUA_ICK_PREFIX
A snippet of Lua code which is common to all the Ick scripts.
For convenience and to avoid repeating code, we set up some computed key names.
For safety, we check that the ick_ver, ick_pset, and ick_cset either do not exist or exist with the correct types and values to be identifiable as an Ick.
All scripts in the LUA_ICK series take three KEYS: the root key of the Ick data structure, followed by the pset key and the cset key. We expect a version flag as a string at the root key. The keys for the other data are computed from the root key (see #_eval) in such a way as to guarantee they all hash to the same slot.
%{
  local ick_key        = KEYS[1]
  local ick_ver        = redis.call('GET',ick_key)
  local ick_pset_key   = KEYS[2]
  local ick_cset_key   = KEYS[3]
  local ick_ver_type   = redis.call('TYPE',ick_key).ok
  local ick_pset_type  = redis.call('TYPE',ick_pset_key).ok
  local ick_cset_type  = redis.call('TYPE',ick_cset_key).ok
  if (false ~= ick_ver and 'ick.v1' ~= ick_ver) then
    return redis.error_reply('unrecognized ick version ' .. ick_ver)
  end
  if ('none' ~= ick_ver_type and 'string' ~= ick_ver_type) then
    return redis.error_reply('ick defense: expected string at ' .. ick_key .. ', found ' .. ick_ver_type)
  end
  if ('none' ~= ick_pset_type and 'zset' ~= ick_pset_type) then
    return redis.error_reply('ick defense: expected zset at ' .. ick_pset_key .. ', found ' .. ick_pset_type)
  end
  if ('none' ~= ick_cset_type and 'zset' ~= ick_cset_type) then
    return redis.error_reply('ick defense: expected zset at ' .. ick_cset_key .. ', found ' .. ick_cset_type)
  end
  if ('none' == ick_ver_type) then
    if ('none' ~= ick_pset_type) then
      return redis.error_reply('ick defense: no ver at ' .. ick_key .. ', but found pset at ' .. ick_pset_key)
    end
    if ('none' ~= ick_cset_type) then
      return redis.error_reply('ick defense: no ver at ' .. ick_key .. ', but found cset at ' .. ick_cset_key)
    end
  end
}.freeze
- LUA_ICKSTATS =
LUA_ICKSTATS
Returns a bulk response with stats about the Ick at KEYS[1], or nil if none.
Note: At redis.io/commands/eval, the “Lua to Redis conversion table” states:
Lua number -> Redis integer reply (the number is converted into an integer) ...If you want to return a float from Lua you should return it as a string.
We follow this recommendation in our Lua below where we convert our numeric responses to strings with “tostring(tonumber(n))”.
(LUA_ICK_PREFIX + %{
  if (false == ick_ver) then
    return nil
  end
  local ick_pset_size = redis.call('ZCARD',ick_pset_key)
  local ick_cset_size = redis.call('ZCARD',ick_cset_key)
  local ick_stats     = {
    'key',        ick_key,
    'pset_key',   ick_pset_key,
    'cset_key',   ick_cset_key,
    'ver',        ick_ver,
    'cset_size',  ick_cset_size,
    'pset_size',  ick_pset_size,
    'total_size', ick_cset_size + ick_pset_size,
  }
  local pset_min = nil
  local pset_max = nil
  if ick_pset_size > 0 then
    pset_min = redis.call('ZRANGE',ick_pset_key, 0, 0,'WITHSCORES')[2]
    table.insert(ick_stats, 'pset_min')
    table.insert(ick_stats, tostring(tonumber(pset_min)))
    pset_max = redis.call('ZRANGE',ick_pset_key,-1,-1,'WITHSCORES')[2]
    table.insert(ick_stats, 'pset_max')
    table.insert(ick_stats, tostring(tonumber(pset_max)))
  end
  local cset_min = nil
  local cset_max = nil
  if ick_cset_size > 0 then
    cset_min = redis.call('ZRANGE',ick_cset_key, 0, 0,'WITHSCORES')[2]
    table.insert(ick_stats, 'cset_min')
    table.insert(ick_stats, tostring(tonumber(cset_min)))
    cset_max = redis.call('ZRANGE',ick_cset_key,-1,-1,'WITHSCORES')[2]
    table.insert(ick_stats, 'cset_max')
    table.insert(ick_stats, tostring(tonumber(cset_max)))
  end
  local total_min = nil
  if pset_min and cset_min then
    total_min = math.min(cset_min,pset_min)
  elseif pset_min then
    total_min = pset_min
  elseif cset_min then
    total_min = cset_min
  end
  if total_min then
    table.insert(ick_stats, 'total_min')
    table.insert(ick_stats, tostring(tonumber(total_min)))
  end
  local total_max = nil
  if pset_max and cset_max then
    total_max = math.max(cset_max,pset_max)
  elseif pset_max then
    total_max = pset_max
  elseif cset_max then
    total_max = cset_max
  end
  if total_max then
    table.insert(ick_stats, 'total_max')
    table.insert(ick_stats, tostring(tonumber(total_max)))
  end
  return ick_stats
}).freeze
- LUA_ICKADD =
LUA_ICKADD
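Adds members to the pset as per ZADD. Where a member is re-written, we always take the lowest score.
Thus, scores are only allowed to move downward.
Returns a pair of numbers: the number of new members added, and the number of existing members whose score was lowered.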
Creates the Ick if necessary.
(LUA_ICK_PREFIX + %{
  local num_args    = table.getn(ARGV)
  if 1 == (num_args % 2) then
    return redis.error_reply("odd number of arguments for 'ickadd' command")
  end
  local num_new     = 0
  local num_changed = 0
  for i = 1,num_args,2 do
    local score     = tonumber(ARGV[i])
    local member    = ARGV[i+1]
    local old_score = redis.call('ZSCORE',ick_pset_key,member)
    if false == old_score then
      redis.call('ZADD',ick_pset_key,score,member)
      num_new = num_new + 1
    elseif score < tonumber(old_score) then
      redis.call('ZADD',ick_pset_key,score,member)
      num_changed = num_changed + 1
    end
  end
  redis.call('SETNX', ick_key, 'ick.v1')
  return { num_new, num_changed }
}).freeze
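The lowest-score-wins rule can be sketched without a Redis server. The model below is an illustration only: `model_ickadd` is a hypothetical helper, and the producer set is approximated by a plain Hash of member => score rather than a sorted set.

```ruby
# Pure-Ruby model of the LUA_ICKADD rule; pset is a Hash of member => score.
# model_ickadd is a hypothetical name used only for this sketch.
def model_ickadd(pset, *score_member_pairs)
  raise ArgumentError, 'odd number of arguments' if score_member_pairs.size.odd?

  num_new = 0
  num_changed = 0
  score_member_pairs.each_slice(2) do |score, member|
    old_score = pset[member]
    if old_score.nil?
      pset[member] = score      # new member: always added
      num_new += 1
    elsif score < old_score
      pset[member] = score      # existing member: only a lower score is kept
      num_changed += 1
    end
  end
  [num_new, num_changed]
end

pset = {}
first  = model_ickadd(pset, 10, 'a', 20, 'b')
second = model_ickadd(pset, 5, 'a', 30, 'b')
```

In the second call, 'a' moves down from 10 to 5, while the attempt to raise 'b' from 20 to 30 is ignored.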
- LUA_ICKEXCHANGE =
LUA_ICKEXCHANGE: commit then reserve
Commit Function
Removes the members given in ARGV[3..N] from the cset, then tops up the cset to at most ARGV[1] members by shifting the lowest-scored members over from the pset.
The cset might already be full, in which case we may shift fewer than ARGV[1] elements.
Reserve Function
Tops up the cset to at most ARGV[1] members by shifting the lowest-scored members over from the pset.
The cset might already be full, in which case we may shift fewer than ARGV[1] elements.
The same score-folding happens as per ICKADD. Thus, where there are duplicate messages, we may remove more members from the pset than we add to the cset.
ARGV[1] is the maximum size of the cset and the maximum number of pairs to be returned. ARGV[2] is 'backwash' to fold the whole cset back into the pset before reserving.
Returns a bulk response: the number of members removed from the cset by the commit function, followed by up to ARGV[1] pairs member,score,... from the reserve function.
Note: This Lua code calls unpack(ARGV,i,j) in limited-size slices, no larger than 7990, to avoid a “too many results to unpack” failure which has been observed when unpacking tables as small as 8000.
(LUA_ICK_PREFIX + %{
  local reserve_size  = tonumber(ARGV[1])
  local backwash      = ARGV[2]
  local argc          = table.getn(ARGV)
  local num_committed = 0
  local unpack_limit  = 7990
  for i = 3,argc,unpack_limit do
    local max      = math.min(i+unpack_limit,argc)
    local num_zrem = redis.call('ZREM',ick_cset_key,unpack(ARGV,i,max))
    num_committed  = num_committed + num_zrem
  end
  local ick_fold = function(key_from,key_to,max_size_key_to)
    while true do
      local size_key_to = redis.call('ZCARD',key_to)
      local num         = math.min(
        max_size_key_to - size_key_to,
        unpack_limit / 2  -- room for both scores and members
      )
      if num < 1 then
        break
      end
      local head_from = redis.call('ZRANGE',key_from,0,num-1,'WITHSCORES')
      local head_size = table.getn(head_from)
      if 0 == head_size then
        break
      end
      local to_zadd = {} -- both scores and members
      local to_zrem = {} -- members only
      for i = 1,head_size,2 do
        local member     = head_from[i]
        local score_from = tonumber(head_from[i+1])
        local score_to   = redis.call('ZSCORE',key_to,member)
        if false == score_to or score_from < tonumber(score_to) then
          to_zadd[#to_zadd+1] = score_from
          to_zadd[#to_zadd+1] = member
        end
        to_zrem[#to_zrem+1] = member
      end
      redis.call('ZREM',key_from,unpack(to_zrem))
      if 0 < table.getn(to_zadd) then
        redis.call('ZADD',key_to,unpack(to_zadd))
      end
    end
  end
  if 'backwash' == backwash then
    --
    -- Fold everything in the cset back into the pset.
    --
    local pset_size = redis.call('ZCARD',ick_pset_key) or 0
    local cset_size = redis.call('ZCARD',ick_cset_key) or 0
    ick_fold(ick_cset_key,ick_pset_key,pset_size+cset_size)
  end
  --
  -- Fold enough from the pset to the cset to grow the cset
  -- to at most reserve_size members.
  --
  ick_fold(ick_pset_key,ick_cset_key,reserve_size)
  --
  -- Make sure ick_key exists per specification.
  --
  redis.call('SETNX', ick_key, 'ick.v1')
  --
  -- Package up return results, which may be smaller than the cset.
  --
  local result = { num_committed }
  if reserve_size > 0 then
    local max        = reserve_size - 1
    local cset_batch = redis.call('ZRANGE',ick_cset_key,0,max,'WITHSCORES')
    for _i,v in ipairs(cset_batch) do
      table.insert(result,v)
    end
  end
  return result
}).freeze
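The idea behind the bounded unpack(ARGV,i,j) slices can be sketched in Ruby. This is only an analogy: it splits a hypothetical list of members into chunks of at most 7990 elements, which is the intent stated in the note above, and it does not reproduce the exact index arithmetic of the Lua loop.

```ruby
# Upper bound on how many members are handed to a single call, mirroring
# unpack_limit in the Lua script.
UNPACK_LIMIT = 7990

# Hypothetical list of members to commit; in the Lua script these would be
# ARGV[3..N].
members = (1..20_000).map { |i| "m#{i}" }

# Each slice would become one ZREM call with a bounded argument count.
slices = members.each_slice(UNPACK_LIMIT).to_a
sizes  = slices.map(&:size)
```

Every member still appears in exactly one slice, so the total number of removals is unchanged; only the size of each individual call is capped.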
- VERSION =
'0.1.4'.freeze
Instance Attribute Summary
-
#redis ⇒ Object
Returns the value of attribute redis.
-
#statsd ⇒ Object
Returns the value of attribute statsd.
Class Method Summary
-
._floatify(str) ⇒ Object
Converts a string str into a Float, and recognizes ‘inf’, ‘-inf’, etc.
Instance Method Summary
-
#_eval(lua, ick_key, *args) ⇒ Object
Runs the specified Lua in Redis against the specified Ick.
-
#_postprocess(raw_results, callback) ⇒ Object
Calls callback with the results.
-
#_statsd_increment(metric) ⇒ Object
Reports a single count on the requested metric to statsd (if any).
-
#_statsd_time(metric) ⇒ Object
Executes the block (if any) and reports its timing in milliseconds on the requested metric to statsd (if any).
-
#_statsd_timing(metric, time) ⇒ Object
Reports the specified timing on the requested metric to statsd (if any).
-
#ickadd(ick_key, *score_member_pairs) ⇒ Object
Adds all the specified members with the specified scores to the Ick stored at key.
-
#ickcommit(ick_key, *members) ⇒ Object
Removes the indicated members from the consumer set, if present.
-
#ickdel(ick_key, unlink: false) ⇒ Object
Removes all data associated with the Ick in Redis at key.
-
#ickexchange(ick_key, reserve_size, *commit_members, backwash: false) ⇒ Object
ickexchange combines several functions in one Redis round-trip.
-
#ickreserve(ick_key, max_size = 0, backwash: false) ⇒ Object
Tops up the consumer set up to max_size by shifting the lowest-scored elements from the producer set into the consumer set until the consumer set cardinality reaches max_size or the producer set is exhausted.
-
#ickstats(ick_key) ⇒ Object
Fetches stats.
-
#ickunlink(ick_key) ⇒ Object
Removes all data associated with the Ick in Redis at key.
-
#initialize(redis, statsd: nil) ⇒ Ick
constructor
Creates an Ick accessor.
Constructor Details
#initialize(redis, statsd: nil) ⇒ Ick
Creates an Ick accessor.
redis must be a Redis instance. statsd, if given, must respond to :increment, :timing, and :time.
# File 'lib/redis/ick.rb', line 19
def initialize(redis, statsd: nil)
  if !redis.is_a?(Redis)
    raise ArgumentError, "not a Redis: #{redis}"
  end
  if statsd
    if !statsd.respond_to?(:increment)
      raise ArgumentError, 'no statsd.increment'
    end
    if !statsd.respond_to?(:timing)
      raise ArgumentError, 'no statsd.timing'
    end
    if !statsd.respond_to?(:time)
      raise ArgumentError, 'no statsd.time'
    end
  end
  @redis  = redis
  @statsd = statsd
end
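The constructor only checks that statsd responds to three methods, so any duck-typed object will do. The class below is a hypothetical in-memory stand-in, useful mainly for tests; `FakeStatsd` and its `counts` and `timings` readers are names invented for this sketch and are not part of the library.

```ruby
# Hypothetical in-memory statsd stand-in. It provides only the three methods
# the constructor checks for: :increment, :timing, and :time.
class FakeStatsd
  attr_reader :counts, :timings

  def initialize
    @counts  = Hash.new(0)
    @timings = Hash.new { |hash, key| hash[key] = [] }
  end

  def increment(metric)
    @counts[metric] += 1
  end

  def timing(metric, value)
    @timings[metric] << value
  end

  # Runs the block, records elapsed milliseconds, returns the block's value.
  def time(metric)
    started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    result  = yield
    elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
    timing(metric, elapsed * 1000.0)
    result
  end
end

statsd     = FakeStatsd.new
compatible = %i[increment timing time].all? { |m| statsd.respond_to?(m) }
statsd.increment('profile.ick.ickadd.calls')
value = statsd.time('profile.ick.time.ickadd') { :done }
```

An object like this could be passed as the statsd: argument, assuming the redis gem is loaded and a Redis connection is available for the first argument.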
Instance Attribute Details
#redis ⇒ Object
Returns the value of attribute redis.
# File 'lib/redis/ick.rb', line 38
def redis
  @redis
end
#statsd ⇒ Object
Returns the value of attribute statsd.
# File 'lib/redis/ick.rb', line 39
def statsd
  @statsd
end
Class Method Details
._floatify(str) ⇒ Object
Converts a string str into a Float, and recognizes ‘inf’, ‘-inf’, etc.
So we can be certain of compatibility, this was stolen with tweaks from:
https://github.com/redis/redis-rb/blob/master/lib/redis.rb#L2876-L2885
# File 'lib/redis/ick.rb', line 479
def self._floatify(str)
  raise ArgumentError, "not String: #{str}" if !str.is_a?(String)
  if (inf = str.match(/^(-)?inf/i))
    (inf[1] ? -1.0 : 1.0) / 0.0
  else
    Float(str)
  end
end
Instance Method Details
#_eval(lua, ick_key, *args) ⇒ Object
Runs the specified Lua in Redis against the specified Ick.
# File 'lib/redis/ick.rb', line 490
def _eval(lua,ick_key,*args)
  if !lua.is_a?(String)
    raise ArgumentError, "bogus non-String lua #{lua}"
  end
  if !ick_key.is_a?(String)
    raise ArgumentError, "bogus non-String ick_key #{ick_key}"
  end
  ick_pset_key = "#{ick_key}/ick/{#{ick_key}}/pset"
  ick_cset_key = "#{ick_key}/ick/{#{ick_key}}/cset"
  Redis::ScriptManager.eval_gently(
    redis,
    lua,
    [ick_key,ick_pset_key,ick_cset_key],
    args
  )
end
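The derived key names embed the root key in braces, which is how Redis Cluster hash tags keep related keys in one slot. The sketch below reproduces the key naming from #_eval and applies the hash-tag rule (only the text between the first '{' and the next '}' is hashed, when that text is non-empty). Both helper names are hypothetical, and the root key is assumed to contain no braces of its own.

```ruby
# Derived key names, built the same way as in #_eval.
# ick_keys is a hypothetical helper for this sketch.
def ick_keys(ick_key)
  [
    ick_key,
    "#{ick_key}/ick/{#{ick_key}}/pset",
    "#{ick_key}/ick/{#{ick_key}}/cset"
  ]
end

# The portion of a key that Redis Cluster feeds to its slot hash: the text
# between the first '{' and the first following '}' when that text is
# non-empty, otherwise the whole key.
def cluster_hash_input(key)
  lbrace = key.index('{')
  return key unless lbrace

  rbrace = key.index('}', lbrace + 1)
  return key unless rbrace && rbrace > lbrace + 1

  key[(lbrace + 1)...rbrace]
end

keys        = ick_keys('jobs')
hash_inputs = keys.map { |k| cluster_hash_input(k) }
```

All three keys reduce to the same hash input, so they land in the same slot and a single Lua script may touch all of them.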
#_postprocess(raw_results, callback) ⇒ Object
Calls callback with the results.
If raw_results is a Redis::Future, callback will be deferred until the future is expanded.
Otherwise, callback will happen immediately.
# File 'lib/redis/ick.rb', line 403
def _postprocess(raw_results,callback)
  if raw_results.is_a?(Redis::Future)
    #
    # Redis::Future have a built-in mechanism for calling a
    # transformation on the raw results.
    #
    # Here, we monkey-patch not the Redis::Future class, but just
    # this one raw_results object.  We give ourselves a door to
    # set the post-processing transformation.
    #
    # The transformation will be called only once when the real
    # results are materialized.
    #
    class << raw_results
      def transformation=(transformation)
        raise "transformation collision" if @transformation
        @transformation = transformation
      end
    end
    raw_results.transformation = callback
    raw_results
  else
    #
    # If not Redis::Future, we invoke the callback immediately.
    #
    callback.call(raw_results)
  end
end
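The two branches of #_postprocess can be modeled without the redis gem. In this sketch `FakeFuture` is a hypothetical stand-in for Redis::Future, and `postprocess` mirrors only the branching, not the singleton-class patching used in the real method.

```ruby
# Hypothetical stand-in for a pipelined future: the value arrives later.
class FakeFuture
  attr_writer :transformation

  # Simulates the moment the pipeline delivers the raw reply.
  def resolve(raw)
    @transformation ? @transformation.call(raw) : raw
  end
end

# Mirrors the branching in #_postprocess: defer for futures, else call now.
def postprocess(raw_results, callback)
  if raw_results.is_a?(FakeFuture)
    raw_results.transformation = callback
    raw_results
  else
    callback.call(raw_results)
  end
end

first_element = ->(results) { results[0] }

immediate = postprocess([3, 'a', '1.0'], first_element) # callback runs now
future    = postprocess(FakeFuture.new, first_element)  # callback is stored
deferred  = future.resolve([5])                         # callback runs here
```

Outside a pipeline the caller gets the transformed value directly; inside one, the caller gets the future back and the transformation is applied when the value materializes.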
#_statsd_increment(metric) ⇒ Object
Reports a single count on the requested metric to statsd (if any).
# File 'lib/redis/ick.rb', line 45
def _statsd_increment(metric)
  statsd.increment(metric) if statsd
end
#_statsd_time(metric) ⇒ Object
Executes the block (if any) and reports its timing in milliseconds on the requested metric to statsd (if any).
# File 'lib/redis/ick.rb', line 65
def _statsd_time(metric)
  if statsd
    statsd.time(metric) do
      block_given? ? yield : nil
    end
  else
    block_given? ? yield : nil
  end
end
#_statsd_timing(metric, time) ⇒ Object
Reports the specified timing on the requested metric to statsd (if any).
# File 'lib/redis/ick.rb', line 54
def _statsd_timing(metric,time)
  statsd.timing(metric,time) if statsd
end
#ickadd(ick_key, *score_member_pairs) ⇒ Object
Adds all the specified members with the specified scores to the Ick stored at key.
Entries are stored in order by score. Lower-scored entries will pop out in reserve before higher-scored entries. Re-adding an entry which already exists in the producer set with a new score results in the entry having the lowest of the old and new scores.
Similar to redis.io/commands/zadd with a modified NX option, operating on the producer set.
Usage:
ick.ickadd(ick_key,score,member[,score,member]*)
Suggested usage is for scores to be a Unix timestamp indicating when something became dirty.
Returns a pair: the number of new members and the number of members with changed scores.
# File 'lib/redis/ick.rb', line 197
def ickadd(ick_key,*score_member_pairs)
  if !ick_key.is_a?(String)
    raise ArgumentError, "bogus non-String ick_key #{ick_key}"
  end
  if score_member_pairs.size.odd?
    raise ArgumentError, "bogus odd-numbered #{score_member_pairs}"
  end
  score_member_pairs.each_slice(2) do |slice|
    score, member = slice
    if ! score.is_a? Numeric
      raise ArgumentError, "bogus non-Numeric score #{score}"
    end
    if ! member.is_a? String
      raise ArgumentError, "bogus non-String member #{member}"
    end
  end
  _statsd_increment('profile.ick.ickadd.calls')
  _statsd_timing('profile.ick.ickadd.pairs',score_member_pairs.size / 2)
  _statsd_time('profile.ick.time.ickadd') do
    _eval(LUA_ICKADD,ick_key,*score_member_pairs)
  end
end
#ickcommit(ick_key, *members) ⇒ Object
Removes the indicated members from the consumer set, if present.
Similar to ZREM ick_key [member]*, per redis.io/commands/zrem, operating on the consumer set only.
Usage:
ick.ickcommit(ick_key,memberA,memberB,...)
Committed elements are meant to represent consumer work-completed.
Returns the number of members removed from the consumer set, not including non-existing members.
# File 'lib/redis/ick.rb', line 298
def ickcommit(ick_key,*members)
  if !ick_key.is_a?(String)
    raise ArgumentError, "bogus non-String ick_key #{ick_key}"
  end
  _statsd_increment('profile.ick.ickcommit.calls')
  _statsd_timing('profile.ick.ickcommit.members',members.size)
  raw_results = nil
  _statsd_time('profile.ick.time.ickcommit') do
    raw_results = _eval(
      LUA_ICKEXCHANGE,
      ick_key,
      0,
      false, # backwash not relevant in ickcommit
      *members
    )
  end
  #
  # raw_results are num_committed followed by 0 message-and-score
  # pairs.
  #
  # We just capture the num_committed.
  #
  _postprocess(raw_results,lambda { |results| results[0] })
end
#ickdel(ick_key, unlink: false) ⇒ Object
Removes all data associated with the Ick in Redis at key.
Similar to DEL key, redis.io/commands/del, but may delete multiple keys which together implement the Ick data structure.
Returns the number of Redis keys deleted, which will be >= 1 if an Ick existed at key.
# File 'lib/redis/ick.rb', line 88
def ickdel(ick_key,unlink: false)
  if !ick_key.is_a?(String)
    raise ArgumentError, "bogus non-String ick_key #{ick_key}"
  end
  stats_prefix = unlink ? 'profile.ick.ickunlink' : 'profile.ick.ickdel'
  redis_cmd    = unlink ? 'UNLINK' : 'DEL'
  _statsd_increment("#{stats_prefix}.calls")
  _statsd_time("#{stats_prefix}.time") do
    _eval(
      LUA_ICK_PREFIX +
      "return redis.call('#{redis_cmd}',ick_key,ick_pset_key,ick_cset_key)",
      ick_key
    )
  end
end
#ickexchange(ick_key, reserve_size, *commit_members, backwash: false) ⇒ Object
ickexchange combines several functions in one Redis round-trip.
- As ickcommit, removes consumed members from the consumer set.
- As ickreserve, tops up the consumer set from the producer set and returns the requested new consumer members, if any.
If backwash is true, cset members with high scores are swapped out for pset members with lower scores. Otherwise cset members remain in the cset until committed regardless of how low scores in the pset might be.
Returns a list of up to reserve_size pairs, similar to Redis.current.zrange() with withscores: [ message, score ] representing the lowest-scored elements from the producer set after the commit and reserve operations.
# File 'lib/redis/ick.rb', line 346
def ickexchange(ick_key,reserve_size,*commit_members,backwash: false)
  if !ick_key.is_a?(String)
    raise ArgumentError, "bogus non-String ick_key #{ick_key}"
  end
  if !reserve_size.is_a?(Integer)
    raise ArgumentError, "bogus non-Integer reserve_size #{reserve_size}"
  end
  if reserve_size < 0
    raise ArgumentError, "bogus negative reserve_size #{reserve_size}"
  end
  _statsd_increment('profile.ick.ickexchange.calls')
  _statsd_timing('profile.ick.ickexchange.reserve_size',reserve_size)
  _statsd_timing(
    'profile.ick.ickexchange.commit_members',
    commit_members.size
  )
  raw_results = nil
  _statsd_time('profile.ick.time.ickexchange') do
    raw_results = _eval(
      LUA_ICKEXCHANGE,
      ick_key,
      reserve_size,
      backwash ? 'backwash' : false,
      commit_members
    )
  end
  _postprocess(raw_results,Skip0ThenFloatifyPairs)
end
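The commit-then-reserve flow can be approximated in plain Ruby to see how the two sets evolve. This is a simplified model, not the library's implementation: `lowest`, `fold`, and `model_ickexchange` are hypothetical helpers, and both sets are plain Hashes of member => score instead of Redis sorted sets.

```ruby
# Returns up to n [member, score] pairs with the lowest scores.
def lowest(set, n)
  set.sort_by { |member, score| [score, member] }.first(n)
end

# Moves lowest-scored entries from `from` to `to` until `to` holds max_size
# entries or `from` is empty; on a duplicate member the lower score wins.
def fold(from, to, max_size)
  while to.size < max_size && !from.empty?
    lowest(from, max_size - to.size).each do |member, score|
      to[member] = score if !to.key?(member) || score < to[member]
      from.delete(member)
    end
  end
end

# Models one exchange: commit, optional backwash, reserve, then report.
def model_ickexchange(pset, cset, reserve_size, *commit_members, backwash: false)
  num_committed = commit_members.count { |m| !cset.delete(m).nil? }
  fold(cset, pset, pset.size + cset.size) if backwash
  fold(pset, cset, reserve_size)
  [num_committed, lowest(cset, reserve_size)]
end

pset = { 'a' => 1.0, 'b' => 2.0, 'c' => 3.0 }
cset = {}
_, batch1         = model_ickexchange(pset, cset, 2)
committed, batch2 = model_ickexchange(pset, cset, 2, 'a')
```

The first call reserves 'a' and 'b'. The second call commits 'a', which frees one slot in the consumer set, so 'c' is shifted over and returned alongside the still-uncommitted 'b'.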
#ickreserve(ick_key, max_size = 0, backwash: false) ⇒ Object
Tops up the consumer set up to max_size by shifting the lowest-scored elements from the producer set into the consumer set until the consumer set cardinality reaches max_size or the producer set is exhausted.
The reserved elements are meant to represent consumer work-in-progress. If they are not committed, they will be returned again in future calls to ickreserve.
Note that the Lua for ick is irritating like so:
- you add in the pattern [ score_number, member_string, ... ]
- you retrieve in the pattern [ member_string, score_string, ... ]
Native ZADD and ZRANGE WITHSCORES exhibit this same irritating inconsistency: Ick is annoyance-compatible with Redis sorted sets.
However, by analogy with the redis gem’s Redis.current.zrange(), this Ruby wrapper method pairs up the results for you, and converts the string scores to floats.
- you get from this method [[ member_string, score_number] , ... ]
If backwash is true, cset members with high scores are swapped out for pset members with lower scores. Otherwise cset members remain in the cset until committed regardless of how low scores in the pset might be.
Returns a list of up to max_size pairs, similar to Redis.current.zrange() with withscores: [ member_string, score_number ] representing the lowest-scored elements from the producer set.
# File 'lib/redis/ick.rb', line 256
def ickreserve(ick_key,max_size=0,backwash: false)
  if !ick_key.is_a?(String)
    raise ArgumentError, "bogus non-String ick_key #{ick_key}"
  end
  if !max_size.is_a?(Integer)
    raise ArgumentError, "bogus non-Integer max_size #{max_size}"
  end
  if max_size < 0
    raise ArgumentError, "bogus negative #{max_size}"
  end
  _statsd_increment('profile.ick.ickreserve.calls')
  _statsd_timing('profile.ick.ickreserve.max_size',max_size)
  raw_results = nil
  _statsd_time('profile.ick.time.ickreserve') do
    raw_results = _eval(
      LUA_ICKEXCHANGE,
      ick_key,
      max_size,
      backwash ? 'backwash' : false,
    )
  end
  _postprocess(raw_results,Skip0ThenFloatifyPairs)
end
#ickstats(ick_key) ⇒ Object
Fetches stats.
Returns a Hash with stats about the Ick at ick_key, if any, else nil. If called within a pipeline, returns a Redis::Future whose value is a Hash or nil as before.
# File 'lib/redis/ick.rb', line 133
def ickstats(ick_key)
  if !ick_key.is_a?(String)
    raise ArgumentError, "bogus non-String ick_key #{ick_key}"
  end
  _statsd_increment('profile.ick.ickstats.calls')
  raw_results = nil
  _statsd_time('profile.ick.time.ickstats') do
    raw_results = _eval(LUA_ICKSTATS,ick_key)
  end
  _postprocess(
    raw_results,
    lambda do |results|
      return nil if !results
      #
      # LUA_ICKSTATS returned bulk data response [k,v,k,v,...]
      #
      stats = Hash[*results]
      #
      # From http://redis.io/commands/eval, the "Lua to Redis conversion
      # table" states that:
      #
      #   Lua number -> Redis integer reply (the number is converted
      #   into an integer)
      #
      #   ...If you want to return a float from Lua you should return
      #   it as a string.
      #
      # LUA_ICKSTATS works around this by converting certain stats to
      # strings.  We reverse that conversion here.
      #
      stats.keys.select{|k|/_min$/ =~ k || /_max$/ =~ k}.each do |k|
        next if !stats[k]
        stats[k] = (/^\d+$/ =~ stats[k]) ? stats[k].to_i : stats[k].to_f
      end
      stats
    end
  )
end
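The conversion from the flat reply to a Hash can be tried in isolation. The sketch below follows the same steps as the lambda in #ickstats; `model_stats` is a hypothetical helper and the `raw` array is an invented reply, not captured server output.

```ruby
# Mirrors the ickstats post-processing: a flat [k, v, k, v, ...] reply becomes
# a Hash, and *_min / *_max string values are converted back to numbers.
def model_stats(results)
  return nil unless results

  stats = Hash[*results]
  stats.keys.select { |k| k =~ /_(min|max)$/ }.each do |k|
    next unless stats[k]

    # All-digit strings become Integers; anything else becomes a Float.
    stats[k] = (stats[k] =~ /^\d+$/) ? stats[k].to_i : stats[k].to_f
  end
  stats
end

# Hypothetical raw reply: sizes arrive as integers, min/max as strings.
raw   = ['key', 'jobs', 'pset_size', 2, 'pset_min', '5', 'pset_max', '7.25']
stats = model_stats(raw)
```

Only the keys ending in _min or _max are touched, so the integer sizes and the string key names pass through unchanged.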
#ickunlink(ick_key) ⇒ Object
Removes all data associated with the Ick in Redis at key.
Similar to UNLINK key, redis.io/commands/unlink, but may unlink multiple keys which together implement the Ick data structure.
UNLINK is O(1) in the number of messages in the Ick, and is available with redis-server >= 4.0.0. Physical space reclamation in Redis, which can be O(N), is deferred to asynchronous server threads.
Returns the number of Redis keys unlinked, which will be >= 1 if an Ick existed at key.
# File 'lib/redis/ick.rb', line 120
def ickunlink(ick_key)
  ickdel(ick_key,unlink: true)
end