Class: Beetle::DeduplicationStore

Inherits:
Object
  • Object
show all
Includes:
Logging
Defined in:
lib/beetle/deduplication_store.rb

Overview

The deduplication store is used internally by Beetle::Client to store information on the status of message processing. This includes:

  • how often a message has already been seen by some consumer

  • whether a message has been processed successfully

  • how many attempts have been made to execute a message handler for a given message

  • how long we should wait before trying to execute the message handler after a failure

  • how many exceptions have been raised during previous execution attempts

  • how long we should wait before trying to perform the next execution attempt

  • whether some other process is already trying to execute the message handler

It also provides a method to garbage collect keys for expired messages.

Constant Summary collapse

KEY_SUFFIXES =

list of key suffixes to use for storing values in Redis. ‘status’ always needs to be the first element of the array.

[:status, :ack_count, :timeout, :delay, :attempts, :exceptions, :mutex, :expires]

Instance Method Summary collapse

Methods included from Logging

#logger

Constructor Details

#initialize(config = Beetle.config) ⇒ DeduplicationStore

Returns a new instance of DeduplicationStore.



16
17
18
19
20
# File 'lib/beetle/deduplication_store.rb', line 16

# Build a store that reads its settings from +config+ (Beetle.config unless
# another configuration is passed in). The cached Redis master and the
# remembered master-file timestamp both start out as nil.
def initialize(config = Beetle.config)
  @config = config
  @current_master = @last_time_master_file_changed = nil
end

Instance Method Details

#_eigenclass_ ⇒ Object

:nodoc:



196
197
198
# File 'lib/beetle/deduplication_store.rb', line 196

# Return the singleton class of this object, i.e. the class that holds
# methods defined on this one instance only.
def _eigenclass_ #:nodoc:
  singleton_class
end

#del(msg_id, suffix) ⇒ Object

delete key with given suffix for given msg_id.



106
107
108
# File 'lib/beetle/deduplication_store.rb', line 106

# Delete the Redis key built from +msg_id+ and +suffix+.
# The operation runs inside with_failover; the result of redis.del is returned.
def del(msg_id, suffix)
  with_failover do
    redis_key = key(msg_id, suffix)
    redis.del(redis_key)
  end
end

#del_keys(msg_id) ⇒ Object

delete all keys associated with the given msg_id.



111
112
113
114
115
116
117
118
119
# File 'lib/beetle/deduplication_store.rb', line 111

# Delete all keys associated with +msg_id+.
#
# When a positive status key expiry interval is configured, the first key
# returned by keys(msg_id) is not deleted; it is given that interval as its
# time-to-live instead. Otherwise every key is deleted.
def del_keys(msg_id)
  expiry = @config.redis_status_key_expiry_interval.to_i
  keep_status = expiry > 0
  doomed_keys = keys(msg_id)
  # the status key is expected to come first (see KEY_SUFFIXES)
  status_key = keep_status ? doomed_keys.shift : nil
  with_failover do
    redis.del(*doomed_keys)
    redis.expire(status_key, expiry) if keep_status
  end
end

#exists(msg_id, suffix) ⇒ Object

check whether key with given suffix exists for a given msg_id.



122
123
124
# File 'lib/beetle/deduplication_store.rb', line 122

# Ask Redis whether the key built from +msg_id+ and +suffix+ exists.
# Runs inside with_failover and returns whatever redis.exists? returns.
def exists(msg_id, suffix)
  with_failover do
    redis_key = key(msg_id, suffix)
    redis.exists?(redis_key)
  end
end

#extract_redis_master(text) ⇒ Object

extract redis master from file content and return the server for our system



175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
# File 'lib/beetle/deduplication_store.rb', line 175

# Extract the redis master for our system from the content of a master file.
#
# Each non-blank line is either a bare server string ("host:port"), which
# applies regardless of system name, or "system_name/host:port", which
# applies only when the name equals @config.system_name. When several lines
# apply, the last one wins.
#
# Lines are stripped of surrounding whitespace before parsing, so the result
# never carries a line terminator and blank lines cannot overwrite a master
# that was already found.
#
# Returns the server string, or "" when no line applies.
def extract_redis_master(text)
  system_name = @config.system_name
  redis_master = ""
  text.each_line do |line|
    # each_line keeps the "\n"; strip it (a blank line then splits to [])
    parts = line.strip.split('/', 2)
    case parts.size
    when 1
      redis_master = parts[0]
    when 2
      name, master = parts
      redis_master = master if name == system_name
    end
  end
  redis_master
end

#flushdb ⇒ Object

flush the configured redis database. useful for testing.



127
128
129
# File 'lib/beetle/deduplication_store.rb', line 127

# Flush the configured redis database (useful for testing).
# Runs inside with_failover and returns the result of redis.flushdb.
def flushdb
  with_failover do
    redis.flushdb
  end
end

#get(msg_id, suffix) ⇒ Object

retrieve the value with given suffix for given msg_id. returns a string.



95
96
97
# File 'lib/beetle/deduplication_store.rb', line 95

# Fetch the value stored under the key built from +msg_id+ and +suffix+.
# Runs inside with_failover and returns whatever redis.get returns.
def get(msg_id, suffix)
  with_failover do
    redis_key = key(msg_id, suffix)
    redis.get(redis_key)
  end
end

#incr(msg_id, suffix) ⇒ Object

increment counter for key with given suffix for given msg_id. returns an integer.



90
91
92
# File 'lib/beetle/deduplication_store.rb', line 90

# Increment the counter stored under the key built from +msg_id+ and +suffix+.
# Runs inside with_failover and returns the result of redis.incr.
def incr(msg_id, suffix)
  with_failover do
    counter_key = key(msg_id, suffix)
    redis.incr(counter_key)
  end
end

#key(msg_id, suffix) ⇒ Object

build a Redis key out of a message id and a given suffix



38
39
40
# File 'lib/beetle/deduplication_store.rb', line 38

# Build a Redis key: the message id and the suffix joined by a single colon.
def key(msg_id, suffix)
  msg_id.to_s + ':' + suffix.to_s
end

#keys(msg_id) ⇒ Object

list of keys which potentially exist in Redis for the given message id



43
44
45
# File 'lib/beetle/deduplication_store.rb', line 43

# List the keys which potentially exist in Redis for +msg_id+: one key per
# entry of KEY_SUFFIXES, in the same order.
def keys(msg_id)
  KEY_SUFFIXES.map do |suffix|
    key(msg_id, suffix)
  end
end

#mget(msg_id, keys) ⇒ Object

retrieve the values with given suffixes for given msg_id. returns a list of strings.



100
101
102
103
# File 'lib/beetle/deduplication_store.rb', line 100

# Retrieve the values stored for +msg_id+ under each suffix in +keys+.
# The suffixes are expanded to full Redis keys and fetched with a single
# redis.mget call inside with_failover.
def mget(msg_id, keys)
  redis_keys = keys.map do |suffix|
    key(msg_id, suffix)
  end
  with_failover do
    redis.mget(*redis_keys)
  end
end

#mset(msg_id, values) ⇒ Object

store some key/value pairs



78
79
80
81
# File 'lib/beetle/deduplication_store.rb', line 78

# Store several suffix/value pairs for +msg_id+ with one redis.mset call.
# +values+ is enumerated as (suffix, value) pairs; each suffix is expanded to
# a full Redis key and the pairs are passed to Redis as a flat argument list.
def mset(msg_id, values)
  flat_pairs = values.flat_map do |suffix, value|
    [key(msg_id, suffix), value]
  end
  with_failover do
    redis.mset(*flat_pairs)
  end
end

#msetnx(msg_id, values) ⇒ Object

store some key/value pairs if none of the given keys exist.



84
85
86
87
# File 'lib/beetle/deduplication_store.rb', line 84

# Store several suffix/value pairs for +msg_id+ with one redis.msetnx call
# (Redis sets them only if none of the keys exist yet).
# +values+ is enumerated as (suffix, value) pairs; each suffix is expanded to
# a full Redis key and the pairs are passed to Redis as a flat argument list.
def msetnx(msg_id, values)
  flat_pairs = values.flat_map do |suffix, value|
    [key(msg_id, suffix), value]
  end
  with_failover do
    redis.msetnx(*flat_pairs)
  end
end

#msg_id(key) ⇒ Object

extract message id from a given Redis key



48
49
50
# File 'lib/beetle/deduplication_store.rb', line 48

# Extract the message id from a Redis key.
#
# A key has the form "msgid:<queue>:<id>:<suffix>", where <queue> contains no
# colon and <id> consists of hex digits and dashes. Returns the
# "msgid:<queue>:<id>" prefix, or nil when the key does not start with one.
#
# The pattern is anchored with \A so that only the very start of the key can
# match; a line-based ^ anchor would also accept an id that begins after an
# embedded newline and therefore is not a prefix of the key.
def msg_id(key)
  key =~ /\A(msgid:[^:]*:[-0-9a-f]*):/ && $1
end

#read_master_file ⇒ Object

server:port string from the redis master file



192
193
194
# File 'lib/beetle/deduplication_store.rb', line 192

# Read the file named by @config.redis_server and return its content with
# one trailing line terminator removed.
def read_master_file
  contents = File.read(@config.redis_server)
  contents.chomp
end

#redis ⇒ Object

get the Redis instance



23
24
25
26
27
28
29
30
31
# File 'lib/beetle/deduplication_store.rb', line 23

# get the Redis instance.
#
# The first call decides, from the shape of @config.redis_server, which
# lookup strategy to use: a value matching /^\S+\:\d+$/ selects
# redis_master_from_server_string, anything else selects
# redis_master_from_master_file. It then defines a `redis` method on this
# object's singleton class that delegates to the chosen method, so the
# decision is made only once per instance.
def redis
  redis_master_source = @config.redis_server =~ /^\S+\:\d+$/ ? "server_string" : "master_file"
  # the generated singleton method takes precedence over this one on later calls
  _eigenclass_.class_eval <<-EVALS, __FILE__, __LINE__
    def redis
      redis_master_from_#{redis_master_source}
    end
  EVALS
  # dispatches to the singleton method that was just defined
  redis
end

#redis_master_file_changed? ⇒ Boolean

redis master file changed outside the running process?

Returns:

  • (Boolean)


163
164
165
# File 'lib/beetle/deduplication_store.rb', line 163

# Has the redis master file been modified since we last recorded its mtime?
# Compares the remembered timestamp with the current modification time of
# the file named by @config.redis_server.
def redis_master_file_changed?
  current_mtime = File.mtime(@config.redis_server)
  @last_time_master_file_changed != current_mtime
end

#redis_master_from_master_file ⇒ Object

set current redis master from master file



157
158
159
160
# File 'lib/beetle/deduplication_store.rb', line 157

# Return the current redis master, re-reading the master file first when
# redis_master_file_changed? reports a modification.
def redis_master_from_master_file
  if redis_master_file_changed?
    set_current_redis_master_from_master_file
  end
  @current_master
end

#redis_master_from_server_string ⇒ Object

set current redis master instance (as specified in the Beetle::Configuration)



152
153
154
# File 'lib/beetle/deduplication_store.rb', line 152

# Return the redis master built from the server string in the configuration.
# The instance is created on first use with Redis.from_server_string and
# cached in @current_master for subsequent calls.
def redis_master_from_server_string
  return @current_master if @current_master

  @current_master = Redis.from_server_string(@config.redis_server, @config.redis_options)
end

#set(msg_id, suffix, value) ⇒ Object

unconditionally store a (key,value) pair with given suffix for given msg_id.



53
54
55
# File 'lib/beetle/deduplication_store.rb', line 53

# Unconditionally store +value+ under the key built from +msg_id+ and
# +suffix+. Runs inside with_failover and returns the result of redis.set.
def set(msg_id, suffix, value)
  with_failover do
    redis_key = key(msg_id, suffix)
    redis.set(redis_key, value)
  end
end

#set_current_redis_master_from_master_file ⇒ Object

set current redis master from the server:port string contained in the redis master file for our system



168
169
170
171
172
# File 'lib/beetle/deduplication_store.rb', line 168

# Reload the current redis master from the master file.
#
# Records the file's modification time, extracts the server string for our
# system from the file content and builds a new Redis instance from it.
# When the extracted server string is blank, the current master is set to nil.
def set_current_redis_master_from_master_file
  @last_time_master_file_changed = File.mtime(@config.redis_server)
  server_string = extract_redis_master(read_master_file)
  @current_master =
    if server_string.blank?
      nil
    else
      Redis.from_server_string(server_string, @config.redis_options )
    end
end

#setnx(msg_id, suffix, value) ⇒ Object

store a (key,value) pair with given suffix for given msg_id if it doesn’t exist yet.



58
59
60
# File 'lib/beetle/deduplication_store.rb', line 58

# Store +value+ under the key built from +msg_id+ and +suffix+ via
# redis.setnx (set only if the key does not exist yet).
# Runs inside with_failover and returns the result of redis.setnx.
def setnx(msg_id, suffix, value)
  with_failover do
    redis_key = key(msg_id, suffix)
    redis.setnx(redis_key, value)
  end
end

#setnx_completed!(msg_id) ⇒ Object

store completion status for given msg_id if it doesn’t exist yet. Returns whether the operation was successful.



64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/beetle/deduplication_store.rb', line 64

# Store the completion status for +msg_id+ unless a status is already stored.
#
# When the configured status key expiry interval is 0, nothing is written and
# true is returned right away. Otherwise the status key is set to "completed"
# with the NX option and the interval as its expiry, inside with_failover,
# and the result of redis.set is returned.
def setnx_completed!(msg_id)
  expiry = @config.redis_status_key_expiry_interval.to_i
  return true if expiry.zero?

  with_failover do
    status_key = key(msg_id, :status)
    redis.set(status_key, "completed", nx: true, ex: expiry)
  end
end

#with_failover ⇒ Object

performs redis operations by yielding a passed in block, waiting for a new master to show up on the network if the operation throws an exception. if a new master doesn’t appear after the configured timeout interval, we raise an exception.



134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
# File 'lib/beetle/deduplication_store.rb', line 134

# Perform redis operations by yielding to the given block and return the
# block's value.
#
# If the block raises, the error is logged and the block is retried after a
# one second pause, for as long as the configured redis_failover_timeout
# (in seconds) has not elapsed since the call started. Once the timeout has
# elapsed, NoRedisMaster is raised with the message of the last error.
#
# SystemExit and SignalException (which includes Interrupt) are re-raised
# immediately: they signal process shutdown, not a redis problem, and must
# not be logged as connection errors, retried or turned into NoRedisMaster.
def with_failover #:nodoc:
  end_time = Time.now.to_i + @config.redis_failover_timeout.to_i
  begin
    yield
  rescue SystemExit, SignalException
    raise
  rescue Exception => e
    # NOTE(review): Exception (not StandardError) is kept for every other
    # error so existing callers see the same behaviour as before.
    Beetle::reraise_expectation_errors!
    logger.error "Beetle: redis connection error #{e} #{@config.redis_server} (#{e.backtrace[0]})"
    if Time.now.to_i < end_time
      sleep 1
      logger.info "Beetle: retrying redis operation"
      retry
    else
      raise NoRedisMaster.new(e.to_s)
    end
  end
end