Class: A2A::Server::TaskManager
- Inherits:
-
Object
- Object
- A2A::Server::TaskManager
- Defined in:
- lib/a2a/server/task_manager.rb
Instance Attribute Summary collapse
-
#config ⇒ Object
readonly
Returns the value of attribute config.
-
#event_handlers ⇒ Object
readonly
Returns the value of attribute event_handlers.
-
#storage ⇒ Object
readonly
Returns the value of attribute storage.
Instance Method Summary collapse
-
#add_artifact(task_id, artifact, append: false) ⇒ A2A::Types::Task
Add an artifact to a task.
-
#add_event_handler(&handler) ⇒ Object
Add an event handler.
-
#add_message(task_id, message) ⇒ A2A::Types::Task
Add a message to task history.
-
#add_task_artifact(task_id, artifact, append: false) ⇒ A2A::Types::Task
Add an artifact to a task (alias for add_artifact for compatibility).
-
#add_to_cache(task_id, task) ⇒ Object
private
Add task to cache, evicting the entry with the oldest insertion timestamp when the cache is full.
-
#cancel_task(task_id, reason: nil) ⇒ A2A::Types::Task
Cancel a task.
-
#clear_cache! ⇒ Object
private
Clear task cache.
-
#create_task(type:, params: {}, context_id: nil, metadata: nil) ⇒ A2A::Types::Task
Create a new task with performance tracking.
-
#default_config ⇒ Hash
private
Default configuration.
-
#emit_artifact_update_event(task, artifact, append) ⇒ Object
private
Emit a task artifact update event.
-
#emit_event(event_type, event_data) ⇒ Object
private
Emit an event to all registered handlers.
-
#emit_status_update_event(task) ⇒ Object
private
Emit a task status update event.
-
#generate_context_id ⇒ String
private
Generate a unique context ID.
-
#generate_task_id ⇒ String
private
Generate a unique task ID.
-
#get_from_cache(task_id) ⇒ A2A::Types::Task?
private
Get task from cache.
-
#get_task(task_id, history_length: nil) ⇒ A2A::Types::Task
Get a task by ID with caching for performance.
-
#initialize(storage: nil, push_notification_manager: nil, config: {}) ⇒ TaskManager
constructor
Initialize a new TaskManager with performance optimizations.
-
#list_tasks_by_context(*args) ⇒ Array<A2A::Types::Task>
List tasks by context ID.
-
#performance_metrics ⇒ Hash
private
Get performance metrics.
-
#record_cache_hit ⇒ Object
private
Record cache hit.
-
#record_cache_miss ⇒ Object
private
Record cache miss.
-
#record_processing_time(duration) ⇒ Object
private
Record processing time.
-
#remove_event_handler(handler) ⇒ Object
Remove an event handler.
-
#reset_performance_metrics! ⇒ Object
private
Reset performance metrics.
-
#update_task_status(task_id, status, message: nil) ⇒ A2A::Types::Task
Update task status with performance tracking.
-
#validate_state_transition(current_state, new_state) ⇒ Object
private
Validate state transition.
Constructor Details
#initialize(storage: nil, push_notification_manager: nil, config: {}) ⇒ TaskManager
Initialize a new TaskManager with performance optimizations
25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
# File 'lib/a2a/server/task_manager.rb', line 25

# Build a task manager: wires up the storage backend, the (initially empty)
# event handler list, the task cache and the metrics counters.
#
# @param storage [Object, nil] task store; a new
#   A2A::Server::Storage::Memory is created when nil
# @param push_notification_manager [Object, nil] optional notifier that the
#   emit_*_event helpers call when present
# @param config [Hash, nil] overrides merged on top of #default_config
def initialize(storage: nil, push_notification_manager: nil, config: {})
  @storage = storage || A2A::Server::Storage::Memory.new
  @push_notification_manager = push_notification_manager
  @event_handlers = []

  # A nil config means "no overrides" rather than a TypeError from merge.
  @config = default_config.merge(config || {})

  # task_id => { task:, timestamp: } entries; every access goes through
  # @cache_mutex.
  @task_cache = {}
  @cache_mutex = Mutex.new

  @performance_metrics = {
    tasks_created: 0,
    tasks_updated: 0,
    cache_hits: 0,
    cache_misses: 0,
    avg_processing_time: 0.0
  }
  @metrics_mutex = Mutex.new
end
Instance Attribute Details
#config ⇒ Object (readonly)
Returns the value of attribute config.
17 18 19 |
# File 'lib/a2a/server/task_manager.rb', line 17

# Read-only accessor.
#
# @return [Hash] the configuration stored in @config
def config
  @config
end
#event_handlers ⇒ Object (readonly)
Returns the value of attribute event_handlers.
17 18 19 |
# File 'lib/a2a/server/task_manager.rb', line 17

# Read-only accessor.
#
# @return [Array] the registered event handlers stored in @event_handlers
def event_handlers
  @event_handlers
end
#storage ⇒ Object (readonly)
Returns the value of attribute storage.
17 18 19 |
# File 'lib/a2a/server/task_manager.rb', line 17

# Read-only accessor.
#
# @return [Object] the storage backend stored in @storage
def storage
  @storage
end
Instance Method Details
#add_artifact(task_id, artifact, append: false) ⇒ A2A::Types::Task
Add an artifact to a task
207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 |
# File 'lib/a2a/server/task_manager.rb', line 207

# Attach an artifact to a task, persist the task and emit an artifact
# update event.
#
# @param task_id [String] ID of the task to modify
# @param artifact [Object] artifact to add; must respond to #artifact_id
#   and, in append mode, #parts
# @param append [Boolean] when true and the task already holds an artifact
#   with the same artifact_id, the new parts are appended to that artifact
#   instead of adding a second artifact
# @return [A2A::Types::Task] the updated task
def add_artifact(task_id, artifact, append: false)
  task = get_task(task_id)

  # Only look for a merge target in append mode, and only when the task
  # has an artifact list at all.
  merge_target = nil
  if append && task.artifacts
    merge_target = task.artifacts.find { |candidate| candidate.artifact_id == artifact.artifact_id }
  end

  if merge_target
    artifact.parts.each { |part| merge_target.add_part(part) }
  else
    task.add_artifact(artifact)
  end

  @storage.save_task(task)
  emit_artifact_update_event(task, artifact, append)

  task
end
#add_event_handler(&handler) ⇒ Object
Add an event handler
277 278 279 |
# File 'lib/a2a/server/task_manager.rb', line 277

# Register a block to be called by #emit_event with
# (event_type, event_data).
#
# @yieldparam event_type [String]
# @yieldparam event_data [Object]
# @raise [ArgumentError] when called without a block
# @return [Array] the handler list after the handler was appended
def add_event_handler(&handler)
  # Without this guard a block-less call stores nil, and every later
  # emit_event then fails on nil.call for that slot.
  raise ArgumentError, "add_event_handler requires a block" unless handler

  @event_handlers << handler
end
#add_message(task_id, message) ⇒ A2A::Types::Task
Add a message to task history
250 251 252 253 254 255 256 257 258 259 260 261 262 |
# File 'lib/a2a/server/task_manager.rb', line 250

# Append a message to a task's history and persist the task.
#
# When @config[:max_history_length] is set and the history grows past it,
# only the most recent max_history_length entries are kept.
#
# @param task_id [String] ID of the task to modify
# @param message [Object] message to append
# @return [A2A::Types::Task] the updated task
def add_message(task_id, message)
  task = get_task(task_id)
  # NOTE(review): the method name on the task was lost in the extracted
  # source; `add_message` is assumed from this method's own name - confirm.
  task.add_message(message)

  max_length = @config[:max_history_length]
  if max_length && task.history && task.history.length > max_length
    # The history is replaced by writing the task's @history ivar directly,
    # as the original code does.
    task.instance_variable_set(:@history, task.history.last(max_length))
  end

  @storage.save_task(task)
  task
end
#add_task_artifact(task_id, artifact, append: false) ⇒ A2A::Types::Task
Add an artifact to a task (alias for add_artifact for compatibility)
239 240 241 |
# File 'lib/a2a/server/task_manager.rb', line 239

# Compatibility alias: forwards all arguments unchanged to #add_artifact.
#
# @param task_id [String] ID of the task to modify
# @param artifact [Object] artifact to add
# @param append [Boolean] forwarded as add_artifact's append: option
# @return [A2A::Types::Task] whatever #add_artifact returns
def add_task_artifact(task_id, artifact, append: false)
  add_artifact(task_id, artifact, append: append)
end
#add_to_cache(task_id, task) ⇒ Object (private)
Add task to cache with LRU eviction
456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 |
# File 'lib/a2a/server/task_manager.rb', line 456

# Store a task in the cache under task_id, stamped with the current time.
#
# When the cache already holds @config[:cache_size] entries (1000 when the
# key is unset) and task_id is not one of them, the entry with the oldest
# timestamp is evicted first. Re-caching an existing task_id only replaces
# that entry. A cache_size of zero or less disables caching.
#
# @api private
# @param task_id [String] cache key
# @param task [Object] task to cache
# @return [Hash, nil] the stored { task:, timestamp: } entry, or nil when
#   caching is disabled
def add_to_cache(task_id, task)
  cache_size = @config[:cache_size] || 1000
  # Previously a size of 0 crashed on min_by of an empty hash.
  return if cache_size <= 0

  @cache_mutex.synchronize do
    # Replacing an existing key does not grow the cache, so it must not
    # push out an unrelated entry.
    if !@task_cache.key?(task_id) && @task_cache.size >= cache_size
      oldest_key, = @task_cache.min_by { |_, entry| entry[:timestamp] }
      @task_cache.delete(oldest_key)
    end

    @task_cache[task_id] = { task: task, timestamp: Time.now }
  end
end
#cancel_task(task_id, reason: nil) ⇒ A2A::Types::Task
Cancel a task
181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 |
# File 'lib/a2a/server/task_manager.rb', line 181

# Move a task to the canceled state via #update_task_status.
#
# @param task_id [String] ID of the task to cancel
# @param reason [String, nil] status message; "Task canceled" when nil
# @raise [A2A::Errors::TaskNotCancelable] when task.cancelable? is false
# @return [A2A::Types::Task] whatever #update_task_status returns
def cancel_task(task_id, reason: nil)
  task = get_task(task_id)

  unless task.cancelable?
    raise A2A::Errors::TaskNotCancelable,
          "Task #{task_id} in state '#{task.status.state}' cannot be canceled"
  end

  canceled_status = A2A::Types::TaskStatus.new(
    state: A2A::Types::TASK_STATE_CANCELED,
    message: reason || "Task canceled",
    updated_at: Time.now.utc.iso8601
  )

  update_task_status(task_id, canceled_status)
end
#clear_cache! ⇒ Object (private)
Clear task cache
475 476 477 |
# File 'lib/a2a/server/task_manager.rb', line 475

# Remove every entry from the task cache while holding @cache_mutex.
#
# @api private
# @return [Hash] the (now empty) cache hash
def clear_cache!
  @cache_mutex.synchronize do
    @task_cache.clear
  end
end
#create_task(type:, params: {}, context_id: nil, metadata: nil) ⇒ A2A::Types::Task
Create a new task with performance tracking
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 |
# File 'lib/a2a/server/task_manager.rb', line 50

# Create a task in the submitted state, persist it, cache it and emit a
# status update event. The whole operation runs inside
# A2A::Utils::Performance.profile("task_creation").
#
# @param type [Object] task type; stored in the task metadata under :type
# @param params [Hash] task parameters; stored in the metadata under :params
# @param context_id [String, nil] context to attach the task to; a new one
#   is generated when nil
# @param metadata [Hash, nil] extra metadata; :type, :params and
#   :created_at are merged on top of it
# @return [A2A::Types::Task] the task built here, assuming profile returns
#   its block's value
def create_task(type:, params: {}, context_id: nil, metadata: nil)
  A2A::Utils::Performance.profile("task_creation") do
    task_id = generate_task_id
    context_id ||= generate_context_id

    @metrics_mutex.synchronize { @performance_metrics[:tasks_created] += 1 }

    # Monitoring is optional; only report when the module is loaded.
    if defined?(A2A::Monitoring)
      A2A::Monitoring.increment_counter("a2a_tasks_created", task_type: type)
      A2A::Monitoring.log(:info, "Creating task", task_id: task_id, task_type: type)
    end

    # One timestamp so status.updated_at and metadata[:created_at] agree.
    created_at = Time.now.utc.iso8601

    initial_status = A2A::Types::TaskStatus.new(
      state: A2A::Types::TASK_STATE_SUBMITTED,
      message: "Task created",
      updated_at: created_at
    )

    task = A2A::Types::Task.new(
      id: task_id,
      context_id: context_id,
      status: initial_status,
      metadata: (metadata || {}).merge(
        type: type,
        params: params,
        created_at: created_at
      )
    )

    @storage.save_task(task)
    add_to_cache(task_id, task)
    emit_status_update_event(task)

    task
  end
end
#default_config ⇒ Hash (private)
Default configuration
422 423 424 425 426 427 428 |
# File 'lib/a2a/server/task_manager.rb', line 422

# Baseline settings that the constructor merges caller overrides onto.
#
# @api private
# @return [Hash] a fresh hash on every call
def default_config
  {
    max_history_length: 100, # history entries kept per task by add_message
    cache_size: 1000,        # entries held before add_to_cache evicts
    cache_ttl: 300           # seconds (5 minutes) before get_from_cache expires an entry
  }
end
#emit_artifact_update_event(task, artifact, append) ⇒ Object (private)
Emit a task artifact update event
386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 |
# File 'lib/a2a/server/task_manager.rb', line 386

# Build a TaskArtifactUpdateEvent for the task, hand it to the registered
# event handlers as "task_artifact_update", then forward it to the push
# notification manager when one is configured.
#
# @api private
# @param task [A2A::Types::Task] task the artifact belongs to
# @param artifact [Object] artifact that was added or appended to
# @param append [Boolean] whether the artifact was added in append mode
# @return [Object, nil] result of the push notification call, or nil when
#   no manager is set
def emit_artifact_update_event(task, artifact, append)
  event_metadata = {
    timestamp: Time.now.utc.iso8601,
    event_id: SecureRandom.uuid
  }

  event = A2A::Types::TaskArtifactUpdateEvent.new(
    task_id: task.id,
    context_id: task.context_id,
    artifact: artifact,
    append: append,
    metadata: event_metadata
  )

  emit_event("task_artifact_update", event)

  # Safe navigation: push notifications are optional.
  @push_notification_manager&.notify_task_artifact_update(event)
end
#emit_event(event_type, event_data) ⇒ Object (private)
Emit an event to all registered handlers
409 410 411 412 413 414 415 416 |
# File 'lib/a2a/server/task_manager.rb', line 409

# Call every registered handler with (event_type, event_data).
#
# A StandardError raised by one handler is reported through Kernel#warn and
# does not stop the remaining handlers or the calling operation.
#
# @api private
# @param event_type [String] event name passed to each handler
# @param event_data [Object] event payload passed to each handler
# @return [Array] the handler list
def emit_event(event_type, event_data)
  @event_handlers.each do |handler|
    begin
      handler.call(event_type, event_data)
    rescue StandardError => e
      # Best effort by design: log and move on to the next handler.
      warn "Error in event handler: #{e.message}"
    end
  end
end
#emit_status_update_event(task) ⇒ Object (private)
Emit a task status update event
363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 |
# File 'lib/a2a/server/task_manager.rb', line 363

# Build a TaskStatusUpdateEvent from the task's current status, hand it to
# the registered event handlers as "task_status_update", then forward it to
# the push notification manager when one is configured.
#
# @api private
# @param task [A2A::Types::Task] task whose status changed
# @return [Object, nil] result of the push notification call, or nil when
#   no manager is set
def emit_status_update_event(task)
  event_metadata = {
    timestamp: Time.now.utc.iso8601,
    event_id: SecureRandom.uuid
  }

  event = A2A::Types::TaskStatusUpdateEvent.new(
    task_id: task.id,
    context_id: task.context_id,
    status: task.status,
    metadata: event_metadata
  )

  emit_event("task_status_update", event)

  # Safe navigation: push notifications are optional.
  @push_notification_manager&.notify_task_status_update(event)
end
#generate_context_id ⇒ String (private)
Generate a unique context ID
303 304 305 |
# File 'lib/a2a/server/task_manager.rb', line 303

# Produce a new context identifier.
#
# @api private
# @return [String] a random UUID from SecureRandom.uuid
def generate_context_id
  SecureRandom.uuid
end
#generate_task_id ⇒ String (private)
Generate a unique task ID
295 296 297 |
# File 'lib/a2a/server/task_manager.rb', line 295

# Produce a new task identifier.
#
# @api private
# @return [String] a random UUID from SecureRandom.uuid
def generate_task_id
  SecureRandom.uuid
end
#get_from_cache(task_id) ⇒ A2A::Types::Task? (private)
Get task from cache
435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 |
# File 'lib/a2a/server/task_manager.rb', line 435

# Look up a task in the cache.
#
# An entry older than @config[:cache_ttl] seconds (300 when the key is
# unset) is deleted and treated as absent. A successful read does not
# change the entry's timestamp.
#
# @api private
# @param task_id [String] cache key
# @return [A2A::Types::Task, nil] the cached task, or nil when missing or
#   expired
def get_from_cache(task_id)
  @cache_mutex.synchronize do
    entry = @task_cache[task_id]
    return nil unless entry

    ttl_seconds = @config[:cache_ttl] || 300
    age_seconds = Time.now - entry[:timestamp]

    if age_seconds > ttl_seconds
      @task_cache.delete(task_id)
      return nil
    end

    entry[:task]
  end
end
#get_task(task_id, history_length: nil) ⇒ A2A::Types::Task
Get a task by ID with caching for performance
95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 |
# File 'lib/a2a/server/task_manager.rb', line 95

# Fetch a task by ID, consulting the cache before the storage backend.
#
# The cache always holds the task exactly as storage returned it. When
# history_length is given and the task's history is longer, the caller
# receives a new Task carrying only the most recent history_length entries;
# the cached task is left untouched. Previously the truncated copy itself
# was cached, so a later call without history_length (and any save that
# followed) saw the shortened history.
#
# @param task_id [String] ID of the task to fetch
# @param history_length [Integer, nil] maximum number of history entries to
#   return; nil returns the task unchanged
# @raise [A2A::Errors::TaskNotFound] when storage has no task with this ID
# @return [A2A::Types::Task]
def get_task(task_id, history_length: nil)
  # Monotonic clock: the elapsed time is unaffected by wall-clock changes.
  started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)

  task = get_from_cache(task_id)
  if task
    record_cache_hit
  else
    record_cache_miss
    task = @storage.get_task(task_id)
    raise A2A::Errors::TaskNotFound, "Task #{task_id} not found" unless task

    add_to_cache(task_id, task)
    # As before, only storage lookups are timed.
    record_processing_time(Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at)
  end

  return task unless history_length && task.history && task.history.length > history_length

  A2A::Types::Task.new(
    id: task.id,
    context_id: task.context_id,
    status: task.status,
    artifacts: task.artifacts,
    history: task.history.last(history_length),
    metadata: task.metadata
  )
end
#list_tasks_by_context(*args) ⇒ Array<A2A::Types::Task>
List tasks by context ID
269 270 271 |
# File 'lib/a2a/server/task_manager.rb', line 269

# Delegate to the storage backend's list_tasks_by_context, passing the
# positional arguments through unchanged.
#
# @param args [Array] arguments accepted by the storage backend
#   (presumably the context ID first - confirm against the storage API)
# @return [Array<A2A::Types::Task>] whatever the storage backend returns
def list_tasks_by_context(*args)
  @storage.list_tasks_by_context(*args)
end
#performance_metrics ⇒ Hash (private)
Get performance metrics
514 515 516 |
# File 'lib/a2a/server/task_manager.rb', line 514

# Take a snapshot of the metrics counters while holding @metrics_mutex.
#
# @api private
# @return [Hash] a shallow copy; changing it does not affect the live
#   counters
def performance_metrics
  @metrics_mutex.synchronize do
    @performance_metrics.dup
  end
end
#record_cache_hit ⇒ Object (private)
Record cache hit
482 483 484 |
# File 'lib/a2a/server/task_manager.rb', line 482

# Increment the :cache_hits counter while holding @metrics_mutex.
#
# @api private
# @return [Integer] the new counter value
def record_cache_hit
  @metrics_mutex.synchronize do
    @performance_metrics[:cache_hits] += 1
  end
end
#record_cache_miss ⇒ Object (private)
Record cache miss
489 490 491 |
# File 'lib/a2a/server/task_manager.rb', line 489

# Increment the :cache_misses counter while holding @metrics_mutex.
#
# @api private
# @return [Integer] the new counter value
def record_cache_miss
  @metrics_mutex.synchronize do
    @performance_metrics[:cache_misses] += 1
  end
end
#record_processing_time(duration) ⇒ Object (private)
Record processing time
497 498 499 500 501 502 503 504 505 506 507 508 |
# File 'lib/a2a/server/task_manager.rb', line 497

# Fold one duration into the running mean in :avg_processing_time.
#
# The mean is taken over the durations actually recorded. The number of
# samples is kept in the metrics hash under :processing_samples, so
# replacing that hash (as reset_performance_metrics! does) restarts the
# mean. Previously the divisor was tasks_created + tasks_updated, which is
# unrelated to how often this method is called.
#
# @api private
# @param duration [Numeric] elapsed time to record (seconds when called
#   from get_task)
# @return [Numeric] the updated mean
def record_processing_time(duration)
  @metrics_mutex.synchronize do
    sample_count = @performance_metrics.fetch(:processing_samples, 0) + 1
    previous_mean = @performance_metrics[:avg_processing_time]

    @performance_metrics[:processing_samples] = sample_count
    # Incremental mean: new = old + (x - old) / n
    @performance_metrics[:avg_processing_time] =
      previous_mean + ((duration - previous_mean) / sample_count)
  end
end
#remove_event_handler(handler) ⇒ Object
Remove an event handler
285 286 287 |
# File 'lib/a2a/server/task_manager.rb', line 285

# Unregister a handler previously added with #add_event_handler.
#
# @param handler [Proc] the handler object to remove
# @return [Proc, nil] the removed handler, or nil when it was not
#   registered (Array#delete semantics)
def remove_event_handler(handler)
  @event_handlers.delete(handler)
end
#reset_performance_metrics! ⇒ Object (private)
Reset performance metrics
521 522 523 524 525 526 527 528 529 530 531 |
# File 'lib/a2a/server/task_manager.rb', line 521

# Replace the metrics hash with fresh zeroed counters while holding
# @metrics_mutex.
#
# @api private
# @return [Hash] the new metrics hash
def reset_performance_metrics!
  zeroed = {
    tasks_created: 0,
    tasks_updated: 0,
    cache_hits: 0,
    cache_misses: 0,
    avg_processing_time: 0.0
  }

  @metrics_mutex.synchronize { @performance_metrics = zeroed }
end
#update_task_status(task_id, status, message: nil) ⇒ A2A::Types::Task
Update task status with performance tracking
138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 |
# File 'lib/a2a/server/task_manager.rb', line 138

# Validate and apply a status change, then persist the task, refresh the
# cache and emit a status update event. The whole operation runs inside
# A2A::Utils::Performance.profile("task_status_update").
#
# @param task_id [String] ID of the task to update
# @param status [A2A::Types::TaskStatus, Hash] the new status. A
#   TaskStatus is applied as given. A Hash may use symbol or string keys;
#   it is copied, keys are symbolized, :updated_at is set to the current
#   UTC time, and the result is passed to TaskStatus.new as keywords.
# @param message [String, nil] when given with a Hash status, overrides the
#   hash's :message; not applied when status is already a TaskStatus
# @raise [ArgumentError] from #validate_state_transition when the
#   transition is not allowed
# @return [A2A::Types::Task] the updated task, assuming profile returns its
#   block's value
def update_task_status(task_id, status, message: nil)
  A2A::Utils::Performance.profile("task_status_update") do
    task = get_task(task_id)

    typed_status = status.is_a?(A2A::Types::TaskStatus)
    # Symbolize once so { "state" => ... } works for both the state lookup
    # and the keyword splat below. transform_keys returns a new hash, so
    # the caller's hash is never modified.
    status_hash = typed_status ? nil : status.transform_keys(&:to_sym)

    new_state = typed_status ? status.state : status_hash[:state]
    validate_state_transition(task.status.state, new_state)

    # Counted only after validation succeeded.
    @metrics_mutex.synchronize { @performance_metrics[:tasks_updated] += 1 }

    new_status =
      if typed_status
        status
      else
        status_hash[:message] = message if message
        status_hash[:updated_at] = Time.now.utc.iso8601
        A2A::Types::TaskStatus.new(**status_hash)
      end

    task.update_status(new_status)
    @storage.save_task(task)
    add_to_cache(task_id, task)
    emit_status_update_event(task)

    task
  end
end
#validate_state_transition(current_state, new_state) ⇒ Object (private)
Validate state transition
313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 |
# File 'lib/a2a/server/task_manager.rb', line 313

# Check that a task may move from current_state to new_state.
#
# Completed, canceled, failed, rejected and unknown are treated as terminal:
# nothing may follow them. Every other state has an explicit list of
# allowed successors; a current state with no list allows nothing.
#
# @api private
# @param current_state [Object] the task's present state
# @param new_state [Object] the requested state
# @raise [ArgumentError] when current_state is terminal, or when new_state
#   is not an allowed successor of current_state
# @return [nil]
def validate_state_transition(current_state, new_state)
  states = A2A::Types

  terminal_states = [
    states::TASK_STATE_COMPLETED,
    states::TASK_STATE_CANCELED,
    states::TASK_STATE_FAILED,
    states::TASK_STATE_REJECTED,
    states::TASK_STATE_UNKNOWN
  ]

  if terminal_states.include?(current_state)
    raise ArgumentError, "Cannot transition from terminal state '#{current_state}'"
  end

  valid_transitions = {
    states::TASK_STATE_SUBMITTED => [
      states::TASK_STATE_WORKING,
      states::TASK_STATE_CANCELED,
      states::TASK_STATE_REJECTED,
      states::TASK_STATE_AUTH_REQUIRED
    ],
    states::TASK_STATE_WORKING => [
      states::TASK_STATE_INPUT_REQUIRED,
      states::TASK_STATE_COMPLETED,
      states::TASK_STATE_CANCELED,
      states::TASK_STATE_FAILED,
      states::TASK_STATE_AUTH_REQUIRED
    ],
    states::TASK_STATE_INPUT_REQUIRED => [
      states::TASK_STATE_WORKING,
      states::TASK_STATE_COMPLETED,
      states::TASK_STATE_CANCELED,
      states::TASK_STATE_FAILED
    ],
    states::TASK_STATE_AUTH_REQUIRED => [
      states::TASK_STATE_WORKING,
      states::TASK_STATE_CANCELED,
      states::TASK_STATE_REJECTED
    ]
  }

  # fetch with a default: a state without an entry allows no successors.
  successors = valid_transitions.fetch(current_state, [])
  return if successors.include?(new_state)

  raise ArgumentError, "Invalid state transition from '#{current_state}' to '#{new_state}'"
end