Class: A2A::Server::Storage::Redis
- Defined in:
- lib/a2a/server/storage/redis.rb
Constant Summary collapse
- TASK_KEY_PREFIX = "a2a:task:"
Redis key prefixes.
- CONTEXT_KEY_PREFIX = "a2a:context:"
- TASK_LIST_KEY = "a2a:tasks:all"
Instance Method Summary collapse
- #apply_metadata_filters(tasks, filters) ⇒ Object private
- #apply_single_filter(task, key, value) ⇒ Object private
- #apply_state_filter(tasks, filters) ⇒ Object private
-
#build_context_key(context_id) ⇒ String
Build a Redis key for a context set.
-
#build_task_key(task_id) ⇒ String
Build a Redis key for a task.
-
#clear_all_tasks ⇒ void
Clear all tasks.
-
#connected? ⇒ Boolean
Check Redis connection.
-
#delete_task(task_id) ⇒ Boolean
Delete a task by ID.
-
#deserialize_task(task_data) ⇒ A2A::Types::Task
Deserialize a task from JSON.
-
#flush_all_a2a_data! ⇒ void
Flush all A2A data from Redis (dangerous!).
- #get_base_tasks(filters) ⇒ Object private
- #get_created_at(task) ⇒ Object private
-
#get_task(task_id) ⇒ A2A::Types::Task?
Get a task by ID.
-
#info ⇒ Hash
Get Redis info.
-
#initialize(redis: nil, url: nil, namespace: "a2a", ttl: nil) ⇒ Redis
constructor
Initialize the Redis storage.
-
#list_all_tasks ⇒ Array<A2A::Types::Task>
List all tasks.
-
#list_tasks(**filters) ⇒ Array<A2A::Types::Task>
List tasks with optional filtering.
-
#list_tasks_by_context(context_id) ⇒ Array<A2A::Types::Task>
List all tasks for a given context ID.
-
#save_task(task) ⇒ void
Save a task to Redis.
-
#serialize_task(task) ⇒ String
Serialize a task to JSON for Redis storage.
-
#task_count ⇒ Integer
Get the number of stored tasks.
Methods inherited from Base
#delete_push_notification_config, #get_push_notification_config_by_id, #list_push_notification_configs, #save_push_notification_config, #task_exists?
Constructor Details
#initialize(redis: nil, url: nil, namespace: "a2a", ttl: nil) ⇒ Redis
Initialize the Redis storage
34 35 36 37 38 39 40 41 42 43 |
# File 'lib/a2a/server/storage/redis.rb', line 34
#
# Set up the Redis-backed storage.
#
# @param redis [::Redis, nil] client to reuse; when nil a new client is created
# @param url [String, nil] connection URL for the new client; falls back to
#   ENV["REDIS_URL"] and then to "redis://localhost:6379"
# @param namespace [String] prefix used when building task and context keys
# @param ttl [Integer, nil] value handed to `expire` when saving a task; nil
#   means no expiry is set
# @raise [LoadError] when the top-level ::Redis constant is not defined
def initialize(redis: nil, url: nil, namespace: "a2a", ttl: nil)
  raise LoadError, "Redis gem is required for Redis storage. Add 'redis' to your Gemfile." unless defined?(::Redis)

  @namespace = namespace
  @ttl = ttl
  # An injected client wins; otherwise resolve the URL from argument, env, default.
  @redis = redis || ::Redis.new(url: url || ENV.fetch("REDIS_URL", "redis://localhost:6379"))
end
Instance Method Details
#apply_metadata_filters(tasks, filters) ⇒ Object (private)
289 290 291 292 293 294 295 296 |
# File 'lib/a2a/server/storage/redis.rb', line 289
#
# Narrow a task list using every filter except :state and :context_id
# (those two keys are skipped here).
#
# @param tasks [Array] tasks to filter
# @param filters [Hash] filter name => expected value
# @return [Array] tasks for which every remaining filter matched
def apply_metadata_filters(tasks, filters)
  filters.reduce(tasks) do |remaining, (key, value)|
    next remaining if %i[state context_id].include?(key)

    remaining.select { |task| apply_single_filter(task, key, value) }
  end
end
#apply_single_filter(task, key, value) ⇒ Object (private)
298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 |
# File 'lib/a2a/server/storage/redis.rb', line 298
#
# Test one task against one filter.
#
# :task_type compares against metadata "type"; :created_after and
# :created_before parse the task's created_at and compare it with +value+;
# any other key is looked up in the task metadata. Metadata keys are tried
# both as given and in string form.
#
# NOTE(review): Time.parse comes from the stdlib `time` extension; this assumes
# the file requires it and that +value+ is comparable with a Time — confirm.
#
# @param task [#metadata] task whose metadata is inspected
# @param key [Symbol] filter name
# @param value [Object] expected value, or the time bound for created_* filters
# @return [Boolean, nil] truthy when the task matches
def apply_single_filter(task, key, value)
  case key
  when :task_type
    task.metadata&.dig(:type) == value || task.metadata&.dig("type") == value
  when :created_after
    created_at = get_created_at(task)
    created_at && Time.parse(created_at) > value
  when :created_before
    created_at = get_created_at(task)
    created_at && Time.parse(created_at) < value
  else
    # Generic metadata filter
    task.metadata&.dig(key) == value || task.metadata&.dig(key.to_s) == value
  end
end
#apply_state_filter(tasks, filters) ⇒ Object (private)
283 284 285 286 287 |
# File 'lib/a2a/server/storage/redis.rb', line 283
#
# Keep only the tasks whose status state equals filters[:state].
# When no :state filter is present the list is returned as is.
#
# @param tasks [Array] tasks to filter
# @param filters [Hash] filter hash; only :state is read here
# @return [Array] matching tasks
def apply_state_filter(tasks, filters)
  wanted_state = filters[:state]
  if wanted_state
    tasks.select { |candidate| candidate.status&.state == wanted_state }
  else
    tasks
  end
end
#build_context_key(context_id) ⇒ String
Build a Redis key for a context set
250 251 252 |
# File 'lib/a2a/server/storage/redis.rb', line 250
#
# Compose the Redis key of a context set: the namespace, a colon,
# CONTEXT_KEY_PREFIX and the context ID.
#
# @param context_id [String] context identifier
# @return [String] the Redis key
def build_context_key(context_id)
  format("%s:%s%s", @namespace, CONTEXT_KEY_PREFIX, context_id)
end
#build_task_key(task_id) ⇒ String
Build a Redis key for a task
241 242 243 |
# File 'lib/a2a/server/storage/redis.rb', line 241
#
# Compose the Redis key of a task: the namespace, a colon,
# TASK_KEY_PREFIX and the task ID.
#
# @param task_id [String] task identifier
# @return [String] the Redis key
def build_task_key(task_id)
  format("%s:%s%s", @namespace, TASK_KEY_PREFIX, task_id)
end
#clear_all_tasks ⇒ void
This method returns an undefined value.
Clear all tasks
168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 |
# File 'lib/a2a/server/storage/redis.rb', line 168
#
# Clear all tasks.
#
# Reads the task IDs from the global task list, loads the stored tasks to
# find which context sets they belong to, then deletes the task keys, those
# context sets and the global list inside one MULTI block. Does nothing when
# the global list is empty.
#
# @return [void]
def clear_all_tasks
  task_ids = @redis.smembers(TASK_LIST_KEY)
  return if task_ids.empty?

  task_keys = task_ids.map { |id| build_task_key(id) }

  # A single MGET instead of one GET per task; IDs whose data is gone come
  # back as nil and are skipped.
  context_keys = @redis.mget(*task_keys).compact
                       .map { |data| build_context_key(deserialize_task(data).context_id) }
                       .uniq

  @redis.multi do |multi|
    # task_keys cannot be empty here because task_ids was not empty.
    multi.del(*task_keys)

    # Every stored task may already be gone, leaving no context set to remove.
    multi.del(*context_keys) unless context_keys.empty?

    multi.del(TASK_LIST_KEY)
  end
end
#connected? ⇒ Boolean
Check Redis connection
206 207 208 209 210 |
# File 'lib/a2a/server/storage/redis.rb', line 206
#
# Check the Redis connection by sending PING.
#
# @return [Boolean] true when the reply equals "PONG"; false for any other
#   reply or when a ::Redis::BaseError is raised
def connected?
  begin
    reply = @redis.ping
  rescue ::Redis::BaseError
    return false
  end

  reply == "PONG"
end
#delete_task(task_id) ⇒ Boolean
Delete a task by ID
92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 |
# File 'lib/a2a/server/storage/redis.rb', line 92
#
# Delete a task by ID.
#
# Looks the task up first (its context ID is needed to find the context set),
# then removes the task data and its membership in the context set and in the
# global task list inside one MULTI block.
#
# @param task_id [String] ID of the task to delete
# @return [Boolean] false when no such task is stored, true after deletion
def delete_task(task_id)
  existing = get_task(task_id)
  return false unless existing

  data_key = build_task_key(task_id)
  set_key = build_context_key(existing.context_id)

  @redis.multi do |tx|
    tx.del(data_key)                 # task payload
    tx.srem(set_key, task_id)        # membership in its context set
    tx.srem(TASK_LIST_KEY, task_id)  # membership in the global list
  end

  true
end
#deserialize_task(task_data) ⇒ A2A::Types::Task
Deserialize a task from JSON
268 269 270 271 |
# File 'lib/a2a/server/storage/redis.rb', line 268
#
# Rebuild a task from its stored JSON text: the text is parsed with
# JSON.parse and the resulting hash is passed to A2A::Types::Task.from_h.
#
# @param task_data [String] JSON text read from Redis
# @return [A2A::Types::Task] the reconstructed task
def deserialize_task(task_data)
  A2A::Types::Task.from_h(JSON.parse(task_data))
end
#flush_all_a2a_data! ⇒ void
This method returns an undefined value.
Flush all A2A data from Redis (dangerous!)
This removes all A2A-related keys from Redis. Use with caution in production.
227 228 229 230 231 232 233 234 |
# File 'lib/a2a/server/storage/redis.rb', line 227
#
# Flush all A2A data from Redis (dangerous!).
#
# Deletes every key matching "<namespace>:*". Keys are discovered with
# incremental SCAN (`scan_each`) rather than KEYS, so a large keyspace is not
# walked in one blocking command, and they are deleted in batches.
#
# @return [void]
def flush_all_a2a_data!
  pattern = "#{@namespace}:*"

  # SCAN may report a key more than once; deleting an already-deleted key is harmless.
  @redis.scan_each(match: pattern).each_slice(1000) do |batch|
    @redis.del(*batch)
  end

  nil
end
#get_base_tasks(filters) ⇒ Object (private)
275 276 277 278 279 280 281 |
# File 'lib/a2a/server/storage/redis.rb', line 275
#
# Pick the starting task list for a filtered query: the tasks of one context
# when a :context_id filter is given, otherwise every stored task.
#
# @param filters [Hash] filter hash; only :context_id is read here
# @return [Array] the unfiltered base list
def get_base_tasks(filters)
  context_id = filters[:context_id]
  return list_all_tasks unless context_id

  list_tasks_by_context(context_id)
end
#get_created_at(task) ⇒ Object (private)
314 315 316 |
# File 'lib/a2a/server/storage/redis.rb', line 314
#
# Read the creation timestamp from a task's metadata, trying the symbol key
# :created_at first and then the string key "created_at".
#
# @param task [#metadata] task whose metadata is inspected
# @return [Object, nil] the stored value, or nil when the task has no
#   metadata or neither key is present
def get_created_at(task)
  task.metadata&.dig(:created_at) || task.metadata&.dig("created_at")
end
#get_task(task_id) ⇒ A2A::Types::Task?
Get a task by ID
78 79 80 81 82 83 84 85 |
# File 'lib/a2a/server/storage/redis.rb', line 78
#
# Get a task by ID.
#
# @param task_id [String] ID of the task to load
# @return [A2A::Types::Task, nil] the deserialized task, or nil when Redis
#   holds nothing under the task's key
def get_task(task_id)
  raw = @redis.get(build_task_key(task_id))
  raw ? deserialize_task(raw) : nil
end
#info ⇒ Hash
Get Redis info
216 217 218 |
# File 'lib/a2a/server/storage/redis.rb', line 216
#
# Get Redis info by forwarding to the client's `info` call.
#
# @return [Hash] whatever the client returns for INFO
def info
  server_info = @redis.info
  server_info
end
#list_all_tasks ⇒ Array<A2A::Types::Task>
List all tasks
138 139 140 141 142 143 144 145 146 147 148 149 150 151 |
# File 'lib/a2a/server/storage/redis.rb', line 138
#
# List all tasks.
#
# Reads the IDs from the global task list, loads the task data with one MGET
# (IDs whose data is missing are skipped) and sorts the result by the
# created_at value found in each task's metadata. Tasks without one sort as
# the empty string.
#
# NOTE(review): assumes created_at values are strings that compare
# meaningfully (e.g. ISO 8601) — confirm.
#
# @return [Array<A2A::Types::Task>] stored tasks ordered by creation time
def list_all_tasks
  task_ids = @redis.smembers(TASK_LIST_KEY)
  return [] if task_ids.empty?

  task_keys = task_ids.map { |id| build_task_key(id) }
  tasks = @redis.mget(*task_keys).compact.map { |data| deserialize_task(data) }

  # get_created_at accepts both symbol and string metadata keys, so sorting
  # agrees with the created_after/created_before filters.
  tasks.sort_by { |task| get_created_at(task) || "" }
end
#list_tasks(**filters) ⇒ Array<A2A::Types::Task>
List tasks with optional filtering
158 159 160 161 162 |
# File 'lib/a2a/server/storage/redis.rb', line 158
#
# List tasks with optional filtering.
#
# The base list comes from get_base_tasks (one context, or everything), is
# narrowed by the :state filter and then by the remaining metadata filters.
#
# @param filters [Hash] keyword filters such as :context_id, :state,
#   :task_type, :created_after, :created_before or any metadata key
# @return [Array<A2A::Types::Task>] tasks matching every filter
def list_tasks(**filters)
  tasks = get_base_tasks(filters)
  tasks = apply_state_filter(tasks, filters)
  apply_metadata_filters(tasks, filters)
end
#list_tasks_by_context(context_id) ⇒ Array<A2A::Types::Task>
List all tasks for a given context ID
118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 |
# File 'lib/a2a/server/storage/redis.rb', line 118
#
# List all tasks for a given context ID.
#
# Reads the task IDs from the context set, loads the task data with one MGET
# (IDs whose data is missing are skipped) and sorts the result by the
# created_at value found in each task's metadata. Tasks without one sort as
# the empty string.
#
# NOTE(review): assumes created_at values are strings that compare
# meaningfully (e.g. ISO 8601) — confirm.
#
# @param context_id [String] context whose tasks are wanted
# @return [Array<A2A::Types::Task>] the context's tasks ordered by creation time
def list_tasks_by_context(context_id)
  task_ids = @redis.smembers(build_context_key(context_id))
  return [] if task_ids.empty?

  task_keys = task_ids.map { |id| build_task_key(id) }
  tasks = @redis.mget(*task_keys).compact.map { |data| deserialize_task(data) }

  # get_created_at accepts both symbol and string metadata keys, so sorting
  # agrees with the created_after/created_before filters.
  tasks.sort_by { |task| get_created_at(task) || "" }
end
#save_task(task) ⇒ void
This method returns an undefined value.
Save a task to Redis
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 |
# File 'lib/a2a/server/storage/redis.rb', line 50
#
# Save a task to Redis.
#
# Inside one MULTI block: stores the serialized task under its task key, adds
# the task ID to its context set and to the global task list, and — when a
# TTL is configured — sets that expiry on the task key and the context set.
# The global task list gets no expiry here.
#
# @param task [A2A::Types::Task] task to persist (its id and context_id are read)
# @return [void]
def save_task(task)
  id = task.id
  data_key = build_task_key(id)
  set_key = build_context_key(task.context_id)
  payload = serialize_task(task)

  @redis.multi do |tx|
    tx.set(data_key, payload)
    tx.sadd(set_key, id)
    tx.sadd(TASK_LIST_KEY, id)

    # Expiry is optional and applies to the task and its context set only.
    [data_key, set_key].each { |key| tx.expire(key, @ttl) } if @ttl
  end
end
#serialize_task(task) ⇒ String
Serialize a task to JSON for Redis storage
259 260 261 |
# File 'lib/a2a/server/storage/redis.rb', line 259
#
# Turn a task into the JSON text stored in Redis: the task is converted with
# `to_h` and the hash is encoded with JSON.generate.
#
# @param task [#to_h] task to serialize
# @return [String] JSON text
def serialize_task(task)
  attributes = task.to_h
  JSON.generate(attributes)
end
#task_count ⇒ Integer
Get the number of stored tasks
198 199 200 |
# File 'lib/a2a/server/storage/redis.rb', line 198
#
# Get the number of stored tasks, taken as the cardinality (SCARD) of the
# global task list set.
#
# @return [Integer] number of IDs in the global task list
def task_count
  total = @redis.scard(TASK_LIST_KEY)
  total
end