Class: Streamforce::Extension::Replay

Inherits:
Object
  • Object
show all
Defined in:
lib/streamforce/extension/replay.rb

Instance Method Summary collapse

Constructor Details

#initialize(log_level = Logger::INFO) ⇒ Replay

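Returns a new instance of Replay that logs to standard output at the given level and holds a Redis client created with default options.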



4
5
6
7
8
# File 'lib/streamforce/extension/replay.rb', line 4

# Sets up the extension's collaborators: a logger writing to $stdout and a
# Redis client built with no arguments.
#
# @param log_level [Integer] severity threshold assigned to the logger
#   (defaults to Logger::INFO)
def initialize(log_level = Logger::INFO)
  @logger = Logger.new($stdout).tap { |log| log.level = log_level }
  @redis  = Redis.new
end

Instance Method Details

#incoming(message, callback) ⇒ Object



10
11
12
13
14
15
16
# File 'lib/streamforce/extension/replay.rb', line 10

# Handles an incoming message: remembers the replay id found under
# data -> event -> replayId for the message's channel, then passes the
# message on to the callback.
#
# @param message [Hash] incoming message with string keys
# @param callback [#call] invoked with the (unmodified) message
# @return [Object] whatever the callback returns
def incoming(message, callback)
  event_replay_id = message.dig("data", "event", "replayId")

  # store ignores the pair when either part is nil, so messages without an
  # event payload pass straight through.
  store(message["channel"], event_replay_id)
  callback.call(message)
end

#outgoing(message, callback) ⇒ Object



18
19
20
21
22
23
24
25
# File 'lib/streamforce/extension/replay.rb', line 18

# Handles an outgoing message. Messages on "/meta/subscribe" get a
# "replay" entry added to their "ext" field, mapping the subscribed channel
# to the last replay id known for it (as an Integer). Every other message
# is forwarded untouched.
#
# Any "ext" data already present on the message is preserved; only the
# "replay" key is set or replaced, so other extensions that populate "ext"
# are not clobbered.
#
# @param message [Hash] outgoing message with string keys
# @param callback [#call] invoked with the (possibly amended) message
# @return [Object] whatever the callback returns
def outgoing(message, callback)
  return callback.call(message) unless message["channel"] == "/meta/subscribe"

  channel = message["subscription"]
  replay  = { channel => replay_id(channel).to_i }

  # Merge instead of assigning a fresh hash so existing ext fields survive.
  existing_ext = message["ext"].is_a?(Hash) ? message["ext"] : {}
  message["ext"] = existing_ext.merge("replay" => replay)

  callback.call(message)
end

#replay_id(channel) ⇒ Object



33
34
35
# File 'lib/streamforce/extension/replay.rb', line 33

# Looks up the value saved in Redis under the channel name.
#
# @param channel [String] key to read
# @return [Object, Integer] the value Redis returns for the key, or -1 when
#   nothing is stored
def replay_id(channel)
  cached = @redis.get(channel)
  return cached if cached

  -1
end

#store(channel, replay_id) ⇒ Object



27
28
29
30
31
# File 'lib/streamforce/extension/replay.rb', line 27

# Saves a replay id in Redis under the channel name with an expiry of
# 86400 seconds. Does nothing when either argument is nil.
#
# @param channel [String, nil] key to write
# @param replay_id [Object, nil] value to write
# @return [Object, nil] the result of the Redis SET call, or nil when skipped
def store(channel, replay_id)
  return if [channel, replay_id].any?(&:nil?)

  @redis.set(channel, replay_id, ex: 86_400)
end