Class: KoinzRedis
- Inherits:
-
Redis
- Object
- Redis
- KoinzRedis
- Defined in:
- lib/koinz/redis.rb
Instance Method Summary collapse
-
#backlog(channels, &block) ⇒ Object
returns the pending messages [ event, payload ] pairs Events are ordered sets based on timstamp!.
-
#initialize(options = {}) ⇒ KoinzRedis
constructor
A new instance of KoinzRedis.
-
#prune(channel) ⇒ Object
For the sake of simplicity, speed and efficiency, we prune only events which we have just published!.
- #publish(channel, message) ⇒ Object
Constructor Details
#initialize(options = {}) ⇒ KoinzRedis
Returns a new instance of KoinzRedis.
6 7 8 9 |
# File 'lib/koinz/redis.rb', line 6 def initialize( = {}) @timestamp = [:timestamp].to_i || 0 # 0 mean -- no backlog needed super end |
Instance Method Details
#backlog(channels, &block) ⇒ Object
returns the pending messages [ event, payload ] pairs Events are ordered sets based on timstamp!
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
# File 'lib/koinz/redis.rb', line 21 def backlog(channels, &block) return if @timestamp == 0 Rails.logger.info('Processing Pending messages') events = channels.collect {|e| keys(e)}.flatten return if not events or events.empty? # no events to process destination = "pending-#{Time.now.to_i}" zunionstore(destination, events) # We want events only after the timestamp! = zrangebyscore(destination, "(#{@timestamp.to_s}", "+inf") # Pending messages are from sorted sets (ordered by timestamp) # No need to send the timestamp for now -- but we store it just in case! .each do || event, payload = MultiJson.decode() block.call(event, payload) end # cleanup del(destination) Rails.logger.info('Completed processing of pending messages') end |
#prune(channel) ⇒ Object
For the sake of simplicity, speed and efficiency, we prune only events which we have just published!
49 50 51 52 53 54 55 56 |
# File 'lib/koinz/redis.rb', line 49 def prune(channel) # All the application timestamps. We have to pass varargs hence we send # *keys to mget! threshold = mget(*keys('*_timestamp_*')).sort.first # The earliest timestamp # threshold is the timestamp before which all events should be destroyed zremrangebyscore(channel, '-inf', threshold) end |
#publish(channel, message) ⇒ Object
11 12 13 14 15 16 17 |
# File 'lib/koinz/redis.rb', line 11 def publish(channel, ) = Time.now.to_i zadd(channel, , MultiJson.encode([channel, ])) super(channel, MultiJson.encode()) prune(channel) end |