Class: Pecorino::Adapters::SqliteAdapter
- Inherits: Object
- Pecorino::Adapters::SqliteAdapter < Object
- Defined in:
- lib/pecorino/adapters/sqlite_adapter.rb
Instance Method Summary collapse
- #add_tokens(key:, capacity:, leak_rate:, n_tokens:) ⇒ Object
- #add_tokens_conditionally(key:, capacity:, leak_rate:, n_tokens:) ⇒ Object
- #blocked_until(key:) ⇒ Object
- #create_tables(active_record_schema) ⇒ Object
-
#initialize(model_class) ⇒ SqliteAdapter
constructor
A new instance of SqliteAdapter.
- #prune ⇒ Object
- #set_block(key:, block_for:) ⇒ Object
- #state(key:, capacity:, leak_rate:) ⇒ Object
Constructor Details
#initialize(model_class) ⇒ SqliteAdapter
Returns a new instance of SqliteAdapter.
# File 'lib/pecorino/adapters/sqlite_adapter.rb', line 4

# Stores the ActiveRecord model class whose database connection will be
# used for all queries this adapter issues (see the other methods, which
# all go through @model_class.connection).
#
# @param model_class [Class] an ActiveRecord model connected to the SQLite database
def initialize(model_class)
  @model_class = model_class
end
Instance Method Details
#add_tokens(key:, capacity:, leak_rate:, n_tokens:) ⇒ Object
# File 'lib/pecorino/adapters/sqlite_adapter.rb', line 44

# Adds `n_tokens` to the leaky bucket stored under `key`, creating the bucket
# row if it does not exist yet. The new level is clamped into [0, capacity]
# inside the database. Returns the capped level after fillup and whether the
# fillup made the bucket overflow (level reached capacity).
#
# @param key [String] bucket identifier
# @param capacity [Numeric] maximum bucket level
# @param leak_rate [Numeric] tokens leaked per second
# @param n_tokens [Numeric] tokens to add (may be negative to remove tokens)
# @return [Array(Float, Boolean)] the level after fillup and the overflow flag
def add_tokens(key:, capacity:, leak_rate:, n_tokens:)
  # Take double the time it takes the bucket to empty under normal circumstances
  # until the bucket may be deleted.
  may_be_deleted_after_seconds = (capacity.to_f / leak_rate.to_f) * 2.0

  # Create the leaky bucket if it does not exist, and update
  # to the new level, taking the leak rate into account - if the bucket exists.
  query_params = {
    key: key.to_s,
    capacity: capacity.to_f,
    delete_after_s: may_be_deleted_after_seconds,
    leak_rate: leak_rate.to_f,
    now_s: Time.now.to_f, # See above as to why we are using a time value passed in
    fillup: n_tokens.to_f
  }

  # Named binds (:key, :now_s, ...) are substituted by sanitize_sql_array,
  # including the :delete_after_s occurrence inside the DATETIME() string literal.
  sql = @model_class.sanitize_sql_array([<<~SQL, query_params])
    INSERT INTO pecorino_leaky_buckets AS t
      (key, last_touched_at, may_be_deleted_after, level)
    VALUES
      (
        :key,
        :now_s, -- Precision loss must be avoided here as it is used for calculations
        DATETIME('now', '+:delete_after_s seconds'), -- Precision loss is acceptable here
        MAX(0.0,
          MIN(
            :capacity,
            :fillup
          )
        )
      )
    ON CONFLICT (key) DO UPDATE SET
      last_touched_at = EXCLUDED.last_touched_at,
      may_be_deleted_after = EXCLUDED.may_be_deleted_after,
      level = MAX(0.0,
        MIN(
          :capacity,
          t.level + :fillup - ((:now_s - t.last_touched_at) * :leak_rate)
        )
      )
    RETURNING
      level,
      -- Compare level to the capacity inside the DB so that we won't have rounding issues
      level >= :capacity AS did_overflow
  SQL

  # Note the use of .uncached here. The AR query cache will actually see our
  # query as a repeat (since we use "select_one" for the RETURNING bit) and will not call into Postgres
  # correctly, thus the clock_timestamp() value would be frozen between calls. We don't want that here.
  # See https://stackoverflow.com/questions/73184531/why-would-postgres-clock-timestamp-freeze-inside-a-rails-unit-test
  upserted = @model_class.connection.uncached { @model_class.connection.select_one(sql) }
  # SQLite returns the boolean comparison as integer 0/1, hence the == 1 check.
  capped_level_after_fillup, one_if_did_overflow = upserted.fetch("level"), upserted.fetch("did_overflow")
  [capped_level_after_fillup, one_if_did_overflow == 1]
end
#add_tokens_conditionally(key:, capacity:, leak_rate:, n_tokens:) ⇒ Object
# File 'lib/pecorino/adapters/sqlite_adapter.rb', line 99

# Adds `n_tokens` to the bucket under `key` only if the fillup would NOT make
# the bucket overflow its capacity. Returns the level after the attempt, the
# "bucket is full" flag, and whether the conditional fillup was accepted
# (level_after differs from level_before).
#
# @param key [String] bucket identifier
# @param capacity [Numeric] maximum bucket level
# @param leak_rate [Numeric] tokens leaked per second
# @param n_tokens [Numeric] tokens to add conditionally
# @return [Array(Float, Boolean, Boolean)] level after, at-capacity flag, did-accept flag
def add_tokens_conditionally(key:, capacity:, leak_rate:, n_tokens:)
  # Take double the time it takes the bucket to empty under normal circumstances
  # until the bucket may be deleted.
  may_be_deleted_after_seconds = (capacity.to_f / leak_rate.to_f) * 2.0

  # Create the leaky bucket if it does not exist, and update
  # to the new level, taking the leak rate into account - if the bucket exists.
  query_params = {
    key: key.to_s,
    capacity: capacity.to_f,
    delete_after_s: may_be_deleted_after_seconds,
    leak_rate: leak_rate.to_f,
    now_s: Time.now.to_f, # See above as to why we are using a time value passed in
    fillup: n_tokens.to_f
  }

  # Sadly with SQLite we need to do an INSERT first, because otherwise the inserted row is visible
  # to the WITH clause, so we cannot combine the initial fillup and the update into one statement.
  # This should be fine however since we will suppress the INSERT on a key conflict
  insert_sql = @model_class.sanitize_sql_array([<<~SQL, query_params])
    INSERT INTO pecorino_leaky_buckets AS t
      (key, last_touched_at, may_be_deleted_after, level)
    VALUES
      (
        :key,
        :now_s, -- Precision loss must be avoided here as it is used for calculations
        DATETIME('now', '+:delete_after_s seconds'), -- Precision loss is acceptable here
        0.0
      )
    ON CONFLICT (key) DO UPDATE SET
      -- Make sure we extend the lifetime of the row
      -- so that it can't be deleted between our INSERT and our UPDATE
      may_be_deleted_after = EXCLUDED.may_be_deleted_after
  SQL
  @model_class.connection.execute(insert_sql)

  sql = @model_class.sanitize_sql_array([<<~SQL, query_params])
    -- With SQLite MATERIALIZED has to be used so that level_post is calculated before the UPDATE takes effect
    WITH pre(level_post_with_uncapped_fillup, level_post) AS MATERIALIZED (
      SELECT
        -- Note the double clamping here. First we clamp the "current level - leak" to not go below zero,
        -- then we also clamp the above + fillup to not go below 0
        MAX(0.0, MAX(0.0, level - ((:now_s - last_touched_at) * :leak_rate)) + :fillup) AS level_post_with_uncapped_fillup,
        MAX(0.0, level - ((:now_s - last_touched_at) * :leak_rate)) AS level_post
      FROM
        pecorino_leaky_buckets
      WHERE
        key = :key
    )
    UPDATE pecorino_leaky_buckets SET
      last_touched_at = :now_s,
      may_be_deleted_after = DATETIME('now', '+:delete_after_s seconds'),
      level = CASE WHEN (SELECT level_post_with_uncapped_fillup FROM pre) <= :capacity THEN
        (SELECT level_post_with_uncapped_fillup FROM pre)
      ELSE
        (SELECT level_post FROM pre)
      END
    RETURNING
      (SELECT level_post FROM pre) AS level_before,
      level AS level_after
  SQL

  # .uncached for the same reason as in add_tokens: the RETURNING query must not
  # be served from the AR query cache between calls.
  upserted = @model_class.connection.uncached { @model_class.connection.select_one(sql) }
  level_after = upserted.fetch("level_after")
  level_before = upserted.fetch("level_before")
  [level_after, level_after >= capacity, level_after != level_before]
end
#blocked_until(key:) ⇒ Object
# File 'lib/pecorino/adapters/sqlite_adapter.rb', line 181

# Looks up an active block for `key`. A block is active while its
# `blocked_until` epoch timestamp is still in the future.
#
# @param key [String] block identifier
# @return [Time, nil] when the block expires, or nil if no active block exists
def blocked_until(key:)
  params = {key: key, now_s: Time.now.to_f}
  query = @model_class.sanitize_sql_array([<<~SQL, params])
    SELECT blocked_until FROM pecorino_blocks WHERE key = :key AND blocked_until >= :now_s LIMIT 1
  SQL
  # .uncached so repeated checks observe the current table state, not a cached row.
  epoch_s = @model_class.connection.uncached { @model_class.connection.select_value(query) }
  Time.at(epoch_s) if epoch_s
end
#create_tables(active_record_schema) ⇒ Object
# File 'lib/pecorino/adapters/sqlite_adapter.rb', line 201

# Defines the two tables and their indexes that this adapter relies on.
# Intended to be called from a migration, passing the schema/migration object.
#
# @param active_record_schema [Object] responds to create_table / add_index
# @return [void]
def create_tables(active_record_schema)
  # Bucket rows: the persisted level plus the timestamps used to extrapolate
  # leakage and to decide when a stale row may be pruned.
  active_record_schema.create_table :pecorino_leaky_buckets do |bucket|
    bucket.string :key, null: false
    bucket.float :level, null: false
    bucket.datetime :last_touched_at, null: false
    bucket.datetime :may_be_deleted_after, null: false
  end
  active_record_schema.add_index :pecorino_leaky_buckets, [:key], unique: true
  active_record_schema.add_index :pecorino_leaky_buckets, [:may_be_deleted_after]

  # Block rows: one active block per key, expiring at blocked_until.
  active_record_schema.create_table :pecorino_blocks do |block|
    block.string :key, null: false
    block.datetime :blocked_until, null: false
  end
  active_record_schema.add_index :pecorino_blocks, [:key], unique: true
  active_record_schema.add_index :pecorino_blocks, [:blocked_until]
end
#prune ⇒ Object
# File 'lib/pecorino/adapters/sqlite_adapter.rb', line 195

# Deletes expired blocks and stale buckets.
#
# BUG FIX: ActiveRecord's `connection.execute(sql, name)` does not bind
# parameters - the second positional argument is a log label, so the `?`
# placeholders were sent to SQLite unbound, compared as NULL, and the
# DELETEs silently removed nothing. Interpolate the timestamp through
# sanitize_sql_array instead, consistent with every other query in this adapter.
#
# @return [void]
def prune
  now_s = Time.now.to_f
  delete_blocks = @model_class.sanitize_sql_array(
    ["DELETE FROM pecorino_blocks WHERE blocked_until < ?", now_s]
  )
  delete_buckets = @model_class.sanitize_sql_array(
    ["DELETE FROM pecorino_leaky_buckets WHERE may_be_deleted_after < ?", now_s]
  )
  @model_class.connection.execute(delete_blocks)
  @model_class.connection.execute(delete_buckets)
end
#set_block(key:, block_for:) ⇒ Object
# File 'lib/pecorino/adapters/sqlite_adapter.rb', line 165

# Sets (or extends) a block on `key` lasting `block_for` seconds from now.
# If a longer block already exists, the existing expiry wins (MAX in SQL).
#
# @param key [String] block identifier
# @param block_for [Numeric] block duration in seconds, must be positive
# @raise [ArgumentError] when block_for is not positive
# @return [Time] when the block (old or new, whichever is later) expires
def set_block(key:, block_for:)
  raise ArgumentError, "block_for must be positive" unless block_for > 0

  binds = {key: key.to_s, block_for: block_for.to_f, now_s: Time.now.to_f}
  upsert_sql = @model_class.sanitize_sql_array([<<~SQL, binds])
    INSERT INTO pecorino_blocks AS t
      (key, blocked_until)
    VALUES
      (:key, :now_s + :block_for)
    ON CONFLICT (key) DO UPDATE SET
      blocked_until = MAX(EXCLUDED.blocked_until, t.blocked_until)
    RETURNING blocked_until;
  SQL
  # .uncached so the RETURNING value is re-fetched on every call.
  expiry_epoch_s = @model_class.connection.uncached { @model_class.connection.select_value(upsert_sql) }
  Time.at(expiry_epoch_s)
end
#state(key:, capacity:, leak_rate:) ⇒ Object
# File 'lib/pecorino/adapters/sqlite_adapter.rb', line 8

# Reads the current (extrapolated) level of the bucket under `key` without
# mutating it. Returns the level and whether the bucket is effectively full
# (level within 0.01 of capacity).
#
# @param key [String] bucket identifier
# @param capacity [Numeric] maximum bucket level
# @param leak_rate [Numeric] tokens leaked per second
# @return [Array(Float, Boolean)] current level and at-capacity flag
def state(key:, capacity:, leak_rate:)
  # With a server database, it is really important to use the clock of the database itself so
  # that concurrent requests will see consistent bucket level calculations. Since SQLite is
  # actually in-process, there is no point using DB functions - and besides, SQLite reduces
  # the time precision to the nearest millisecond - and the calculations with timestamps are
  # obtuse. Therefore we can use the current time inside the Ruby VM - it doesn't matter all that
  # much but saves us on writing some gnarly SQL to have SQLite produce consistent precise timestamps.
  query_params = {
    key: key.to_s,
    capacity: capacity.to_f,
    leak_rate: leak_rate.to_f,
    now_s: Time.now.to_f
  }
  # The `level` of the bucket is what got stored at `last_touched_at` time, and we can
  # extrapolate from it to see how many tokens have leaked out since `last_touched_at` -
  # we don't need to UPDATE the value in the bucket here
  sql = @model_class.sanitize_sql_array([<<~SQL, query_params])
    SELECT
      MAX(
        0.0, MIN(
          :capacity,
          t.level - ((:now_s - t.last_touched_at) * :leak_rate)
        )
      )
    FROM
      pecorino_leaky_buckets AS t
    WHERE
      key = :key
  SQL

  # If the return value of the query is a NULL it means no such bucket exists,
  # so we assume the bucket is empty
  current_level = @model_class.connection.uncached { @model_class.connection.select_value(sql) } || 0.0
  # NOTE(review): level is already clamped >= 0 in SQL, so .abs is a no-op here;
  # the intent is "full when level is within 0.01 of capacity".
  [current_level, capacity - current_level.abs < 0.01]
end