Class: KoinzRedis

Inherits:
Redis
  • Object
Defined in:
lib/koinz/redis.rb

Instance Method Summary

Constructor Details

#initialize(options = {}) ⇒ KoinzRedis

Returns a new instance of KoinzRedis.



# File 'lib/koinz/redis.rb', line 6

def initialize(options = {})
  @timestamp = options[:timestamp].to_i # nil.to_i is 0, and 0 means no backlog is needed
  super
end
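
The way the :timestamp option is normalised can be sketched without a Redis connection. The helper name backlog_timestamp below is hypothetical and not part of KoinzRedis; it only mirrors the conversion the constructor performs.

```ruby
# Hypothetical helper, not part of KoinzRedis: mirrors how the constructor
# turns the :timestamp option into an Integer.
def backlog_timestamp(options = {})
  # nil.to_i is 0, so a missing option ends up meaning "no backlog needed".
  options[:timestamp].to_i
end

backlog_timestamp({})                          # option missing
backlog_timestamp(:timestamp => '1325376000')  # string, e.g. read from a config file
backlog_timestamp(:timestamp => 1325376000)    # already an Integer
```

Because the conversion never yields nil, a caller that has no saved timestamp can simply omit the option.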

Instance Method Details

#backlog(channels, &block) ⇒ Object

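Yields each pending message to the block as an (event, payload) pair. Events come from sorted sets ordered by timestamp, and only those newer than the timestamp given to the constructor are replayed.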



# File 'lib/koinz/redis.rb', line 21

def backlog(channels, &block)
  return if @timestamp == 0

  Rails.logger.info('Processing Pending messages')
  events = channels.collect {|e| keys(e)}.flatten

  return if events.empty? # no events to process (flatten always returns an Array)

  destination = "pending-#{Time.now.to_i}"
  zunionstore(destination, events)
  # We want events only after the timestamp! The "(" prefix makes the lower bound exclusive.
  messages = zrangebyscore(destination, "(#{@timestamp}", "+inf")

  # Pending messages are from sorted sets (ordered by timestamp)
  # No need to send the timestamp for now -- but we store it just in case!
  messages.each do |message|
    event, payload = MultiJson.decode(message)
    block.call(event, payload)
  end

  # cleanup
  del(destination)

  Rails.logger.info('Completed processing of pending messages')
end
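
The replay logic can be illustrated with plain Ruby hashes in place of Redis sorted sets. This is a simplified sketch under stated assumptions: replay_backlog and the sample data are hypothetical, channels are looked up by exact key rather than by the keys pattern match, and the standard json library stands in for MultiJson.

```ruby
require 'json'

# In-memory stand-in for the sorted sets: key => { member => score }.
# Keys, payloads and scores are made up for illustration.
sorted_sets = {
  'orders'  => { JSON.generate(['orders',  { 'id' => 1 }]) => 100,
                 JSON.generate(['orders',  { 'id' => 2 }]) => 205 },
  'refunds' => { JSON.generate(['refunds', { 'id' => 7 }]) => 150 }
}

# Hypothetical helper: yields every [event, payload] pair scored strictly
# after +timestamp+, oldest first, and returns the pairs as an Array.
def replay_backlog(sets, channels, timestamp)
  return [] if timestamp == 0 # 0 means no backlog was requested

  # Union of the requested sets (the role zunionstore plays).
  merged = {}
  channels.each { |channel| merged.merge!(sets.fetch(channel, {})) }

  # Exclusive lower bound, like "(timestamp" in zrangebyscore.
  pending = merged.select { |_member, score| score > timestamp }

  pending.sort_by { |_member, score| score }.map do |member, _score|
    event, payload = JSON.parse(member)
    yield event, payload if block_given?
    [event, payload]
  end
end

seen = []
replay_backlog(sorted_sets, %w[orders refunds], 100) do |event, payload|
  seen << [event, payload['id']]
end
# seen now holds the refunds event (score 150) followed by the second
# orders event (score 205); the event scored exactly 100 is skipped.
```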

#prune(channel) ⇒ Object

For the sake of simplicity, speed and efficiency, we prune only the channel we have just published to. Events scored at or before the earliest timestamp recorded by any application are removed.



# File 'lib/koinz/redis.rb', line 49

def prune(channel)
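  # All the application timestamps. mget takes varargs, hence the splat.
  timestamp_keys = keys('*_timestamp_*')
  return if timestamp_keys.empty? # mget with no keys is an error

  # Values come back as strings, so compare them numerically.
  threshold = mget(*timestamp_keys).compact.map(&:to_i).min # the earliest timestamp
  return unless threshold

  # threshold is the timestamp at or before which all events should be destroyed
  zremrangebyscore(channel, '-inf', threshold)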
end
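
How the threshold is chosen matters because Redis returns stored values as strings. The sketch below uses two hypothetical helpers, prune_threshold and prune_members, on plain Ruby data to show the difference between string and numeric comparison and the inclusive upper bound of the removal.

```ruby
# Hypothetical helper: picks the prune threshold from the values mget would
# return (strings, or nil for a key that no longer exists).
def prune_threshold(raw_values)
  raw_values.compact.map(&:to_i).min # nil when nothing has been recorded
end

# Hypothetical helper: drops every member scored at or below the threshold,
# as zremrangebyscore(channel, '-inf', threshold) would.
def prune_members(members, threshold)
  return members if threshold.nil?
  members.reject { |_member, score| score <= threshold }
end

values = ['1325376100', '999', nil]
values.compact.sort.first # string comparison picks "1325376100"
prune_threshold(values)   # numeric comparison picks 999

prune_members({ 'old' => 999, 'new' => 1000 }, 999) # keeps only 'new'
```

String sorting gives the right answer only while every timestamp has the same number of digits, which is why the numeric conversion is the safer choice.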

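#publish(channel, message) ⇒ Object

Stores the message in the channel's sorted set, scored by the current time, publishes it to live subscribers, and then prunes the channel.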



# File 'lib/koinz/redis.rb', line 11

def publish(channel, message)
  timestamp = Time.now.to_i
  zadd(channel, timestamp, MultiJson.encode([channel, message]))
  super(channel, MultiJson.encode(message))

  prune(channel)
end
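
The two writes that publish performs use different encodings: the stored copy wraps the message together with its channel, while the live copy carries the message alone. A minimal in-memory sketch follows, assuming a hypothetical FakeBus class and the standard json library in place of MultiJson; pruning is left out.

```ruby
require 'json'

# Hypothetical in-memory stand-in for the two writes publish performs.
class FakeBus
  attr_reader :stored, :delivered

  def initialize
    @stored = Hash.new { |hash, key| hash[key] = {} } # channel => { member => score }
    @delivered = []                                   # [channel, raw message] pairs
  end

  def publish(channel, message, now = Time.now.to_i)
    # Stored copy carries the channel name so a later replay can recover the event.
    @stored[channel][JSON.generate([channel, message])] = now
    # Live copy carries only the message; subscribers already know the channel.
    @delivered << [channel, JSON.generate(message)]
  end
end

bus = FakeBus.new
bus.publish('orders', { 'id' => 1 }, 100)
bus.stored['orders'] # one member, '["orders",{"id":1}]', scored 100
bus.delivered.first  # ['orders', '{"id":1}']
```

Since the encoded pair is the sorted-set member, publishing an identical message to the same channel again updates its score rather than adding a second entry.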