Class: A2A::Server::DefaultRequestHandler

Inherits:
RequestHandler show all
Defined in:
lib/a2a/server/default_request_handler.rb

Overview

Default implementation of the RequestHandler interface

This class provides a complete implementation of the A2A request handler that uses an AgentExecutor for processing requests and manages tasks through a TaskManager. It mirrors the Python DefaultRequestHandler.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(agent_executor, task_store: nil, push_notification_manager: nil) ⇒ DefaultRequestHandler

Initialize the default request handler

Parameters:

  • agent_executor (AgentExecution::AgentExecutor)

    The agent executor for processing requests

  • task_store (Object, nil) (defaults to: nil)

    Optional task store for persistence

  • push_notification_manager (PushNotificationManager, nil) (defaults to: nil)

    Optional push notification manager



29
30
31
32
33
34
35
36
37
# File 'lib/a2a/server/default_request_handler.rb', line 29

# Initialize the default request handler.
#
# The push notification manager is resolved first (falling back to a new
# PushNotificationManager when none is supplied) so that the handler and the
# TaskManager it builds share one and the same manager instance. Previously
# the TaskManager received the raw, possibly nil, argument while the handler
# kept a separately created default.
#
# @param agent_executor [AgentExecution::AgentExecutor] executor used to process requests
# @param task_store [Object, nil] optional task store, passed to the TaskManager as its storage
# @param push_notification_manager [PushNotificationManager, nil] optional push notification manager
def initialize(agent_executor, task_store: nil, push_notification_manager: nil)
  @agent_executor = agent_executor
  @task_store = task_store
  @push_notification_manager = push_notification_manager || PushNotificationManager.new
  @task_manager = TaskManager.new(
    storage: task_store,
    push_notification_manager: @push_notification_manager
  )
end

Instance Attribute Details

#agent_executor ⇒ Object (readonly)

Returns the value of attribute agent_executor.



21
22
23
# File 'lib/a2a/server/default_request_handler.rb', line 21

# Read-only accessor for the agent executor.
#
# @return [Object] the value held in @agent_executor (the executor given to #initialize)
def agent_executor
  @agent_executor
end

#push_notification_manager ⇒ Object (readonly)

Returns the value of attribute push_notification_manager.



21
22
23
# File 'lib/a2a/server/default_request_handler.rb', line 21

# Read-only accessor for the push notification manager.
#
# @return [Object] the value held in @push_notification_manager: the manager
#   given to #initialize, or the PushNotificationManager created there when
#   none was supplied
def push_notification_manager
  @push_notification_manager
end

#task_manager ⇒ Object (readonly)

Returns the value of attribute task_manager.



21
22
23
# File 'lib/a2a/server/default_request_handler.rb', line 21

# Read-only accessor for the task manager.
#
# @return [Object] the value held in @task_manager (the TaskManager built in #initialize)
def task_manager
  @task_manager
end

#task_store ⇒ Object (readonly)

Returns the value of attribute task_store.



21
22
23
# File 'lib/a2a/server/default_request_handler.rb', line 21

# Read-only accessor for the task store.
#
# @return [Object, nil] the value held in @task_store (the task_store given
#   to #initialize, nil by default)
def task_store
  @task_store
end

Instance Method Details

#on_cancel_task(params, context = nil) ⇒ A2A::Types::Task?

Handle the ‘tasks/cancel’ method

Parameters:

  • params (Hash)

    Parameters with task ID

  • context (A2A::Server::Context, nil) (defaults to: nil)

    Server context

Returns:

Raises:



62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/a2a/server/default_request_handler.rb', line 62

# Handle the 'tasks/cancel' method.
#
# Builds a cancellation request context from the parameters, attaches event
# processing to a fresh in-memory event queue, asks the agent executor to
# cancel, and returns the task as the task manager reports it after a short
# grace period. The event queue is closed in every case.
#
# The cancellation reason is not read here: the previous lookup of
# params["reason"] discarded its result. The complete params hash is still
# handed to the request context builder.
#
# @param params [Hash] parameters carrying the task ID under "id" or :id
# @param context [A2A::Server::Context, nil] server context
# @return [A2A::Types::Task, nil] the updated task, or nil when
#   A2A::Errors::TaskNotFound is raised during cancellation or lookup
# @raise [A2A::Errors::InvalidParams] if no task ID is given
def on_cancel_task(params, context = nil)
  task_id = params["id"] || params[:id]
  raise A2A::Errors::InvalidParams, "Task ID is required" unless task_id

  # Create request context for cancellation
  request_context = AgentExecution::RequestContextBuilder.from_task_operation(
    params, context, operation: "cancel"
  )

  # Events emitted during cancellation flow through this queue
  event_queue = Events::InMemoryEventQueue.new
  setup_event_processing(event_queue, task_id, request_context.context_id)

  begin
    @agent_executor.cancel(request_context, event_queue)

    # Fixed grace period so emitted events can be applied before the task is re-read
    sleep 0.1

    @task_manager.get_task(task_id)
  rescue A2A::Errors::TaskNotFound
    nil
  ensure
    event_queue.close
  end
end

#on_delete_task_push_notification_config(params, _context = nil) ⇒ Object

Delete push notification config

Parameters:

  • params (Hash)

    Parameters with task ID and config ID

  • _context (A2A::Server::Context, nil) (defaults to: nil)

    Server context

Raises:



280
281
282
283
284
285
286
287
288
# File 'lib/a2a/server/default_request_handler.rb', line 280

# Delete a push notification config.
#
# Both identifiers are read from the string key first ("taskId", "configId")
# and then from the symbol key (:task_id, :config_id).
#
# @param params [Hash] parameters with task ID and config ID
# @param _context [A2A::Server::Context, nil] server context (not used here)
# @return [nil] when the manager reports a successful deletion
# @raise [A2A::Errors::InvalidParams] if either identifier is missing
# @raise [A2A::Errors::NotFound] if the manager reports nothing was deleted
def on_delete_task_push_notification_config(params, _context = nil)
  target_task = params["taskId"] || params[:task_id]
  target_config = params["configId"] || params[:config_id]

  identifier_missing = !target_task || !target_config
  raise A2A::Errors::InvalidParams, "Task ID and config ID are required" if identifier_missing

  return if @push_notification_manager.delete_push_notification_config(target_task, target_config)

  raise A2A::Errors::NotFound, "Push notification config not found"
end

#on_get_task(params, _context = nil) ⇒ A2A::Types::Task?

Handle the ‘tasks/get’ method

Parameters:

  • params (Hash)

    Parameters with task ID and optional history length

  • _context (A2A::Server::Context, nil) (defaults to: nil)

    Server context

Returns:



45
46
47
48
49
50
51
52
53
54
# File 'lib/a2a/server/default_request_handler.rb', line 45

# Handle the 'tasks/get' method.
#
# The task ID is read from "id" or :id, and the optional history length from
# "historyLength" or :history_length; both are forwarded to the task manager.
#
# @param params [Hash] parameters with task ID and optional history length
# @param _context [A2A::Server::Context, nil] server context (not used here)
# @return [A2A::Types::Task, nil] whatever the task manager returns, or nil
#   when A2A::Errors::TaskNotFound is raised
# @raise [A2A::Errors::InvalidParams] if no task ID is given
def on_get_task(params, _context = nil)
  lookup_id = params["id"] || params[:id]
  raise A2A::Errors::InvalidParams, "Task ID is required" unless lookup_id

  @task_manager.get_task(
    lookup_id,
    history_length: params["historyLength"] || params[:history_length]
  )
rescue A2A::Errors::TaskNotFound
  # An unknown task is reported as nil instead of an error
  nil
end

#on_get_task_push_notification_config(params, _context = nil) ⇒ A2A::Types::TaskPushNotificationConfig

Get push notification config

Parameters:

  • params (Hash)

    Parameters with task ID and optional config ID

  • _context (A2A::Server::Context, nil) (defaults to: nil)

    Server context

Returns:

Raises:



250
251
252
253
254
255
256
257
258
259
260
# File 'lib/a2a/server/default_request_handler.rb', line 250

# Get a push notification config.
#
# The task ID is read from "taskId" or :task_id, and the optional config ID
# from "configId" or :config_id.
#
# @param params [Hash] parameters with task ID and optional config ID
# @param _context [A2A::Server::Context, nil] server context (not used here)
# @return [A2A::Types::TaskPushNotificationConfig] the config returned by the manager
# @raise [A2A::Errors::InvalidParams] if no task ID is given
# @raise [A2A::Errors::NotFound] if the manager returns no config
def on_get_task_push_notification_config(params, _context = nil)
  owner_task = params["taskId"] || params[:task_id]
  wanted_config = params["configId"] || params[:config_id]
  raise A2A::Errors::InvalidParams, "Task ID is required" unless owner_task

  found = @push_notification_manager.get_push_notification_config(owner_task, config_id: wanted_config)
  return found if found

  raise A2A::Errors::NotFound, "Push notification config not found"
end

#on_list_task_push_notification_config(params, _context = nil) ⇒ Array<A2A::Types::TaskPushNotificationConfig>

List push notification configs

Parameters:

Returns:

Raises:



268
269
270
271
272
273
# File 'lib/a2a/server/default_request_handler.rb', line 268

# List push notification configs for a task.
#
# @param params [Hash] parameters with the task ID under "taskId" or :task_id
# @param _context [A2A::Server::Context, nil] server context (not used here)
# @return [Array<A2A::Types::TaskPushNotificationConfig>] whatever the push
#   notification manager returns for the task
# @raise [A2A::Errors::InvalidParams] if no task ID is given
def on_list_task_push_notification_config(params, _context = nil)
  owner_task = params["taskId"] || params[:task_id]

  if owner_task
    @push_notification_manager.list_push_notification_configs(owner_task)
  else
    raise A2A::Errors::InvalidParams, "Task ID is required"
  end
end

#on_message_send(params, context = nil) ⇒ A2A::Types::Task, A2A::Types::Message

Handle the ‘message/send’ method (non-streaming)

Parameters:

  • params (Hash)

    Parameters with message and configuration

  • context (A2A::Server::Context, nil) (defaults to: nil)

    Server context

Returns:



101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/a2a/server/default_request_handler.rb', line 101

# Handle the 'message/send' method (non-streaming).
#
# Runs the agent executor against a fresh in-memory event queue and waits,
# polling every 0.1 seconds for up to 30 seconds, until event processing has
# captured a final result. A result is captured from any "message" event, or
# from a "task" event whose status state equals
# A2A::Types::TASK_STATE_COMPLETED. The event queue is closed in every case.
#
# The wait is measured on the monotonic clock so that wall-clock adjustments
# (NTP steps, manual changes) can neither cut the timeout short nor extend it.
#
# NOTE(review): a task event with any other state is not captured, so such a
# request runs into the timeout — confirm this is intended.
#
# @param params [Hash] parameters with message and configuration
# @param context [A2A::Server::Context, nil] server context
# @return [A2A::Types::Task, A2A::Types::Message] the captured event data
# @raise [A2A::Errors::Timeout] if no result is captured within the timeout
def on_message_send(params, context = nil)
  # Build request context
  request_context = AgentExecution::RequestContextBuilder.from_message_send(params, context)

  # Create event queue for execution
  event_queue = Events::InMemoryEventQueue.new
  result = nil

  # Set up event processing to capture the final result
  setup_event_processing(event_queue, request_context.task_id, request_context.context_id) do |event|
    case event.type
    when "task"
      result = event.data if event.data.status.state == A2A::Types::TASK_STATE_COMPLETED
    when "message"
      result = event.data
    end
  end

  begin
    # Execute through agent executor
    @agent_executor.execute(request_context, event_queue)

    # Wait for completion, bounded by a deadline on the monotonic clock
    timeout = 30 # seconds
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout

    sleep 0.1 while result.nil? && Process.clock_gettime(Process::CLOCK_MONOTONIC) < deadline

    raise A2A::Errors::Timeout, "Request timed out" unless result

    result
  ensure
    event_queue.close
  end
end

#on_message_send_stream(params, context = nil) ⇒ Enumerator

Handle the ‘message/stream’ method (streaming)

Parameters:

  • params (Hash)

    Parameters with message and configuration

  • context (A2A::Server::Context, nil) (defaults to: nil)

    Server context

Returns:

  • (Enumerator)

    Enumerator yielding events



143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
# File 'lib/a2a/server/default_request_handler.rb', line 143

# Handle the 'message/stream' method (streaming).
#
# The request context and the event queue are created right away; everything
# else happens lazily when the returned Enumerator is iterated: event
# processing is attached, the agent executor is run, and the stream is held
# open by polling the task manager every 0.1 seconds. The event queue is
# closed when the enumeration ends.
#
# @param params [Hash] parameters with message and configuration
# @param context [A2A::Server::Context, nil] server context
# @return [Enumerator] enumerator yielding the data of each processed event
def on_message_send_stream(params, context = nil)
  request_context = AgentExecution::RequestContextBuilder.from_streaming_message(params, context)
  event_queue = Events::InMemoryEventQueue.new

  Enumerator.new do |yielder|
    # Every processed event is pushed to the consumer of the enumerator
    forward = proc { |event| yielder << event.data }
    setup_event_processing(event_queue, request_context.task_id, request_context.context_id, &forward)

    begin
      @agent_executor.execute(request_context, event_queue)

      loop do
        sleep 0.1

        # Without a task ID there is nothing to look up yet; poll again
        next unless request_context.task_id

        # The stream ends once the task reports completion or can no longer be found
        finished = begin
          @task_manager.get_task(request_context.task_id)&.completed?
        rescue A2A::Errors::TaskNotFound
          true
        end
        break if finished
      end
    ensure
      event_queue.close
    end
  end
end

#on_resubscribe_to_task(params, _context = nil) ⇒ Enumerator

Handle the ‘tasks/resubscribe’ method

Parameters:

Returns:

  • (Enumerator)

    Enumerator yielding events

Raises:



186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
# File 'lib/a2a/server/default_request_handler.rb', line 186

# Handle the 'tasks/resubscribe' method.
#
# Looks the task up eagerly, then returns an Enumerator that first yields the
# task as it was at lookup time and afterwards the data of every event that
# event processing delivers. While iterating, the task manager is polled once
# per second; the stream ends when the task reports completion or can no
# longer be found.
#
# The initial snapshot is yielded inside the protected region, so the event
# queue is closed even when the consumer stops iterating right after the
# first element.
#
# @param params [Hash] parameters with the task ID under "id" or :id
# @param _context [A2A::Server::Context, nil] server context (not used here)
# @return [Enumerator] enumerator yielding the task snapshot, then event data
# @raise [A2A::Errors::InvalidParams] if no task ID is given
# @raise [A2A::Errors::TaskNotFound] if the task manager returns no task
def on_resubscribe_to_task(params, _context = nil)
  task_id = params["id"] || params[:id]
  raise A2A::Errors::InvalidParams, "Task ID is required" unless task_id

  # Verify task exists
  task = @task_manager.get_task(task_id)
  raise A2A::Errors::TaskNotFound, "Task #{task_id} not found" unless task

  # Create event queue for resubscription
  event_queue = Events::InMemoryEventQueue.new

  Enumerator.new do |yielder|
    begin
      # Send current task state immediately
      yielder << task

      # Set up event processing for future updates
      setup_event_processing(event_queue, task_id, task.context_id) do |event|
        yielder << event.data
      end

      # Keep the stream alive
      loop do
        sleep 1

        # Check if task is still active
        begin
          current_task = @task_manager.get_task(task_id)
          break if current_task&.completed?
        rescue A2A::Errors::TaskNotFound
          break
        end
      end
    ensure
      event_queue.close
    end
  end
end

#on_set_task_push_notification_config(params, _context = nil) ⇒ A2A::Types::TaskPushNotificationConfig

Set push notification config

Parameters:

  • params (Hash)

    Parameters with task ID and config

  • _context (A2A::Server::Context, nil) (defaults to: nil)

    Server context

Returns:

Raises:



231
232
233
234
235
236
237
238
239
240
241
242
# File 'lib/a2a/server/default_request_handler.rb', line 231

# Set the push notification config for a task.
#
# The task ID is read from "taskId" or :task_id and the config from "config"
# or :config. The task is looked up before anything is stored. A lookup that
# returns nil is treated as a missing task, matching the check in
# #on_resubscribe_to_task; previously the lookup result was discarded and a
# config could be registered for a task the manager did not return.
#
# @param params [Hash] parameters with task ID and config
# @param _context [A2A::Server::Context, nil] server context (not used here)
# @return [A2A::Types::TaskPushNotificationConfig] whatever the push
#   notification manager returns for the stored config
# @raise [A2A::Errors::InvalidParams] if the task ID or the config is missing
# @raise [A2A::Errors::TaskNotFound] if the task manager returns no task
def on_set_task_push_notification_config(params, _context = nil)
  task_id = params["taskId"] || params[:task_id]
  config_data = params["config"] || params[:config]

  raise A2A::Errors::InvalidParams, "Task ID is required" unless task_id
  raise A2A::Errors::InvalidParams, "Config is required" unless config_data

  # Verify task exists
  task = @task_manager.get_task(task_id)
  raise A2A::Errors::TaskNotFound, "Task #{task_id} not found" unless task

  @push_notification_manager.set_push_notification_config(task_id, config_data)
end

#setup_event_processing(event_queue, task_id, context_id) {|event| ... } ⇒ Object (private)

Set up event processing for a task

Parameters:

  • event_queue (Events::EventQueue)

    The event queue

  • task_id (String, nil)

    The task ID to filter events for

  • context_id (String, nil)

    The context ID to filter events for

Yields:

  • (event)

    Block to process each event



299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
# File 'lib/a2a/server/default_request_handler.rb', line 299

# Set up event processing for a task.
#
# Creates an event consumer on the queue and registers one handler per event
# type. Each handler applies the event to the task manager and then passes
# the event to the optional block:
#
# * "task"                 - saves the task through the manager's storage,
#                            when that storage responds to save_task
# * "task_status_update"   - updates the task status
# * "task_artifact_update" - adds the artifact to the task
# * "message"              - adds the message to the task, when a task ID was given
#
# The consumer is started with a filter that accepts only events matching the
# given task ID and context ID (a nil ID matches everything); when both IDs
# are absent it is started with nil instead of a filter.
#
# @param event_queue [Events::EventQueue] the event queue
# @param task_id [String, nil] the task ID to filter events for
# @param context_id [String, nil] the context ID to filter events for
# @yield [event] block to process each event
# @return [Events::EventConsumer] the started consumer
def setup_event_processing(event_queue, task_id, context_id, &block)
  consumer = Events::EventConsumer.new(event_queue)

  # Passes the event on to the caller's block; evaluates to nil without one
  forward = ->(event) { block.call(event) if block }

  consumer.register_handler("task") do |event|
    storage = @task_manager.storage
    storage.save_task(event.data) if storage.respond_to?(:save_task)
    forward.call(event)
  end

  consumer.register_handler("task_status_update") do |event|
    update = event.data
    @task_manager.update_task_status(update.task_id, update.status)
    forward.call(event)
  end

  consumer.register_handler("task_artifact_update") do |event|
    update = event.data
    @task_manager.add_artifact(update.task_id, update.artifact, append: update.append)
    forward.call(event)
  end

  consumer.register_handler("message") do |event|
    # Messages are recorded in the task history only when a task ID is known
    @task_manager.add_message(task_id, event.data) if task_id
    forward.call(event)
  end

  matches_ids = lambda do |event|
    task_matches = task_id.nil? || event.task_id == task_id
    task_matches && (context_id.nil? || event.context_id == context_id)
  end

  consumer.start((task_id || context_id) ? matches_ids : nil)
  consumer
end