Class: Beetle::DeduplicationStore
- Inherits:
- Object
- Includes:
- Logging
- Defined in:
- lib/beetle/deduplication_store.rb
Overview
The deduplication store is used internally by Beetle::Client to store information on the status of message processing. This includes:
- how often a message has already been seen by some consumer
- whether a message has been processed successfully
- how many attempts have been made to execute a message handler for a given message
- how long we should wait before trying to execute the message handler after a failure
- how many exceptions have been raised during previous execution attempts
- how long we should wait before trying to perform the next execution attempt
- whether some other process is already trying to execute the message handler
It also provides a method to garbage collect keys for expired messages.
Constant Summary
- KEY_SUFFIXES =
List of key suffixes used for storing values in Redis. ‘status’ must always be the first element of the array.
[:status, :ack_count, :timeout, :delay, :attempts, :exceptions, :mutex, :expires]
Instance Method Summary
- #_eigenclass_ ⇒ Object
:nodoc:
- #del(msg_id, suffix) ⇒ Object
Delete the key with the given suffix for the given msg_id.
- #del_keys(msg_id) ⇒ Object
Delete all keys associated with the given msg_id.
- #exists(msg_id, suffix) ⇒ Object
Check whether a key with the given suffix exists for the given msg_id.
- #extract_redis_master(text) ⇒ Object
Extract the Redis master from file content and return the server for our system.
- #flushdb ⇒ Object
Flush the configured Redis database.
- #get(msg_id, suffix) ⇒ Object
Retrieve the value with the given suffix for the given msg_id.
- #incr(msg_id, suffix) ⇒ Object
Increment the counter for the key with the given suffix for the given msg_id.
- #initialize(config = Beetle.config) ⇒ DeduplicationStore
Constructor. Returns a new instance of DeduplicationStore.
- #key(msg_id, suffix) ⇒ Object
Build a Redis key out of a message id and a given suffix.
- #keys(msg_id) ⇒ Object
List of keys which potentially exist in Redis for the given message id.
- #mget(msg_id, keys) ⇒ Object
Retrieve the values with the given suffixes for the given msg_id.
- #mset(msg_id, values) ⇒ Object
Store some key/value pairs.
- #msetnx(msg_id, values) ⇒ Object
Store some key/value pairs if none of the given keys exist.
- #msg_id(key) ⇒ Object
Extract the message id from a given Redis key.
- #read_master_file ⇒ Object
Returns the server:port string from the Redis master file.
- #redis ⇒ Object
Get the Redis instance.
- #redis_master_file_changed? ⇒ Boolean
Whether the Redis master file was changed outside the running process.
- #redis_master_from_master_file ⇒ Object
Set the current Redis master from the master file.
- #redis_master_from_server_string ⇒ Object
Set the current Redis master instance (as specified in the Beetle::Configuration).
- #set(msg_id, suffix, value) ⇒ Object
Unconditionally store a (key, value) pair with the given suffix for the given msg_id.
- #set_current_redis_master_from_master_file ⇒ Object
Set the current Redis master from the server:port string contained in the Redis master file for our system.
- #setnx(msg_id, suffix, value) ⇒ Object
Store a (key, value) pair with the given suffix for the given msg_id if it doesn’t exist yet.
- #setnx_completed!(msg_id) ⇒ Object
Store the completion status for the given msg_id if it doesn’t exist yet.
- #with_failover ⇒ Object
Performs Redis operations by yielding to a passed-in block, waiting for a new master to show up on the network if the operation throws an exception.
Methods included from Logging
Constructor Details
#initialize(config = Beetle.config) ⇒ DeduplicationStore
Returns a new instance of DeduplicationStore.
    # File 'lib/beetle/deduplication_store.rb', line 16
    def initialize(config = Beetle.config)
      @config = config
      @current_master = nil
      @last_time_master_file_changed = nil
    end
Instance Method Details
#_eigenclass_ ⇒ Object
:nodoc:
    # File 'lib/beetle/deduplication_store.rb', line 196
    def _eigenclass_ #:nodoc:
      class << self; self; end
    end
#del(msg_id, suffix) ⇒ Object
Delete the key with the given suffix for the given msg_id.
    # File 'lib/beetle/deduplication_store.rb', line 106
    def del(msg_id, suffix)
      with_failover { redis.del(key(msg_id, suffix)) }
    end
#del_keys(msg_id) ⇒ Object
Delete all keys associated with the given msg_id. If a status key expiry interval is configured (greater than zero), the status key is not deleted; it is given that expiry instead.
    # File 'lib/beetle/deduplication_store.rb', line 111
    def del_keys(msg_id)
      expiry = @config.redis_status_key_expiry_interval.to_i
      keys = keys(msg_id)
      status_key = keys.shift if expiry > 0
      with_failover do
        redis.del(*keys)
        redis.expire(status_key, expiry) if expiry > 0
      end
    end
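The requirement that ‘status’ be the first element of KEY_SUFFIXES follows from the `keys.shift` call above. The following is a minimal standalone sketch of that key handling, with no Redis connection involved; the message id and the expiry value are made-up illustration values, not taken from Beetle.

```ruby
# Standalone sketch mirroring #key, #keys and the shift in #del_keys.
# Nothing here talks to Redis.
KEY_SUFFIXES = [:status, :ack_count, :timeout, :delay, :attempts, :exceptions, :mutex, :expires]

def key(msg_id, suffix)
  "#{msg_id}:#{suffix}"
end

def keys(msg_id)
  KEY_SUFFIXES.map { |suffix| key(msg_id, suffix) }
end

# Hypothetical message id and expiry interval, chosen only for illustration.
example_id = "msgid:example_queue:0a1b2c3d"
expiry = 60

remaining = keys(example_id)
# With a positive expiry, the first key (the status key) is set aside
# so that it can be expired later instead of being deleted right away.
status_key = remaining.shift if expiry > 0

puts "would expire: #{status_key.inspect}"
puts "would delete: #{remaining.inspect}"
```

If ‘status’ were not the first suffix, `shift` would set aside a different key and the status key would be deleted with the rest.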
#exists(msg_id, suffix) ⇒ Object
Check whether a key with the given suffix exists for the given msg_id.
    # File 'lib/beetle/deduplication_store.rb', line 122
    def exists(msg_id, suffix)
      with_failover { redis.exists?(key(msg_id, suffix)) }
    end
#extract_redis_master(text) ⇒ Object
Extract the Redis master from file content and return the server for our system.
    # File 'lib/beetle/deduplication_store.rb', line 175
    def extract_redis_master(text)
      system_name = @config.system_name
      redis_master = ""
      text.each_line do |line|
        parts = line.split('/', 2)
        case parts.size
        when 1
          redis_master = parts[0]
        when 2
          name, master = parts
          redis_master = master if name == system_name
        end
      end
      redis_master
    end
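Judging from the code above, each line of the master file content is either a bare server string or a `name/server` pair, and only pairs whose name equals the configured system name are used. The sketch below restates the parsing as a free-standing method so it can be tried without a Beetle configuration. Passing `system_name` as a parameter is an adaptation made for this sketch, and the system and host names are invented.

```ruby
# Free-standing restatement of the parsing logic shown above.
# The system name is passed in explicitly instead of being read from
# @config; that change is made only for this sketch.
def extract_redis_master(text, system_name)
  redis_master = ""
  text.each_line do |line|
    parts = line.split('/', 2)
    case parts.size
    when 1
      # A line without a slash applies regardless of the system name.
      redis_master = parts[0]
    when 2
      name, master = parts
      redis_master = master if name == system_name
    end
  end
  redis_master
end

# Hypothetical file content with one entry per system.
content = "system_a/redis-a.example:6379\nsystem_b/redis-b.example:6380"

# each_line keeps the newline on every line but the last, so the result
# is stripped here before printing.
puts extract_redis_master(content, "system_a").strip
puts extract_redis_master(content, "system_b").strip
puts extract_redis_master("redis-c.example:6379", "system_a")
```

A system name that matches no line yields an empty string, which corresponds to the case where the caller sets the current master to nil.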
#flushdb ⇒ Object
Flush the configured Redis database. Useful for testing.
    # File 'lib/beetle/deduplication_store.rb', line 127
    def flushdb
      with_failover { redis.flushdb }
    end
#get(msg_id, suffix) ⇒ Object
Retrieve the value with the given suffix for the given msg_id. Returns a string.
    # File 'lib/beetle/deduplication_store.rb', line 95
    def get(msg_id, suffix)
      with_failover { redis.get(key(msg_id, suffix)) }
    end
#incr(msg_id, suffix) ⇒ Object
Increment the counter for the key with the given suffix for the given msg_id. Returns an integer.
    # File 'lib/beetle/deduplication_store.rb', line 90
    def incr(msg_id, suffix)
      with_failover { redis.incr(key(msg_id, suffix)) }
    end
#key(msg_id, suffix) ⇒ Object
Build a Redis key out of a message id and a given suffix.
    # File 'lib/beetle/deduplication_store.rb', line 38
    def key(msg_id, suffix)
      "#{msg_id}:#{suffix}"
    end
#keys(msg_id) ⇒ Object
List of keys which potentially exist in Redis for the given message id.
    # File 'lib/beetle/deduplication_store.rb', line 43
    def keys(msg_id)
      KEY_SUFFIXES.map{|suffix| key(msg_id, suffix)}
    end
#mget(msg_id, keys) ⇒ Object
Retrieve the values with the given suffixes for the given msg_id. Returns a list of strings.
    # File 'lib/beetle/deduplication_store.rb', line 100
    def mget(msg_id, keys)
      keys = keys.map{|suffix| key(msg_id, suffix)}
      with_failover { redis.mget(*keys) }
    end
#mset(msg_id, values) ⇒ Object
Store some key/value pairs.
    # File 'lib/beetle/deduplication_store.rb', line 78
    def mset(msg_id, values)
      values = values.inject([]){|a,(k,v)| a.concat([key(msg_id, k), v])}
      with_failover { redis.mset(*values) }
    end
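The `inject` call above turns a hash of suffix/value pairs into the flat key, value, key, value argument list that is splatted into `redis.mset`. A small sketch of just that transformation follows; `flatten_pairs` is a hypothetical helper name introduced here, and the message id and values are illustrative.

```ruby
# Sketch of the hash-to-argument-list flattening used by #mset and #msetnx.
def key(msg_id, suffix)
  "#{msg_id}:#{suffix}"
end

# flatten_pairs is a hypothetical name; in the class this expression is inline.
def flatten_pairs(msg_id, values)
  values.inject([]) { |a, (k, v)| a.concat([key(msg_id, k), v]) }
end

# Hypothetical message id and values.
flat = flatten_pairs("msgid:example_queue:ab12", { status: "incomplete", attempts: 0 })
p flat
```

Because Ruby hashes preserve insertion order, the resulting list follows the order in which the pairs were given.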
#msetnx(msg_id, values) ⇒ Object
Store some key/value pairs if none of the given keys exist.
    # File 'lib/beetle/deduplication_store.rb', line 84
    def msetnx(msg_id, values)
      values = values.inject([]){|a,(k,v)| a.concat([key(msg_id, k), v])}
      with_failover { redis.msetnx(*values) }
    end
#msg_id(key) ⇒ Object
Extract the message id from a given Redis key.
    # File 'lib/beetle/deduplication_store.rb', line 48
    def msg_id(key)
      key =~ /^(msgid:[^:]*:[-0-9a-f]*):.*$/ && $1
    end
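The regular expression above expects keys of the form `msgid:<name>:<hex digits and dashes>:<suffix>` and captures everything before the final suffix. The sketch below runs the same expression on two invented keys to show a match and a non-match; the queue name and id are illustration values only.

```ruby
# Same expression as in #msg_id, usable without the class.
def msg_id(key)
  key =~ /^(msgid:[^:]*:[-0-9a-f]*):.*$/ && $1
end

# Hypothetical keys, for illustration only.
status_key = "msgid:example_queue:0a1b2c3d-0000-4000-8000-123456789abc:status"
other_key  = "unrelated:key"

p msg_id(status_key)  # the key without its ":status" suffix
p msg_id(other_key)   # nil, since the pattern does not match
```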
#read_master_file ⇒ Object
Returns the server:port string from the Redis master file.
    # File 'lib/beetle/deduplication_store.rb', line 192
    def read_master_file
      File.read(@config.redis_server).chomp
    end
#redis ⇒ Object
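Get the Redis instance. On the first call, this method redefines itself on the object's eigenclass so that later calls go directly to redis_master_from_server_string or redis_master_from_master_file, depending on whether the configured redis_server looks like a server:port string.
    # File 'lib/beetle/deduplication_store.rb', line 23
    def redis
      redis_master_source = @config.redis_server =~ /^\S+\:\d+$/ ? "server_string" : "master_file"
      _eigenclass_.class_eval <<-EVALS, __FILE__, __LINE__
        def redis
          redis_master_from_#{redis_master_source}
        end
      EVALS
      redis
    end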
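The self-replacing method above can be hard to follow at first reading. Here is a reduced sketch of the same technique under stated assumptions: `LazySource`, `source` and the two `source_from_*` methods are hypothetical names invented for this example, and Ruby's built-in `singleton_class` stands in for the `_eigenclass_` helper.

```ruby
# Reduced sketch of a method that replaces itself on first use.
# All names here are hypothetical and exist only for this example.
class LazySource
  def initialize(server)
    @server = server
  end

  def source
    # Decide once which implementation applies to this object.
    kind = @server =~ /^\S+\:\d+$/ ? "server_string" : "master_file"
    # Define a singleton method that shadows this one for this object only.
    singleton_class.class_eval <<-EVALS, __FILE__, __LINE__ + 1
      def source
        source_from_#{kind}
      end
    EVALS
    # This call now dispatches to the freshly defined singleton method.
    source
  end

  def source_from_server_string
    "server_string:#{@server}"
  end

  def source_from_master_file
    "master_file:#{@server}"
  end
end

direct = LazySource.new("localhost:6379")
puts direct.source
puts direct.singleton_methods.inspect

from_file = LazySource.new("/path/to/master-file")
puts from_file.source
```

The regular expression test runs only on the first call per object; every later call skips it and goes straight to the chosen implementation.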
#redis_master_file_changed? ⇒ Boolean
Whether the Redis master file was changed outside the running process.
    # File 'lib/beetle/deduplication_store.rb', line 163
    def redis_master_file_changed?
      @last_time_master_file_changed != File.mtime(@config.redis_server)
    end
#redis_master_from_master_file ⇒ Object
Return the current Redis master, first setting it from the master file if that file has changed.
    # File 'lib/beetle/deduplication_store.rb', line 157
    def redis_master_from_master_file
      set_current_redis_master_from_master_file if redis_master_file_changed?
      @current_master
    end
#redis_master_from_server_string ⇒ Object
Set the current Redis master instance (as specified in the Beetle::Configuration).
    # File 'lib/beetle/deduplication_store.rb', line 152
    def redis_master_from_server_string
      @current_master ||= Redis.from_server_string(@config.redis_server, @config.)
    end
#set(msg_id, suffix, value) ⇒ Object
Unconditionally store a (key, value) pair with the given suffix for the given msg_id.
    # File 'lib/beetle/deduplication_store.rb', line 53
    def set(msg_id, suffix, value)
      with_failover { redis.set(key(msg_id, suffix), value) }
    end
#set_current_redis_master_from_master_file ⇒ Object
Set the current Redis master from the server:port string contained in the Redis master file for our system.
    # File 'lib/beetle/deduplication_store.rb', line 168
    def set_current_redis_master_from_master_file
      @last_time_master_file_changed = File.mtime(@config.redis_server)
      server_string = extract_redis_master(read_master_file)
      @current_master = !server_string.blank? ? Redis.from_server_string(server_string, @config. ) : nil
    end
#setnx(msg_id, suffix, value) ⇒ Object
Store a (key, value) pair with the given suffix for the given msg_id if it doesn’t exist yet.
    # File 'lib/beetle/deduplication_store.rb', line 58
    def setnx(msg_id, suffix, value)
      with_failover { redis.setnx(key(msg_id, suffix), value) }
    end
#setnx_completed!(msg_id) ⇒ Object
Store the completion status for the given msg_id if it doesn’t exist yet. Returns whether the operation was successful.
    # File 'lib/beetle/deduplication_store.rb', line 64
    def setnx_completed!(msg_id)
      expiry = @config.redis_status_key_expiry_interval.to_i
      return true if expiry == 0
      with_failover do
        redis.set(
          key(msg_id, :status),
          "completed",
          :nx => true,
          :ex => expiry,
        )
      end
    end
#with_failover ⇒ Object
Performs Redis operations by yielding to a passed-in block, waiting for a new master to show up on the network if the operation throws an exception. If a new master doesn’t appear within the configured timeout interval, an exception is raised.
    # File 'lib/beetle/deduplication_store.rb', line 134
    def with_failover #:nodoc:
      end_time = Time.now.to_i + @config.redis_failover_timeout.to_i
      begin
        yield
      rescue Exception => e
        Beetle::reraise_expectation_errors!
        logger.error "Beetle: redis connection error #{e} #{@config.redis_server} (#{e.backtrace[0]})"
        if Time.now.to_i < end_time
          sleep 1
          logger.info "Beetle: retrying redis operation"
          retry
        else
          raise NoRedisMaster.new(e.to_s)
        end
      end
    end
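The retry-until-deadline pattern in with_failover can be tried in isolation. The sketch below is a simplified stand-in rather than Beetle's implementation: `with_retry_until` and its `pause` parameter are hypothetical, it rescues StandardError instead of Exception, it raises a plain RuntimeError where the original raises NoRedisMaster, and it omits logging.

```ruby
# Simplified stand-in for the retry loop in #with_failover.
# with_retry_until and pause are hypothetical names for this sketch.
def with_retry_until(timeout_seconds, pause: 1)
  end_time = Time.now.to_i + timeout_seconds
  begin
    yield
  rescue StandardError => e
    if Time.now.to_i < end_time
      # Deadline not reached yet: wait briefly, then run the block again.
      sleep pause
      retry
    else
      # Deadline reached: give up and report the last error.
      raise RuntimeError, "gave up after timeout: #{e}"
    end
  end
end

# A block that fails twice before succeeding, to exercise the retry path.
attempts = 0
result = with_retry_until(5, pause: 0.01) do
  attempts += 1
  raise IOError, "not ready" if attempts < 3
  :ok
end

puts "result=#{result.inspect} attempts=#{attempts}"
```

With a timeout of zero the deadline has already passed when the first failure is rescued, so the block runs once and the error is raised immediately.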