Class: A2A::Client::HttpClient

Inherits:
Base
  • Object
show all
Includes:
ApiMethods, JsonRpcHandler, PerformanceTracker
Defined in:
lib/a2a/client/http_client.rb

Instance Attribute Summary collapse

Attributes inherited from Base

#config, #consumers, #middleware

Instance Method Summary collapse

Methods included from ApiMethods

#cancel_task, #get_task, #send_message

Methods included from JsonRpcHandler

#initialize_json_rpc_handling

Methods included from PerformanceTracker

#initialize_performance_tracking

Methods inherited from Base

#add_consumer, #add_middleware, #agent_supports_transport?, #cancel_task, #ensure_agent_card, #ensure_message, #ensure_task, #execute_with_middleware, #get_endpoint_url, #get_task, #negotiate_transport, #polling?, #process_event, #remove_consumer, #remove_middleware, #send_message, #streaming?, #supported_transports

Constructor Details

#initialize(endpoint_url, config: nil, middleware: [], consumers: []) ⇒ HttpClient

Initialize a new HTTP client

Parameters:

  • The base URL for the A2A agent

  • (defaults to: nil)

    Client configuration

  • (defaults to: [])

    List of middleware interceptors

  • (defaults to: [])

    List of event consumers



41
42
43
44
45
46
47
48
# File 'lib/a2a/client/http_client.rb', line 41

def initialize(endpoint_url, config: nil, middleware: [], consumers: [])
  super(config: config, middleware: middleware, consumers: consumers)
  @endpoint_url = endpoint_url.chomp("/")
  @connection = build_connection
  @connection_pool = nil
  initialize_performance_tracking
  initialize_json_rpc_handling
end

Instance Attribute Details

#connectionObject (readonly)

Returns the value of attribute connection.



32
33
34
# File 'lib/a2a/client/http_client.rb', line 32

def connection
  @connection
end

#endpoint_urlObject (readonly)

Returns the value of attribute endpoint_url.



32
33
34
# File 'lib/a2a/client/http_client.rb', line 32

def endpoint_url
  @endpoint_url
end

Instance Method Details

#build_connectionFaraday::Connection (private)

Build the Faraday connection with performance optimizations

Returns:

  • The configured connection



273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
# File 'lib/a2a/client/http_client.rb', line 273

def build_connection
  Faraday.new(@endpoint_url) do |conn|
    # Request middleware
    conn.request :json

    # Response middleware
    conn.response :json, content_type: /\bjson$/

    # Use connection pooling adapter if available
    conn.adapter HTTP_ADAPTER

    # Set timeouts
    conn.options.timeout = @config.timeout
    conn.options.read_timeout = @config.timeout
    conn.options.write_timeout = @config.timeout

    # Performance optimizations (only set if supported)
    conn.options.pool_size = @config.pool_size || 5 if conn.options.respond_to?(:pool_size=)

    # Enable compression if supported
    conn.headers["Accept-Encoding"] = "gzip, deflate"

    # Set keep-alive headers
    conn.headers["Connection"] = "keep-alive"
    conn.headers["Keep-Alive"] = "timeout=30, max=100"
  end
end

#build_json_rpc_request(method, params = {}) ⇒ A2A::Protocol::Request (private)

Build a JSON-RPC request

Parameters:

  • The method name

  • (defaults to: {})

    The method parameters

Returns:

  • The JSON-RPC request



468
469
470
471
472
473
474
475
# File 'lib/a2a/client/http_client.rb', line 468

def build_json_rpc_request(method, params = {})
  A2A::Protocol::Request.new(
    jsonrpc: A2A::Protocol::JsonRpc::JSONRPC_VERSION,
    method: method,
    params: params,
    id: next_request_id
  )
end

#delete_task_callback(task_id, push_notification_config_id, context: nil) ⇒ void

This method returns an undefined value.

Delete a callback configuration for a task

Parameters:

  • The task ID

  • The push notification config ID

  • (defaults to: nil)

    Optional context information



165
166
167
168
169
170
171
172
173
174
175
# File 'lib/a2a/client/http_client.rb', line 165

def delete_task_callback(task_id, push_notification_config_id, context: nil)
  params = {
    taskId: task_id,
    pushNotificationConfigId: push_notification_config_id
  }

  request = build_json_rpc_request("tasks/pushNotificationConfig/delete", params)
  execute_with_middleware(request, context || {}) do |req, _ctx|
    send_json_rpc_request(req)
  end
end

#delete_task_push_notification_config(task_id, config_id, context: nil) ⇒ Boolean

Delete push notification configuration for a task

Parameters:

  • The task ID

  • The push notification config ID

  • (defaults to: nil)

    Optional context information

Returns:

  • True if deletion was successful



253
254
255
256
257
258
259
260
261
262
263
264
265
# File 'lib/a2a/client/http_client.rb', line 253

def delete_task_push_notification_config(task_id, config_id, context: nil)
  params = {
    taskId: task_id,
    pushNotificationConfigId: config_id
  }

  request = build_json_rpc_request("tasks/pushNotificationConfig/delete", params)
  execute_with_middleware(request, context || {}) do |req, _ctx|
    send_json_rpc_request(req)
  end

  true
end

#get_card(context: nil, authenticated: false) ⇒ AgentCard

Get the agent card

Parameters:

  • (defaults to: nil)

    Optional context information

  • (defaults to: false)

    Whether to get authenticated extended card

Returns:

  • The agent card



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/a2a/client/http_client.rb', line 56

def get_card(context: nil, authenticated: false)
  if authenticated
    request = build_json_rpc_request("agent/getAuthenticatedExtendedCard", {})
    response = execute_with_middleware(request, context || {}) do |req, _ctx|
      send_json_rpc_request(req)
    end
    ensure_agent_card(response["result"])
  else
    # Use HTTP GET for basic agent card
    response = execute_with_middleware({}, context || {}) do |_req, _ctx|
      @connection.get("/agent-card") do |request|
        request.headers.merge!(@config.all_headers)
      end
    end

    if response.success?
      ensure_agent_card(JSON.parse(response.body))
    else
      raise A2A::Errors::HTTPError.new(
        "Failed to get agent card: #{response.status}",
        status_code: response.status,
        response_body: response.body
      )
    end
  end
end

#get_task_callback(task_id, push_notification_config_id, context: nil) ⇒ TaskPushNotificationConfig

Get the callback configuration for a task

Parameters:

  • The task ID

  • The push notification config ID

  • (defaults to: nil)

    Optional context information

Returns:

  • The callback configuration



129
130
131
132
133
134
135
136
137
138
139
140
141
# File 'lib/a2a/client/http_client.rb', line 129

def get_task_callback(task_id, push_notification_config_id, context: nil)
  params = {
    taskId: task_id,
    pushNotificationConfigId: push_notification_config_id
  }

  request = build_json_rpc_request("tasks/pushNotificationConfig/get", params)
  response = execute_with_middleware(request, context || {}) do |req, _ctx|
    send_json_rpc_request(req)
  end

  A2A::Types::TaskPushNotificationConfig.from_h(response["result"])
end

#get_task_push_notification_config(task_id, config_id, context: nil) ⇒ Hash

Get push notification configuration for a task

Parameters:

  • The task ID

  • The push notification config ID

  • (defaults to: nil)

    Optional context information

Returns:

  • The push notification configuration



217
218
219
220
221
222
223
224
225
226
227
228
229
# File 'lib/a2a/client/http_client.rb', line 217

def get_task_push_notification_config(task_id, config_id, context: nil)
  params = {
    taskId: task_id,
    pushNotificationConfigId: config_id
  }

  request = build_json_rpc_request("tasks/pushNotificationConfig/get", params)
  response = execute_with_middleware(request, context || {}) do |req, _ctx|
    send_json_rpc_request(req)
  end

  response["result"]
end

#handle_http_response(response) ⇒ Hash (private)

Handle HTTP response and extract JSON-RPC result

Parameters:

  • The HTTP response

Returns:

  • The JSON-RPC response



401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
# File 'lib/a2a/client/http_client.rb', line 401

def handle_http_response(response)
  unless response.success?
    raise A2A::Errors::HTTPError.new(
      "HTTP request failed: #{response.status}",
      status_code: response.status,
      response_body: response.body
    )
  end

  begin
    json_response = response.body.is_a?(Hash) ? response.body : JSON.parse(response.body)
  rescue JSON::ParserError => e
    raise A2A::Errors::JSONError, "Invalid JSON response: #{e.message}"
  end

  # Check for JSON-RPC error
  if json_response["error"]
    error = json_response["error"]
    raise A2A::Errors::ErrorUtils.from_json_rpc_code(
      error["code"],
      error["message"],
      data: error["data"]
    )
  end

  json_response
end

#list_task_callbacks(task_id, context: nil) ⇒ Array<TaskPushNotificationConfig>

List all callback configurations for a task

Parameters:

  • The task ID

  • (defaults to: nil)

    Optional context information

Returns:

  • List of callback configurations



149
150
151
152
153
154
155
156
# File 'lib/a2a/client/http_client.rb', line 149

def list_task_callbacks(task_id, context: nil)
  request = build_json_rpc_request("tasks/pushNotificationConfig/list", { taskId: task_id })
  response = execute_with_middleware(request, context || {}) do |req, _ctx|
    send_json_rpc_request(req)
  end

  response["result"].map { |config| A2A::Types::TaskPushNotificationConfig.from_h(config) }
end

#list_task_push_notification_configs(task_id, context: nil) ⇒ Array<Hash>

List all push notification configurations for a task

Parameters:

  • The task ID

  • (defaults to: nil)

    Optional context information

Returns:

  • List of push notification configurations



237
238
239
240
241
242
243
244
# File 'lib/a2a/client/http_client.rb', line 237

def list_task_push_notification_configs(task_id, context: nil)
  request = build_json_rpc_request("tasks/pushNotificationConfig/list", { taskId: task_id })
  response = execute_with_middleware(request, context || {}) do |req, _ctx|
    send_json_rpc_request(req)
  end

  response["result"] || []
end

#next_request_idInteger (private)

Generate next request ID

Returns:

  • The next request ID



481
482
483
484
485
# File 'lib/a2a/client/http_client.rb', line 481

def next_request_id
  @request_id_mutex.synchronize do
    @request_id_counter += 1
  end
end

#parse_sse_chunk(chunk) ⇒ Array<Hash> (private)

Parse Server-Sent Events chunk

Parameters:

  • The SSE chunk

Returns:

  • Parsed events



434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
# File 'lib/a2a/client/http_client.rb', line 434

def parse_sse_chunk(chunk)
  events = []
  current_event = {}

  chunk.split("\n").each do |line|
    line = line.strip
    next if line.empty?

    if line.start_with?("data: ")
      current_event[:data] = line[6..]
    elsif line.start_with?("event: ")
      current_event[:event] = line[7..]
    elsif line.start_with?("id: ")
      current_event[:id] = line[4..]
    elsif line.start_with?("retry: ")
      current_event[:retry] = line[7..].to_i
    elsif line == ""
      # Empty line indicates end of event
      events << current_event.dup if current_event[:data]
      current_event.clear
    end
  end

  # Handle case where chunk doesn't end with empty line
  events << current_event if current_event[:data]
  events
end

#performance_statsHash (private)

Get performance statistics

Returns:

  • Performance statistics



491
492
493
# File 'lib/a2a/client/http_client.rb', line 491

def performance_stats
  @stats_mutex.synchronize { @performance_stats.dup }
end

#record_request_performance(duration) ⇒ Object (private)

Record request performance metrics

Parameters:

  • Request duration in seconds



514
515
516
517
518
519
520
521
# File 'lib/a2a/client/http_client.rb', line 514

def record_request_performance(duration)
  @stats_mutex.synchronize do
    @performance_stats[:requests_count] += 1
    @performance_stats[:total_time] += duration
    @performance_stats[:avg_response_time] =
      @performance_stats[:total_time] / @performance_stats[:requests_count]
  end
end

#reset_performance_stats!Object (private)

Reset performance statistics



498
499
500
501
502
503
504
505
506
507
508
# File 'lib/a2a/client/http_client.rb', line 498

def reset_performance_stats!
  @stats_mutex.synchronize do
    @performance_stats = {
      requests_count: 0,
      total_time: 0.0,
      avg_response_time: 0.0,
      cache_hits: 0,
      cache_misses: 0
    }
  end
end

#resubscribe(task_id, context: nil) ⇒ Enumerator

Resubscribe to a task for streaming updates

Parameters:

  • The task ID to resubscribe to

  • (defaults to: nil)

    Optional context information

Returns:

  • Stream of task updates



89
90
91
92
93
94
95
# File 'lib/a2a/client/http_client.rb', line 89

def resubscribe(task_id, context: nil)
  request = build_json_rpc_request("tasks/resubscribe", { id: task_id })

  execute_with_middleware(request, context || {}) do |req, _ctx|
    send_streaming_request(req)
  end
end

#send_json_rpc_request(request) ⇒ Hash (private)

Send a JSON-RPC request and get response

Parameters:

  • The JSON-RPC request

Returns:

  • The JSON-RPC response



335
336
337
338
339
340
341
342
343
# File 'lib/a2a/client/http_client.rb', line 335

def send_json_rpc_request(request)
  response = @connection.post do |req|
    req.headers.merge!(@config.all_headers)
    req.headers["Content-Type"] = "application/json"
    req.body = request.to_json
  end

  handle_http_response(response)
end

#send_streaming_message(message, context) ⇒ Enumerator (private)

Send a streaming message

Parameters:

  • The message to send

  • The request context

Returns:

  • Stream of response messages



322
323
324
325
326
327
328
# File 'lib/a2a/client/http_client.rb', line 322

def send_streaming_message(message, context)
  request = build_json_rpc_request("message/stream", message.to_h)

  execute_with_middleware(request, context) do |req, _ctx|
    send_streaming_request(req)
  end
end

#send_streaming_request(request) ⇒ Enumerator (private)

Send a streaming request using Server-Sent Events

Parameters:

  • The JSON-RPC request

Returns:

  • Stream of events



350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
# File 'lib/a2a/client/http_client.rb', line 350

def send_streaming_request(request)
  Enumerator.new do |yielder|
    response = @connection.post do |req|
      req.headers.merge!(@config.all_headers)
      req.headers["Content-Type"] = "application/json"
      req.headers["Accept"] = "text/event-stream"
      req.body = request.to_json

      # Handle streaming response
      req.options.on_data = proc do |chunk, _size|
        events = parse_sse_chunk(chunk)
        events.each do |event|
          case event[:event]
          when "message"
            yielder << ensure_message(JSON.parse(event[:data]))
          when "task_status_update"
            event_data = A2A::Types::TaskStatusUpdateEvent.from_h(JSON.parse(event[:data]))
            process_event(event_data)
            yielder << event_data
          when "task_artifact_update"
            event_data = A2A::Types::TaskArtifactUpdateEvent.from_h(JSON.parse(event[:data]))
            process_event(event_data)
            yielder << event_data
          when "error"
            error_data = JSON.parse(event[:data])
            error = A2A::Errors::ErrorUtils.from_json_rpc_code(
              error_data["code"],
              error_data["message"],
              data: error_data["data"]
            )
            raise error
          end
        end
      end
    end

    unless response.success?
      raise A2A::Errors::HTTPError.new(
        "Streaming request failed: #{response.status}",
        status_code: response.status,
        response_body: response.body
      )
    end
  end
end

#send_sync_message(message, context) ⇒ Message (private)

Send a synchronous message

Parameters:

  • The message to send

  • The request context

Returns:

  • The response message



307
308
309
310
311
312
313
314
# File 'lib/a2a/client/http_client.rb', line 307

def send_sync_message(message, context)
  request = build_json_rpc_request("message/send", message.to_h)
  response = execute_with_middleware(request, context) do |req, _ctx|
    send_json_rpc_request(req)
  end

  ensure_message(response["result"])
end

#set_task_callback(task_id, push_notification_config, context: nil) ⇒ void

This method returns an undefined value.

Set a callback for task updates

Parameters:

  • The task ID

  • The push notification configuration

  • (defaults to: nil)

    Optional context information



104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# File 'lib/a2a/client/http_client.rb', line 104

def set_task_callback(task_id, push_notification_config, context: nil)
  config = if push_notification_config.is_a?(A2A::Types::PushNotificationConfig)
             push_notification_config
           else
             A2A::Types::PushNotificationConfig.from_h(push_notification_config)
           end

  params = {
    taskId: task_id,
    pushNotificationConfig: config.to_h
  }

  request = build_json_rpc_request("tasks/pushNotificationConfig/set", params)
  execute_with_middleware(request, context || {}) do |req, _ctx|
    send_json_rpc_request(req)
  end
end

#set_task_push_notification_config(task_id, config, context: nil) ⇒ Hash

Set push notification configuration for a task

Parameters:

  • The task ID

  • The push notification configuration

  • (defaults to: nil)

    Optional context information

Returns:

  • The created configuration with ID



184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
# File 'lib/a2a/client/http_client.rb', line 184

def set_task_push_notification_config(task_id, config, context: nil)
  # Validate and normalize config
  normalized_config = if config.is_a?(A2A::Types::PushNotificationConfig)
                        config.to_h
                      elsif config.is_a?(Hash)
                        # Validate required fields
                        raise ArgumentError, "config must include 'url'" unless config[:url] || config["url"]

                        config
                      else
                        raise ArgumentError, "config must be a Hash or PushNotificationConfig"
                      end

  params = {
    taskId: task_id,
    pushNotificationConfig: normalized_config
  }

  request = build_json_rpc_request("tasks/pushNotificationConfig/set", params)
  response = execute_with_middleware(request, context || {}) do |req, _ctx|
    send_json_rpc_request(req)
  end

  response["result"]
end