Class: A2A::Server::Storage::Database

Inherits:
Base
  • Object
show all
Defined in:
lib/a2a/server/storage/database.rb

Defined Under Namespace

Classes: TaskRecord

Instance Method Summary collapse

Methods inherited from Base

#delete_push_notification_config, #get_push_notification_config_by_id, #list_push_notification_configs, #save_push_notification_config, #task_exists?

Constructor Details

#initialize(connection: nil) ⇒ Database

Initialize the database storage

Parameters:

  • (defaults to: nil)

    Optional AR connection

Raises:

  • If ActiveRecord is not available



54
55
56
57
58
59
60
61
# File 'lib/a2a/server/storage/database.rb', line 54

def initialize(connection: nil)
  unless defined?(ActiveRecord)
    raise LoadError, "ActiveRecord is required for database storage. Add 'activerecord' to your Gemfile."
  end

  @connection = connection || ActiveRecord::Base
  ensure_table_exists!
end

Instance Method Details

#apply_metadata_filters(tasks, filters) ⇒ Object (private)



234
235
236
237
238
239
240
241
# File 'lib/a2a/server/storage/database.rb', line 234

def (tasks, filters)
  filters.each do |key, value|
    next if %i[state context_id].include?(key)

    tasks = tasks.select { |task| apply_single_filter(task, key, value) }
  end
  tasks
end

#apply_post_query_filters(tasks, filters) ⇒ Object (private)



229
230
231
232
# File 'lib/a2a/server/storage/database.rb', line 229

def apply_post_query_filters(tasks, filters)
  tasks = tasks.select { |task| task.status&.state == filters[:state] } if filters[:state]
  (tasks, filters)
end

#apply_single_filter(task, key, value) ⇒ Object (private)



243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
# File 'lib/a2a/server/storage/database.rb', line 243

def apply_single_filter(task, key, value)
  case key
  when :task_type
    task.&.dig(:type) == value || task.&.dig("type") == value
  when :created_after
    created_at = get_created_at(task)
    created_at && Time.parse(created_at) > value
  when :created_before
    created_at = get_created_at(task)
    created_at && Time.parse(created_at) < value
  else
    # Generic metadata filter
    task.&.dig(key) == value || task.&.dig(key.to_s) == value
  end
end

#build_base_query(filters) ⇒ Object (private)



223
224
225
226
227
# File 'lib/a2a/server/storage/database.rb', line 223

def build_base_query(filters)
  query = TaskRecord.order(:created_at)
  query = query.where(context_id: filters[:context_id]) if filters[:context_id]
  query
end

#clear_all_tasksvoid

This method returns an undefined value.

Clear all tasks



140
141
142
# File 'lib/a2a/server/storage/database.rb', line 140

def clear_all_tasks
  TaskRecord.delete_all
end

#create_table!void

This method returns an undefined value.

Create the tasks table if it doesn't exist

This is a convenience method for development/testing. In production, use proper migrations.



159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
# File 'lib/a2a/server/storage/database.rb', line 159

def create_table!
  return if table_exists?

  @connection.connection.create_table :a2a_tasks, force: true do |t|
    t.string :task_id, null: false, limit: 255
    t.string :context_id, null: false, limit: 255
    t.text :task_data, null: false
    t.timestamps null: false

    t.index :task_id, unique: true
    t.index :context_id
    t.index :created_at
    t.index :updated_at
  end
end

#delete_task(task_id) ⇒ Boolean

Delete a task by ID

Parameters:

  • The task ID

Returns:

  • True if task was deleted, false if not found



100
101
102
103
# File 'lib/a2a/server/storage/database.rb', line 100

def delete_task(task_id)
  deleted_count = TaskRecord.where(task_id: task_id).delete_all
  deleted_count.positive?
end

#deserialize_task(task_data) ⇒ A2A::Types::Task

Deserialize a task from database storage

Parameters:

  • Serialized task data

Returns:

  • The deserialized task



217
218
219
# File 'lib/a2a/server/storage/database.rb', line 217

def deserialize_task(task_data)
  A2A::Types::Task.from_h(task_data)
end

#drop_table!void

This method returns an undefined value.

Drop the tasks table



179
180
181
# File 'lib/a2a/server/storage/database.rb', line 179

def drop_table!
  @connection.connection.drop_table :a2a_tasks if table_exists?
end

#ensure_table_exists!void

This method returns an undefined value.

Ensure the tasks table exists



195
196
197
198
199
200
201
# File 'lib/a2a/server/storage/database.rb', line 195

def ensure_table_exists!
  return if table_exists?

  warn "A2A tasks table does not exist. Creating it automatically."
  warn "In production, you should create this table using a proper migration."
  create_table!
end

#get_created_at(task) ⇒ Object (private)



259
260
261
# File 'lib/a2a/server/storage/database.rb', line 259

def get_created_at(task)
  task.&.dig(:created_at) || task.&.dig("created_at")
end

#get_task(task_id) ⇒ A2A::Types::Task?

Get a task by ID

Parameters:

  • The task ID

Returns:

  • The task or nil if not found



88
89
90
91
92
93
# File 'lib/a2a/server/storage/database.rb', line 88

def get_task(task_id)
  record = TaskRecord.find_by(task_id: task_id)
  return nil unless record

  deserialize_task(record.task_data)
end

#list_all_tasksArray<A2A::Types::Task>

List all tasks

Returns:

  • All tasks



119
120
121
122
# File 'lib/a2a/server/storage/database.rb', line 119

def list_all_tasks
  records = TaskRecord.order(:created_at)
  records.map { |record| deserialize_task(record.task_data) }
end

#list_tasks(**filters) ⇒ Array<A2A::Types::Task>

List tasks with optional filtering

Parameters:

  • Optional filters (state, context_id, etc.)

Returns:

  • Filtered tasks



129
130
131
132
133
134
# File 'lib/a2a/server/storage/database.rb', line 129

def list_tasks(**filters)
  query = build_base_query(filters)
  tasks = query.map { |record| deserialize_task(record.task_data) }

  apply_post_query_filters(tasks, filters)
end

#list_tasks_by_context(context_id) ⇒ Array<A2A::Types::Task>

List all tasks for a given context ID

Parameters:

  • The context ID

Returns:

  • Tasks in the context



110
111
112
113
# File 'lib/a2a/server/storage/database.rb', line 110

def list_tasks_by_context(context_id)
  records = TaskRecord.where(context_id: context_id).order(:created_at)
  records.map { |record| deserialize_task(record.task_data) }
end

#save_task(task) ⇒ void

This method returns an undefined value.

Save a task to the database

Parameters:

  • The task to save



68
69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/a2a/server/storage/database.rb', line 68

def save_task(task)
  task_data = serialize_task(task)

  record = TaskRecord.find_or_initialize_by(task_id: task.id)
  record.assign_attributes(
    context_id: task.context_id,
    task_data: task_data,
    updated_at: Time.now
  )

  record.created_at = Time.now if record.new_record?

  record.save!
end

#serialize_task(task) ⇒ Hash

Serialize a task to a hash for database storage

Parameters:

  • The task to serialize

Returns:

  • Serialized task data



208
209
210
# File 'lib/a2a/server/storage/database.rb', line 208

def serialize_task(task)
  task.to_h
end

#table_exists?Boolean

Check if the tasks table exists

Returns:

  • True if table exists



187
188
189
# File 'lib/a2a/server/storage/database.rb', line 187

def table_exists?
  @connection.connection.table_exists?(:a2a_tasks)
end

#task_countInteger

Get the number of stored tasks

Returns:

  • Number of tasks



148
149
150
# File 'lib/a2a/server/storage/database.rb', line 148

def task_count
  TaskRecord.count
end