Class: A2A::Server::TaskManager

Inherits:
Object
  • Object
show all
Defined in:
lib/a2a/server/task_manager.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(storage: nil, push_notification_manager: nil, config: {}) ⇒ TaskManager

Initialize a new TaskManager with performance optimizations

Parameters:

  • storage (Object) (defaults to: nil)

    Storage backend for task persistence

  • push_notification_manager (PushNotificationManager, nil) (defaults to: nil)

    Push notification manager

  • config (Hash) (defaults to: {})

    Configuration options



25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/a2a/server/task_manager.rb', line 25

def initialize(storage: nil, push_notification_manager: nil, config: {})
  @storage = storage || A2A::Server::Storage::Memory.new
  @push_notification_manager = push_notification_manager
  @event_handlers = []
  @config = default_config.merge(config)
  @task_cache = {} # LRU cache for frequently accessed tasks
  @cache_mutex = Mutex.new
  @performance_metrics = {
    tasks_created: 0,
    tasks_updated: 0,
    cache_hits: 0,
    cache_misses: 0,
    avg_processing_time: 0.0
  }
  @metrics_mutex = Mutex.new
end

Instance Attribute Details

#configObject (readonly)

Returns the value of attribute config.



17
18
19
# File 'lib/a2a/server/task_manager.rb', line 17

def config
  @config
end

#event_handlersObject (readonly)

Returns the value of attribute event_handlers.



17
18
19
# File 'lib/a2a/server/task_manager.rb', line 17

def event_handlers
  @event_handlers
end

#storageObject (readonly)

Returns the value of attribute storage.



17
18
19
# File 'lib/a2a/server/task_manager.rb', line 17

def storage
  @storage
end

Instance Method Details

#add_artifact(task_id, artifact, append: false) ⇒ A2A::Types::Task

Add an artifact to a task

Parameters:

  • task_id (String)

    The task ID

  • artifact (A2A::Types::Artifact)

    The artifact to add

  • append (Boolean) (defaults to: false)

    Whether to append to existing artifact with same ID

Returns:

Raises:



207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
# File 'lib/a2a/server/task_manager.rb', line 207

def add_artifact(task_id, artifact, append: false)
  task = get_task(task_id)

  if append && task.artifacts
    # Find existing artifact with same ID
    existing_artifact = task.artifacts.find { |a| a.artifact_id == artifact.artifact_id }
    if existing_artifact
      # Append parts to existing artifact
      artifact.parts.each { |part| existing_artifact.add_part(part) }
    else
      task.add_artifact(artifact)
    end
  else
    task.add_artifact(artifact)
  end

  @storage.save_task(task)

  # Emit artifact update event
  emit_artifact_update_event(task, artifact, append)

  task
end

#add_event_handler(&handler) ⇒ Object

Add an event handler

Parameters:

  • handler (Proc)

    Event handler that receives (event_type, event_data)



277
278
279
# File 'lib/a2a/server/task_manager.rb', line 277

def add_event_handler(&handler)
  @event_handlers << handler
end

#add_message(task_id, message) ⇒ A2A::Types::Task

Add a message to task history

Parameters:

Returns:

Raises:



250
251
252
253
254
255
256
257
258
259
260
261
262
# File 'lib/a2a/server/task_manager.rb', line 250

def add_message(task_id, message)
  task = get_task(task_id)
  task.add_message(message)

  # Limit history length if configured
  if @config[:max_history_length] && task.history && task.history.length > @config[:max_history_length]
    # Keep only the most recent messages
    task.instance_variable_set(:@history, task.history.last(@config[:max_history_length]))
  end

  @storage.save_task(task)
  task
end

#add_task_artifact(task_id, artifact, append: false) ⇒ A2A::Types::Task

Add an artifact to a task (alias for add_artifact for compatibility)

Parameters:

  • task_id (String)

    The task ID

  • artifact (A2A::Types::Artifact)

    The artifact to add

  • append (Boolean) (defaults to: false)

    Whether to append to existing artifact with same ID

Returns:

Raises:



239
240
241
# File 'lib/a2a/server/task_manager.rb', line 239

def add_task_artifact(task_id, artifact, append: false)
  add_artifact(task_id, artifact, append: append)
end

#add_to_cache(task_id, task) ⇒ Object (private)

Add task to cache with LRU eviction

Parameters:

  • task_id (String)

    The task ID

  • task (A2A::Types::Task)

    The task to cache



456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
# File 'lib/a2a/server/task_manager.rb', line 456

def add_to_cache(task_id, task)
  @cache_mutex.synchronize do
    # Evict oldest entries if cache is full
    cache_size = @config[:cache_size] || 1000
    if @task_cache.size >= cache_size
      oldest_key = @task_cache.min_by { |_, entry| entry[:timestamp] }.first
      @task_cache.delete(oldest_key)
    end

    @task_cache[task_id] = {
      task: task,
      timestamp: Time.now
    }
  end
end

#cancel_task(task_id, reason: nil) ⇒ A2A::Types::Task

Cancel a task

Parameters:

  • task_id (String)

    The task ID

  • reason (String, nil) (defaults to: nil)

    Optional cancellation reason

Returns:

Raises:



181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
# File 'lib/a2a/server/task_manager.rb', line 181

def cancel_task(task_id, reason: nil)
  task = get_task(task_id)

  unless task.cancelable?
    raise A2A::Errors::TaskNotCancelable,
          "Task #{task_id} in state '#{task.status.state}' cannot be canceled"
  end

  update_task_status(
    task_id,
    A2A::Types::TaskStatus.new(
      state: A2A::Types::TASK_STATE_CANCELED,
      message: reason || "Task canceled",
      updated_at: Time.now.utc.iso8601
    )
  )
end

#clear_cache!Object (private)

Clear task cache



475
476
477
# File 'lib/a2a/server/task_manager.rb', line 475

def clear_cache!
  @cache_mutex.synchronize { @task_cache.clear }
end

#create_task(type:, params: {}, context_id: nil, metadata: nil) ⇒ A2A::Types::Task

Create a new task with performance tracking

Parameters:

  • type (String)

    Task type identifier

  • params (Hash) (defaults to: {})

    Task parameters

  • context_id (String, nil) (defaults to: nil)

    Optional context ID (generated if not provided)

  • metadata (Hash, nil) (defaults to: nil)

    Optional task metadata

Returns:



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/a2a/server/task_manager.rb', line 50

def create_task(type:, params: {}, context_id: nil, metadata: nil)
  A2A::Utils::Performance.profile("task_creation") do
    task_id = generate_task_id
    context_id ||= generate_context_id

    # Record task creation metric
    @metrics_mutex.synchronize { @performance_metrics[:tasks_created] += 1 }

    A2A::Monitoring.increment_counter("a2a_tasks_created", task_type: type) if defined?(A2A::Monitoring)
    A2A::Monitoring.log(:info, "Creating task", task_id: task_id, task_type: type) if defined?(A2A::Monitoring)

    task = A2A::Types::Task.new(
      id: task_id,
      context_id: context_id,
      status: A2A::Types::TaskStatus.new(
        state: A2A::Types::,
        message: "Task created",
        updated_at: Time.now.utc.iso8601
      ),
      metadata: ( || {}).merge(
        type: type,
        params: params,
        created_at: Time.now.utc.iso8601
      )
    )

    @storage.save_task(task)

    # Cache the newly created task
    add_to_cache(task_id, task)

    # Emit task creation event
    emit_status_update_event(task)

    task
  end
end

#default_configHash (private)

Default configuration

Returns:

  • (Hash)

    Default configuration



422
423
424
425
426
427
428
# File 'lib/a2a/server/task_manager.rb', line 422

def default_config
  {
    max_history_length: 100,
    cache_size: 1000,
    cache_ttl: 300 # 5 minutes
  }
end

#emit_artifact_update_event(task, artifact, append) ⇒ Object (private)

Emit a task artifact update event

Parameters:



386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
# File 'lib/a2a/server/task_manager.rb', line 386

def emit_artifact_update_event(task, artifact, append)
  event = A2A::Types::TaskArtifactUpdateEvent.new(
    task_id: task.id,
    context_id: task.context_id,
    artifact: artifact,
    append: append,
    metadata: {
      timestamp: Time.now.utc.iso8601,
      event_id: SecureRandom.uuid
    }
  )

  emit_event("task_artifact_update", event)

  # Send push notifications if manager is available
  @push_notification_manager&.notify_task_artifact_update(event)
end

#emit_event(event_type, event_data) ⇒ Object (private)

Emit an event to all registered handlers

Parameters:

  • event_type (String)

    The event type

  • event_data (Object)

    The event data



409
410
411
412
413
414
415
416
# File 'lib/a2a/server/task_manager.rb', line 409

def emit_event(event_type, event_data)
  @event_handlers.each do |handler|
    handler.call(event_type, event_data)
  rescue StandardError => e
    # Log error but don't fail the operation
    warn "Error in event handler: #{e.message}"
  end
end

#emit_status_update_event(task) ⇒ Object (private)

Emit a task status update event

Parameters:



363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
# File 'lib/a2a/server/task_manager.rb', line 363

def emit_status_update_event(task)
  event = A2A::Types::TaskStatusUpdateEvent.new(
    task_id: task.id,
    context_id: task.context_id,
    status: task.status,
    metadata: {
      timestamp: Time.now.utc.iso8601,
      event_id: SecureRandom.uuid
    }
  )

  emit_event("task_status_update", event)

  # Send push notifications if manager is available
  @push_notification_manager&.notify_task_status_update(event)
end

#generate_context_idString (private)

Generate a unique context ID

Returns:

  • (String)

    A unique context ID



303
304
305
# File 'lib/a2a/server/task_manager.rb', line 303

def generate_context_id
  SecureRandom.uuid
end

#generate_task_idString (private)

Generate a unique task ID

Returns:

  • (String)

    A unique task ID



295
296
297
# File 'lib/a2a/server/task_manager.rb', line 295

def generate_task_id
  SecureRandom.uuid
end

#get_from_cache(task_id) ⇒ A2A::Types::Task? (private)

Get task from cache

Parameters:

  • task_id (String)

    The task ID

Returns:



435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
# File 'lib/a2a/server/task_manager.rb', line 435

def get_from_cache(task_id)
  @cache_mutex.synchronize do
    entry = @task_cache[task_id]
    return nil unless entry

    # Check TTL
    cache_ttl = @config[:cache_ttl] || 300
    if Time.now - entry[:timestamp] > cache_ttl
      @task_cache.delete(task_id)
      return nil
    end

    entry[:task]
  end
end

#get_task(task_id, history_length: nil) ⇒ A2A::Types::Task

Get a task by ID with caching for performance

Parameters:

  • task_id (String)

    The task ID

  • history_length (Integer, nil) (defaults to: nil)

    Maximum number of history messages to include

Returns:

Raises:



95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# File 'lib/a2a/server/task_manager.rb', line 95

def get_task(task_id, history_length: nil)
  start_time = Time.now

  # Check cache first for frequently accessed tasks
  cached_task = get_from_cache(task_id)
  if cached_task && (!history_length || !cached_task.history || cached_task.history.length <= history_length)
    record_cache_hit
    return cached_task
  end

  record_cache_miss
  task = @storage.get_task(task_id)
  raise A2A::Errors::TaskNotFound, "Task #{task_id} not found" unless task

  # Limit history if requested
  if history_length && task.history && task.history.length > history_length
    limited_history = task.history.last(history_length)
    # Create a new task instance with limited history
    task = A2A::Types::Task.new(
      id: task.id,
      context_id: task.context_id,
      status: task.status,
      artifacts: task.artifacts,
      history: limited_history,
      metadata: task.
    )
  end

  # Cache the task for future access
  add_to_cache(task_id, task)

  record_processing_time(Time.now - start_time)
  task
end

#list_tasks_by_context(*args) ⇒ Array<A2A::Types::Task>

List tasks by context ID

Parameters:

  • context_id (String)

    The context ID

Returns:



269
270
271
# File 'lib/a2a/server/task_manager.rb', line 269

def list_tasks_by_context(*args)
  @storage.list_tasks_by_context(*args)
end

#performance_metricsHash (private)

Get performance metrics

Returns:

  • (Hash)

    Performance metrics



514
515
516
# File 'lib/a2a/server/task_manager.rb', line 514

def performance_metrics
  @metrics_mutex.synchronize { @performance_metrics.dup }
end

#record_cache_hitObject (private)

Record cache hit



482
483
484
# File 'lib/a2a/server/task_manager.rb', line 482

def record_cache_hit
  @metrics_mutex.synchronize { @performance_metrics[:cache_hits] += 1 }
end

#record_cache_missObject (private)

Record cache miss



489
490
491
# File 'lib/a2a/server/task_manager.rb', line 489

def record_cache_miss
  @metrics_mutex.synchronize { @performance_metrics[:cache_misses] += 1 }
end

#record_processing_time(duration) ⇒ Object (private)

Record processing time

Parameters:

  • duration (Float)

    Processing time in seconds



497
498
499
500
501
502
503
504
505
506
507
508
# File 'lib/a2a/server/task_manager.rb', line 497

def record_processing_time(duration)
  @metrics_mutex.synchronize do
    current_avg = @performance_metrics[:avg_processing_time]
    total_ops = @performance_metrics[:tasks_created] + @performance_metrics[:tasks_updated]

    @performance_metrics[:avg_processing_time] = if total_ops.positive?
                                                   ((current_avg * (total_ops - 1)) + duration) / total_ops
                                                 else
                                                   duration
                                                 end
  end
end

#remove_event_handler(handler) ⇒ Object

Remove an event handler

Parameters:

  • handler (Proc)

    The handler to remove



285
286
287
# File 'lib/a2a/server/task_manager.rb', line 285

def remove_event_handler(handler)
  @event_handlers.delete(handler)
end

#reset_performance_metrics!Object (private)

Reset performance metrics



521
522
523
524
525
526
527
528
529
530
531
# File 'lib/a2a/server/task_manager.rb', line 521

def reset_performance_metrics!
  @metrics_mutex.synchronize do
    @performance_metrics = {
      tasks_created: 0,
      tasks_updated: 0,
      cache_hits: 0,
      cache_misses: 0,
      avg_processing_time: 0.0
    }
  end
end

#update_task_status(task_id, status, message: nil) ⇒ A2A::Types::Task

Update task status with performance tracking

Parameters:

  • task_id (String)

    The task ID

  • status (A2A::Types::TaskStatus, Hash)

    New status

  • message (String, nil) (defaults to: nil)

    Optional status message

Returns:

Raises:



138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
# File 'lib/a2a/server/task_manager.rb', line 138

def update_task_status(task_id, status, message: nil)
  A2A::Utils::Performance.profile("task_status_update") do
    task = get_task(task_id)

    # Validate state transition
    new_state = status.is_a?(A2A::Types::TaskStatus) ? status.state : status[:state] || status["state"]
    validate_state_transition(task.status.state, new_state)

    # Record task update metric
    @metrics_mutex.synchronize { @performance_metrics[:tasks_updated] += 1 }

    # Create new status
    new_status = if status.is_a?(A2A::Types::TaskStatus)
                   status
                 else
                   status_hash = status.dup
                   status_hash[:message] = message if message
                   status_hash[:updated_at] = Time.now.utc.iso8601
                   A2A::Types::TaskStatus.new(**status_hash)
                 end

    # Update task
    task.update_status(new_status)
    @storage.save_task(task)

    # Update cache
    add_to_cache(task_id, task)

    # Emit status update event
    emit_status_update_event(task)

    task
  end
end

#validate_state_transition(current_state, new_state) ⇒ Object (private)

Validate state transition

Parameters:

  • current_state (String)

    Current task state

  • new_state (String)

    Proposed new state

Raises:

  • (ArgumentError)

    If transition is invalid



313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
# File 'lib/a2a/server/task_manager.rb', line 313

def validate_state_transition(current_state, new_state)
  # Define valid state transitions
  valid_transitions = {
    A2A::Types:: => [
      A2A::Types::TASK_STATE_WORKING,
      A2A::Types::TASK_STATE_CANCELED,
      A2A::Types::TASK_STATE_REJECTED,
      A2A::Types::TASK_STATE_AUTH_REQUIRED
    ],
    A2A::Types::TASK_STATE_WORKING => [
      A2A::Types::TASK_STATE_INPUT_REQUIRED,
      A2A::Types::TASK_STATE_COMPLETED,
      A2A::Types::TASK_STATE_CANCELED,
      A2A::Types::TASK_STATE_FAILED,
      A2A::Types::TASK_STATE_AUTH_REQUIRED
    ],
    A2A::Types::TASK_STATE_INPUT_REQUIRED => [
      A2A::Types::TASK_STATE_WORKING,
      A2A::Types::TASK_STATE_COMPLETED,
      A2A::Types::TASK_STATE_CANCELED,
      A2A::Types::TASK_STATE_FAILED
    ],
    A2A::Types::TASK_STATE_AUTH_REQUIRED => [
      A2A::Types::TASK_STATE_WORKING,
      A2A::Types::TASK_STATE_CANCELED,
      A2A::Types::TASK_STATE_REJECTED
    ]
  }

  # Terminal states cannot transition
  terminal_states = [
    A2A::Types::TASK_STATE_COMPLETED,
    A2A::Types::TASK_STATE_CANCELED,
    A2A::Types::TASK_STATE_FAILED,
    A2A::Types::TASK_STATE_REJECTED,
    A2A::Types::TASK_STATE_UNKNOWN
  ]

  raise ArgumentError, "Cannot transition from terminal state '#{current_state}'" if terminal_states.include?(current_state)

  allowed_states = valid_transitions[current_state] || []
  return if allowed_states.include?(new_state)

  raise ArgumentError, "Invalid state transition from '#{current_state}' to '#{new_state}'"
end