Class: Pecorino::Adapters::PostgresAdapter
- Inherits: Object
- Hierarchy: Object → Pecorino::Adapters::PostgresAdapter
- Defined in:
- lib/pecorino/adapters/postgres_adapter.rb
Instance Method Summary collapse
- #add_tokens(key:, capacity:, leak_rate:, n_tokens:) ⇒ Object
- #add_tokens_conditionally(key:, capacity:, leak_rate:, n_tokens:) ⇒ Object
- #blocked_until(key:) ⇒ Object
- #create_tables(active_record_schema) ⇒ Object
- #initialize(model_class) ⇒ PostgresAdapter (constructor) — A new instance of PostgresAdapter.
- #prune ⇒ Object
- #set_block(key:, block_for:) ⇒ Object
- #state(key:, capacity:, leak_rate:) ⇒ Object
Constructor Details
#initialize(model_class) ⇒ PostgresAdapter
Returns a new instance of PostgresAdapter.
4 5 6 |
# File 'lib/pecorino/adapters/postgres_adapter.rb', line 4 def initialize(model_class) @model_class = model_class end |
Instance Method Details
#add_tokens(key:, capacity:, leak_rate:, n_tokens:) ⇒ Object
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 |
# File 'lib/pecorino/adapters/postgres_adapter.rb', line 37 def add_tokens(key:, capacity:, leak_rate:, n_tokens:) # Take double the time it takes the bucket to empty under normal circumstances # until the bucket may be deleted. may_be_deleted_after_seconds = (capacity.to_f / leak_rate.to_f) * 2.0 # Create the leaky bucket if it does not exist, and update # to the new level, taking the leak rate into account - if the bucket exists. query_params = { key: key.to_s, capacity: capacity.to_f, delete_after_s: may_be_deleted_after_seconds, leak_rate: leak_rate.to_f, fillup: n_tokens.to_f } sql = @model_class.sanitize_sql_array([<<~SQL, query_params]) INSERT INTO pecorino_leaky_buckets AS t (key, last_touched_at, may_be_deleted_after, level) VALUES ( :key, clock_timestamp(), clock_timestamp() + ':delete_after_s second'::interval, GREATEST(0.0, LEAST( :capacity, :fillup ) ) ) ON CONFLICT (key) DO UPDATE SET last_touched_at = EXCLUDED.last_touched_at, may_be_deleted_after = EXCLUDED.may_be_deleted_after, level = GREATEST(0.0, LEAST( :capacity, t.level + :fillup - (EXTRACT(EPOCH FROM (EXCLUDED.last_touched_at - t.last_touched_at)) * :leak_rate) ) ) RETURNING level, -- Compare level to the capacity inside the DB so that we won't have rounding issues level >= :capacity AS at_capacity SQL # Note the use of .uncached here. The AR query cache will actually see our # query as a repeat (since we use "select_one" for the RETURNING bit) and will not call into Postgres # correctly, thus the clock_timestamp() value would be frozen between calls. We don't want that here. # See https://stackoverflow.com/questions/73184531/why-would-postgres-clock-timestamp-freeze-inside-a-rails-unit-test upserted = @model_class.connection.uncached { @model_class.connection.select_one(sql) } capped_level_after_fillup, at_capacity = upserted.fetch("level"), upserted.fetch("at_capacity") [capped_level_after_fillup, at_capacity] end |
#add_tokens_conditionally(key:, capacity:, leak_rate:, n_tokens:) ⇒ Object
91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 |
# File 'lib/pecorino/adapters/postgres_adapter.rb', line 91 def add_tokens_conditionally(key:, capacity:, leak_rate:, n_tokens:) # Take double the time it takes the bucket to empty under normal circumstances # until the bucket may be deleted. may_be_deleted_after_seconds = (capacity.to_f / leak_rate.to_f) * 2.0 # Create the leaky bucket if it does not exist, and update # to the new level, taking the leak rate into account - if the bucket exists. query_params = { key: key.to_s, capacity: capacity.to_f, delete_after_s: may_be_deleted_after_seconds, leak_rate: leak_rate.to_f, fillup: n_tokens.to_f } sql = @model_class.sanitize_sql_array([<<~SQL, query_params]) WITH pre AS MATERIALIZED ( SELECT -- Note the double clamping here. First we clamp the "current level - leak" to not go below zero, -- then we also clamp the above + fillup to not go below 0 GREATEST(0.0, GREATEST(0.0, level - (EXTRACT(EPOCH FROM (clock_timestamp() - last_touched_at)) * :leak_rate)) + :fillup ) AS level_post_with_uncapped_fillup, GREATEST(0.0, level - (EXTRACT(EPOCH FROM (clock_timestamp() - last_touched_at)) * :leak_rate) ) AS level_post FROM pecorino_leaky_buckets WHERE key = :key ) INSERT INTO pecorino_leaky_buckets AS t (key, last_touched_at, may_be_deleted_after, level) VALUES ( :key, clock_timestamp(), clock_timestamp() + ':delete_after_s second'::interval, GREATEST(0.0, (CASE WHEN :fillup > :capacity THEN 0.0 ELSE :fillup END) ) ) ON CONFLICT (key) DO UPDATE SET last_touched_at = EXCLUDED.last_touched_at, may_be_deleted_after = EXCLUDED.may_be_deleted_after, level = CASE WHEN (SELECT level_post_with_uncapped_fillup FROM pre) <= :capacity THEN (SELECT level_post_with_uncapped_fillup FROM pre) ELSE (SELECT level_post FROM pre) END RETURNING COALESCE((SELECT level_post FROM pre), 0.0) AS level_before, level AS level_after SQL upserted = @model_class.connection.uncached { @model_class.connection.select_one(sql) } level_after = upserted.fetch("level_after") level_before = 
upserted.fetch("level_before") [level_after, level_after >= capacity, level_after != level_before] end |
#blocked_until(key:) ⇒ Object
165 166 167 168 169 170 |
# File 'lib/pecorino/adapters/postgres_adapter.rb', line 165 def blocked_until(key:) block_check_query = @model_class.sanitize_sql_array([<<~SQL, key]) SELECT blocked_until FROM pecorino_blocks WHERE key = ? AND blocked_until >= clock_timestamp() LIMIT 1 SQL @model_class.connection.uncached { @model_class.connection.select_value(block_check_query) } end |
#create_tables(active_record_schema) ⇒ Object
177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 |
# File 'lib/pecorino/adapters/postgres_adapter.rb', line 177 def create_tables(active_record_schema) active_record_schema.create_table :pecorino_leaky_buckets, id: :uuid do |t| t.string :key, null: false t.float :level, null: false t.datetime :last_touched_at, null: false t.datetime :may_be_deleted_after, null: false end active_record_schema.add_index :pecorino_leaky_buckets, [:key], unique: true active_record_schema.add_index :pecorino_leaky_buckets, [:may_be_deleted_after] active_record_schema.create_table :pecorino_blocks, id: :uuid do |t| t.string :key, null: false t.datetime :blocked_until, null: false end active_record_schema.add_index :pecorino_blocks, [:key], unique: true active_record_schema.add_index :pecorino_blocks, [:blocked_until] end |
#prune ⇒ Object
172 173 174 175 |
# File 'lib/pecorino/adapters/postgres_adapter.rb', line 172 def prune @model_class.connection.execute("DELETE FROM pecorino_blocks WHERE blocked_until < NOW()") @model_class.connection.execute("DELETE FROM pecorino_leaky_buckets WHERE may_be_deleted_after < NOW()") end |
#set_block(key:, block_for:) ⇒ Object
150 151 152 153 154 155 156 157 158 159 160 161 162 163 |
# File 'lib/pecorino/adapters/postgres_adapter.rb', line 150 def set_block(key:, block_for:) raise ArgumentError, "block_for must be positive" unless block_for > 0 query_params = {key: key.to_s, block_for: block_for.to_f} block_set_query = @model_class.sanitize_sql_array([<<~SQL, query_params]) INSERT INTO pecorino_blocks AS t (key, blocked_until) VALUES (:key, clock_timestamp() + ':block_for seconds'::interval) ON CONFLICT (key) DO UPDATE SET blocked_until = GREATEST(EXCLUDED.blocked_until, t.blocked_until) RETURNING blocked_until SQL @model_class.connection.uncached { @model_class.connection.select_value(block_set_query) } end |
#state(key:, capacity:, leak_rate:) ⇒ Object
8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
# File 'lib/pecorino/adapters/postgres_adapter.rb', line 8 def state(key:, capacity:, leak_rate:) query_params = { key: key.to_s, capacity: capacity.to_f, leak_rate: leak_rate.to_f } # The `level` of the bucket is what got stored at `last_touched_at` time, and we can # extrapolate from it to see how many tokens have leaked out since `last_touched_at` - # we don't need to UPDATE the value in the bucket here sql = @model_class.sanitize_sql_array([<<~SQL, query_params]) SELECT GREATEST( 0.0, LEAST( :capacity, t.level - (EXTRACT(EPOCH FROM (clock_timestamp() - t.last_touched_at)) * :leak_rate) ) ) FROM pecorino_leaky_buckets AS t WHERE key = :key SQL # If the return value of the query is a NULL it means no such bucket exists, # so we assume the bucket is empty current_level = @model_class.connection.uncached { @model_class.connection.select_value(sql) } || 0.0 [current_level, capacity - current_level.abs < 0.01] end |