Class: SimpleFeed::Providers::Redis::Provider
- Inherits:
- Base::Provider
- Object
- Base::Provider
- SimpleFeed::Providers::Redis::Provider
- Includes:
- Driver
- Defined in:
- lib/simplefeed/providers/redis/provider.rb
Overview
Internal data structure:
```YAML
u.afkj234.data:
- [ 'John liked Robert', '2016-11-20 23:32:56 -0800' ]
- [ 'Debbie liked Robert', '2016-11-20 23:35:56 -0800' ]
u.afkj234.meta: { total: 2, unread: 2, last_read: '2016-11-20 22:00:34 -0800' }
```
Constant Summary collapse
- FEED_METHODS =
%i(total_memory_bytes total_users last_disk_save_time).freeze
Instance Attribute Summary
Attributes included from Driver
Instance Method Summary collapse
- #delete(user_ids:, value:) ⇒ Object
- #delete_if(user_ids:) ⇒ Object
- #fetch(user_ids:, since: nil, reset_last_read: false) ⇒ Object
- #last_read(user_ids:) ⇒ Object
- #paginate(user_ids:, page:, per_page: feed.per_page, with_total: false, reset_last_read: false) ⇒ Object
- #reset_last_read(user_ids:, at: Time.now) ⇒ Object
- #store(user_ids:, value:, at: Time.now) ⇒ Object
- #total_count(user_ids:) ⇒ Object
- #total_memory_bytes ⇒ Object
- #total_users ⇒ Object
- #transform_response(user_id = nil, result) ⇒ Object
- #transformable_type?(value) ⇒ Boolean
- #unread_count(user_ids:) ⇒ Object
- #wipe(user_ids:) ⇒ Object
- #with_stats(operation) ⇒ Object
Methods included from Driver
#debug?, #exec, #initialize, #on_error, #with_multi, #with_pipelined, #with_redis, #with_retries
Instance Method Details
#delete(user_ids:, value:) ⇒ Object
44 45 46 47 48 |
# File 'lib/simplefeed/providers/redis/provider.rb', line 44
# Issues a ZREM of +value+ against the data key of every given user.
# Extra keyword arguments are accepted and ignored.
#
# NOTE(review): the return value is whatever #with_response_pipelined
# builds from the per-key block results — presumably a per-user response;
# confirm against that helper.
def delete(user_ids:, value:, **)
  with_response_pipelined(user_ids) { |redis, key| redis.zrem(key.data, value) }
end
#delete_if(user_ids:) ⇒ Object
50 51 52 53 54 55 56 57 58 59 60 61 62 |
# File 'lib/simplefeed/providers/redis/provider.rb', line 50
# Deletes every event for which the given block returns a truthy value.
#
# For each user the feed is loaded through #fetch and each event is yielded
# to the block together with the user id, in the order (event, user_id).
# Matching events are removed with ZREM, and the events whose ZREM result
# was truthy are collected for that user.
#
# @yield [event, user_id] the candidate event and the id of its owner
# @raise [ArgumentError] when called without a block
def delete_if(user_ids:)
  unless block_given?
    # The message mirrors the actual yield order below: (event, user_id).
    raise ArgumentError, '#delete_if must be called with a block that receives (event, user_id) as arguments.'
  end

  with_response_batched(user_ids) do |key|
    fetch(user_ids: [key.consumer])[key.consumer].map do |event|
      with_redis do |redis|
        # Non-matching events produce nil and are dropped by #compact.
        next unless yield(event, key.consumer)

        event if redis.zrem(key.data, event.value)
      end
    end.compact
  end
end
#fetch(user_ids:, since: nil, reset_last_read: false) ⇒ Object
84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 |
# File 'lib/simplefeed/providers/redis/provider.rb', line 84
# Reads events (with scores) from each user's data key, newest score first.
#
# * +since+ nil      — the whole set (ZREVRANGE 0..-1)
# * +since+ :unread  — entries scored at or above the user's last-read
#                      value (0 when none is recorded)
# * any other +since+ — entries scored at or above +since.to_f+
#
# When +reset_last_read+ is truthy it is forwarded as +at:+ to
# #reset_last_read_value after the events have been read.
def fetch(user_ids:, since: nil, reset_last_read: false)
  unread_only = since == :unread

  last_read_response = nil
  if unread_only
    last_read_response = with_response_pipelined(user_ids) { |redis, key| get_users_last_read(redis, key) }
  end

  response = with_response_pipelined(user_ids) do |redis, key|
    next redis.zrevrange(key.data, 0, -1, withscores: true) unless since

    lower_bound = unread_only ? (last_read_response.delete(key.consumer) || 0) : since
    redis.zrevrangebyscore(key.data, '+inf', lower_bound.to_f, withscores: true)
  end

  if reset_last_read
    reset_last_read_value(user_ids: user_ids, at: reset_last_read)
  end

  response
end
#last_read(user_ids:) ⇒ Object
128 129 130 131 132 |
# File 'lib/simplefeed/providers/redis/provider.rb', line 128
# Looks up the last-read value of every given user by calling
# #get_users_last_read once per user key.
def last_read(user_ids:)
  with_response_pipelined(user_ids) { |redis, key, *| get_users_last_read(redis, key) }
end
#paginate(user_ids:, page:, per_page: feed.per_page, with_total: false, reset_last_read: false) ⇒ Object
70 71 72 73 74 75 76 77 78 79 80 81 82 |
# File 'lib/simplefeed/providers/redis/provider.rb', line 70
# Returns one page of events per user.
#
# When +reset_last_read+ is truthy it is forwarded as +at:+ to
# #reset_last_read_value *before* the page is read. With +with_total+ the
# per-user result is a hash of +:events+ and +:total_count+ (ZCARD of the
# data key); otherwise it is just the events.
def paginate(user_ids:, page:, per_page: feed.per_page, with_total: false, reset_last_read: false)
  if reset_last_read
    reset_last_read_value(user_ids: user_ids, at: reset_last_read)
  end

  with_response_pipelined(user_ids) do |redis, key|
    page_of_events = paginated_events(page, per_page, redis, key)
    next page_of_events unless with_total

    { events: page_of_events, total_count: redis.zcard(key.data) }
  end
end
#reset_last_read(user_ids:, at: Time.now) ⇒ Object
106 107 108 109 110 |
# File 'lib/simplefeed/providers/redis/provider.rb', line 106
# Sets the last-read marker of every given user to +at+ (converted with
# #to_f) by delegating to #reset_users_last_read.
def reset_last_read(user_ids:, at: Time.now)
  with_response_pipelined(user_ids) { |redis, key, *| reset_users_last_read(redis, key, at.to_f) }
end
#store(user_ids:, value:, at: Time.now) ⇒ Object
36 37 38 39 40 41 42 |
# File 'lib/simplefeed/providers/redis/provider.rb', line 36
# Adds +value+ to each user's data key, scored by +at.to_f+, then trims the
# key by removing ranks 0 through -(feed.max_size) - 1, i.e. everything
# except the +feed.max_size+ highest-scored entries.
#
# NOTE(review): +tap+ is called with an argument here, which Object#tap
# does not accept — presumably a project helper that runs the block and
# returns its argument (the ZADD result); confirm where it is defined.
def store(user_ids:, value:, at: Time.now)
  with_response_pipelined(user_ids) do |redis, key|
    added = redis.zadd(key.data, at.to_f, value)
    tap(added) { redis.zremrangebyrank(key.data, 0, -feed.max_size - 1) }
  end
end
#total_count(user_ids:) ⇒ Object
112 113 114 115 116 |
# File 'lib/simplefeed/providers/redis/provider.rb', line 112
# Reports the number of entries (ZCARD) stored under each user's data key.
def total_count(user_ids:)
  with_response_pipelined(user_ids) { |redis, key| redis.zcard(key.data) }
end
#total_memory_bytes ⇒ Object
136 137 138 |
# File 'lib/simplefeed/providers/redis/provider.rb', line 136
# Returns the result of the +used_memory_since_boot+ statistic obtained
# through #with_stats.
def total_memory_bytes
  with_stats :used_memory_since_boot
end
#total_users ⇒ Object
140 141 142 |
# File 'lib/simplefeed/providers/redis/provider.rb', line 140
# Estimates the number of users as half of the key count reported by DBSIZE.
#
# NOTE(review): presumably this relies on every user owning exactly two
# keys (data and meta) and on the database holding nothing else; confirm.
def total_users
  with_redis do |redis|
    key_count = redis.dbsize
    key_count / 2
  end
end
#transform_response(user_id = nil, result) ⇒ Object
150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 |
# File 'lib/simplefeed/providers/redis/provider.rb', line 150
# Recursively converts a raw Redis result into plain Ruby values.
#
# * +Redis::Future+ — unwrapped via #value and transformed again
# * +Hash+          — values transformed in place when any of them is a
#                     transformable type (see #transformable_type?)
# * +Array+         — elements transformed; a resulting two-element array
#                     whose second element is a Float becomes a
#                     +SimpleFeed::Event+ (value, Time.at(score))
# * +String+        — an all-digit string becomes an Integer and a
#                     "digits.digits" string becomes a Float
# * anything else   — returned untouched
#
# @param user_id [Object, nil] only passed through to recursive calls
# @param result [Object] the raw value to convert
# @return [Object] the converted value
def transform_response(user_id = nil, result)
  case result
  when ::Redis::Future
    transform_response(user_id, result.value)
  when ::Hash
    if result.values.any? { |v| transformable_type?(v) }
      # Only existing keys are reassigned, so mutating during #each is safe.
      result.each { |k, v| result[k] = transform_response(user_id, v) }
    else
      result
    end
  when ::Array
    if result.any? { |v| transformable_type?(v) }
      result = result.map { |v| transform_response(user_id, v) }
    end

    if result.size == 2 && result[1].is_a?(Float)
      SimpleFeed::Event.new(value: result[0], at: Time.at(result[1]))
    else
      result
    end
  when ::String
    # \A and \z anchor the WHOLE string. The line anchors ^ and $ would let
    # a multi-line value such as "note\n42" match on one line and then be
    # silently coerced to a number.
    case result
    when /\A\d+\.\d+\z/ then result.to_f
    when /\A\d+\z/ then result.to_i
    else result
    end
  else
    result
  end
end
#transformable_type?(value) ⇒ Boolean
188 189 190 191 192 193 194 195 |
# File 'lib/simplefeed/providers/redis/provider.rb', line 188
# Reports whether +value+ is of a type that #transform_response rewrites:
# a +Redis::Future+, +Hash+, +Array+ or +String+.
#
# Matching uses +===+ (kind-of semantics), the same test the +case+ in
# #transform_response applies, so subclass instances are recognised
# consistently by both methods.
#
# @param value [Object]
# @return [Boolean]
def transformable_type?(value)
  case value
  when ::Redis::Future, ::Hash, ::Array, ::String then true
  else false
  end
end
#unread_count(user_ids:) ⇒ Object
118 119 120 121 122 123 124 125 126 |
# File 'lib/simplefeed/providers/redis/provider.rb', line 118
# Counts, per user, the entries scored at or above that user's last-read
# value (ZCOUNT last_read..+inf).
#
# Runs in two passes: the first collects every user's last-read value, the
# second hands that response to #with_response_pipelined and consumes one
# entry per user. A missing value becomes 0.0 through nil.to_f.
def unread_count(user_ids:)
  last_read_response = with_response_pipelined(user_ids) { |redis, key| get_users_last_read(redis, key) }

  with_response_pipelined(last_read_response.user_ids, last_read_response) do |redis, key, pending|
    since_score = pending.delete(key.consumer).to_f
    redis.zcount(key.data, since_score, '+inf')
  end
end
#wipe(user_ids:) ⇒ Object
64 65 66 67 68 |
# File 'lib/simplefeed/providers/redis/provider.rb', line 64
# Issues a DEL for each Redis key belonging to each given user.
#
# The per-user block result is the outcome of +all?+ over the DEL results.
# NOTE(review): +all?+ only stops early on nil/false — confirm whether DEL
# can ever return either here.
def wipe(user_ids:)
  with_response_pipelined(user_ids) do |redis, key|
    key.keys.all? do |redis_key|
      redis.del(redis_key)
    end
  end
end
#with_stats(operation) ⇒ Object
144 145 146 147 148 |
# File 'lib/simplefeed/providers/redis/provider.rb', line 144
# Builds a +SimpleFeed::Providers::Redis::Stats+ around a Redis connection
# and invokes the method named by +operation+ on it, returning its result.
#
# @param operation [Symbol, String] name of the Stats method to call
def with_stats(operation)
  with_redis do |redis|
    stats = SimpleFeed::Providers::Redis::Stats.new(redis)
    stats.send(operation)
  end
end