Class: Mantle::CatchUp
- Inherits:
-
Object
- Object
- Mantle::CatchUp
- Defined in:
- lib/mantle/catch_up.rb
Constant Summary collapse
- KEY =
"mantle:catch_up"
- HOURS_TO_KEEP =
1
- CLEANUP_EVERY_MINUTES =
5
Instance Attribute Summary collapse
-
#key ⇒ Object
readonly
Returns the value of attribute key.
-
#message_bus_channels ⇒ Object
Returns the value of attribute message_bus_channels.
-
#redis ⇒ Object
Returns the value of attribute redis.
Instance Method Summary collapse
- #add_message(channel, message, now = Time.now.utc.to_f) ⇒ Object
- #catch_up ⇒ Object
- #clear_expired ⇒ Object
- #deserialize_payload(payload) ⇒ Object
- #enqueue_clear_if_ready ⇒ Object
- #hours_ago_in_seconds(hours) ⇒ Object
-
#initialize ⇒ CatchUp
constructor
A new instance of CatchUp.
- #last_success_time ⇒ Object
- #route_messages(payloads_with_time) ⇒ Object
Constructor Details
Instance Attribute Details
#key ⇒ Object (readonly)
Returns the value of attribute key.
8 9 10 |
# File 'lib/mantle/catch_up.rb', line 8 def key @key end |
#message_bus_channels ⇒ Object
Returns the value of attribute message_bus_channels.
7 8 9 |
# File 'lib/mantle/catch_up.rb', line 7 def @message_bus_channels end |
#redis ⇒ Object
Returns the value of attribute redis.
7 8 9 |
# File 'lib/mantle/catch_up.rb', line 7 def redis @redis end |
Instance Method Details
#add_message(channel, message, now = Time.now.utc.to_f) ⇒ Object
16 17 18 19 20 21 |
# File 'lib/mantle/catch_up.rb', line 16 def (channel, , now = Time.now.utc.to_f) json = serialize_payload(channel, ) redis.zadd(key, now, json) Mantle.logger.debug("Added message to catch up list for channel: #{channel}") now end |
#catch_up ⇒ Object
38 39 40 41 42 43 44 45 46 47 48 49 50 |
# File 'lib/mantle/catch_up.rb', line 38 def catch_up raise Mantle::Error::MissingRedisConnection unless redis if last_success_time.nil? Mantle.logger.info("Skipping catch up because of missing last processed time...") return end Mantle.logger.info("Catching up from time: #{last_success_time}") payloads_with_time = redis.zrangebyscore(key, last_success_time, 'inf', with_scores: true) (payloads_with_time) if payloads_with_time.any? end |
#clear_expired ⇒ Object
33 34 35 36 |
# File 'lib/mantle/catch_up.rb', line 33 def clear_expired max_time_to_clear = hours_ago_in_seconds(HOURS_TO_KEEP) redis.zremrangebyscore(key, 0, max_time_to_clear) end |
#deserialize_payload(payload) ⇒ Object
67 68 69 |
# File 'lib/mantle/catch_up.rb', line 67 def deserialize_payload(payload) JSON(payload).values_at 'channel', 'message' end |
#enqueue_clear_if_ready ⇒ Object
23 24 25 26 27 28 29 30 31 |
# File 'lib/mantle/catch_up.rb', line 23 def enqueue_clear_if_ready now = Time.now.utc.to_f five_minutes_ago = now - (CLEANUP_EVERY_MINUTES * 60.0) last_cleanup = Mantle::LocalRedis.last_catch_up_cleanup_at if last_cleanup.nil? || last_cleanup < five_minutes_ago Mantle::Workers::CatchUpCleanupWorker.perform_async end end |
#hours_ago_in_seconds(hours) ⇒ Object
71 72 73 74 |
# File 'lib/mantle/catch_up.rb', line 71 def hours_ago_in_seconds(hours) hour_seconds = 60 * 60 * hours Time.now.utc.to_f - hour_seconds end |
#last_success_time ⇒ Object
52 53 54 |
# File 'lib/mantle/catch_up.rb', line 52 def last_success_time LocalRedis. end |
#route_messages(payloads_with_time) ⇒ Object
56 57 58 59 60 61 62 63 64 65 |
# File 'lib/mantle/catch_up.rb', line 56 def (payloads_with_time) payloads_with_time.each do |payload_with_time| payload, time = payload_with_time channel, = deserialize_payload(payload) if .include?(channel) Mantle::MessageRouter.new(channel, ).route end end end |