Module: A2A::Client::JsonRpcHandler

Included in:
HttpClient
Defined in:
lib/a2a/client/json_rpc_handler.rb

Overview

JSON-RPC request handling functionality

Instance Method Summary collapse

Instance Method Details

#build_json_rpc_request(method, params = {}) ⇒ Hash

Build a JSON-RPC request

Parameters:

  • method (String)

    The method name

  • params (Hash) (defaults to: {})

    The parameters

Returns:

  • (Hash)

    The JSON-RPC request



115
116
117
118
119
120
121
122
# File 'lib/a2a/client/json_rpc_handler.rb', line 115

# Assemble a JSON-RPC 2.0 request envelope.
#
# @param method [String] the method name
# @param params [Hash] the parameters (defaults to an empty hash)
# @return [Hash] the request, keyed by :jsonrpc, :method, :params and :id
def build_json_rpc_request(method, params = {})
  # The id comes from next_request_id, so every call gets a fresh one.
  request_id = next_request_id

  { jsonrpc: "2.0", method: method, params: params, id: request_id }
end

#handle_http_response(response) ⇒ Hash

Handle HTTP response

Parameters:

  • response (Faraday::Response)

    The HTTP response

Returns:

  • (Hash)

    The parsed JSON-RPC response



60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/a2a/client/json_rpc_handler.rb', line 60

# Validate an HTTP response and decode its JSON-RPC payload.
#
# @param response [Faraday::Response] the HTTP response
# @return [Hash] the parsed JSON-RPC response
# @raise [A2A::Errors::TimeoutError] if the status is 408
# @raise [A2A::Errors::HTTPError] for any other unsuccessful status
# @raise [A2A::Errors::ParseError] if the body is not JSON or not a JSON object
# @raise [A2A::Errors::A2AError] if the payload carries an "error" member
def handle_http_response(response)
  unless response.success?
    # Only 408 has a dedicated error class; every other failure status is
    # reported with the same HTTPError message.
    raise A2A::Errors::TimeoutError, "Request timeout" if response.status == 408

    raise A2A::Errors::HTTPError, "HTTP #{response.status}: #{response.body}"
  end

  begin
    parsed_response = JSON.parse(response.body)
  rescue JSON::ParserError, TypeError => e
    # TypeError covers a nil (non-String) body, which JSON.parse rejects.
    raise A2A::Errors::ParseError, "Invalid JSON response: #{e.message}"
  end

  # The lookups below need a Hash; arrays, null and scalars are rejected
  # here instead of surfacing as TypeError/NoMethodError.
  unless parsed_response.is_a?(Hash)
    raise A2A::Errors::ParseError,
          "Invalid JSON response: expected an object, got #{parsed_response.class}"
  end

  error = parsed_response["error"]
  if error
    # A non-Hash error member is passed along as the message text.
    code, message = error.is_a?(Hash) ? error.values_at("code", "message") : [nil, error.to_s]
    raise A2A::Errors::A2AError.new(message, code)
  end

  parsed_response
end

#initialize_json_rpc_handling ⇒ Object

Initialize JSON-RPC handling



15
16
17
18
# File 'lib/a2a/client/json_rpc_handler.rb', line 15

# Set up the state used for request-id generation: a counter starting at
# zero and the mutex that guards it.
#
# @return [Mutex] the newly created mutex (the last assignment's value)
def initialize_json_rpc_handling
  lock = Mutex.new
  @request_id_counter = 0
  @request_id_mutex = lock
end

#next_request_id ⇒ String

Generate the next request ID

Returns:

  • (String)

    The request ID



128
129
130
131
132
133
# File 'lib/a2a/client/json_rpc_handler.rb', line 128

# Produce the next request id: 16 random hex characters, a dash, and the
# incremented per-instance counter.
#
# @return [String] the request id
def next_request_id
  @request_id_mutex.synchronize do
    # Increment and read the counter under the lock.
    sequence = (@request_id_counter += 1)
    [SecureRandom.hex(8), sequence].join("-")
  end
end

#parse_sse_chunk(chunk) ⇒ Hash?

Parse Server-Sent Events chunk

Parameters:

  • chunk (String)

    The SSE chunk

Returns:

  • (Hash, nil)

    The parsed event or nil



94
95
96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/a2a/client/json_rpc_handler.rb', line 94

# Parse a single Server-Sent Events line into its JSON payload.
#
# Only "data" fields are interpreted. Empty lines, comment lines (leading
# ":"), other fields, the "[DONE]" sentinel and payloads that are not
# valid JSON all yield nil.
#
# @param chunk [String] one SSE line
# @return [Hash, nil] the parsed event, or nil when there is nothing to emit
def parse_sse_chunk(chunk)
  # This single guard also rejects empty lines and ":" comment lines.
  return nil unless chunk.start_with?("data:")

  # The space after the colon is optional in the SSE format, so strip the
  # field name first and then at most one leading space.
  data = chunk.delete_prefix("data:").delete_prefix(" ")
  return nil if data == "[DONE]"

  JSON.parse(data)
rescue JSON::ParserError
  nil
end

#send_json_rpc_request(request) ⇒ Hash

Send a JSON-RPC request

Parameters:

  • request (Hash)

    The JSON-RPC request

Returns:

  • (Hash)

    The parsed response



25
26
27
28
29
30
31
32
# File 'lib/a2a/client/json_rpc_handler.rb', line 25

# Send a JSON-RPC request and return the parsed response.
#
# POSTs the request as JSON to "/", passes the elapsed time to
# record_request_performance, then hands the HTTP response to
# handle_http_response.
#
# @param request [Hash] the JSON-RPC request
# @return [Hash] the parsed response, as returned by handle_http_response
def send_json_rpc_request(request)
  # Use the monotonic clock: wall-clock time can jump (NTP, DST) and
  # produce negative or inflated durations.
  started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  response = @connection.post("/", request.to_json, "Content-Type" => "application/json")
  duration = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at

  record_request_performance(duration)
  handle_http_response(response)
end

#send_streaming_request(request) ⇒ Enumerator

Send a streaming JSON-RPC request

Parameters:

  • request (Hash)

    The JSON-RPC request

Returns:

  • (Enumerator)

    Stream of parsed responses



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/a2a/client/json_rpc_handler.rb', line 39

# Send a streaming JSON-RPC request.
#
# Nothing is sent until the returned Enumerator is iterated. On iteration
# the request is POSTed to "/stream" with an SSE Accept header, and each
# line of the response body is run through parse_sse_chunk; non-nil
# results are yielded.
#
# @param request [Hash] the JSON-RPC request
# @return [Enumerator] stream of parsed responses
# @raise [A2A::Errors::HTTPError] during iteration, if the response is unsuccessful
def send_streaming_request(request)
  Enumerator.new do |events|
    payload = request.to_json
    http_response = @connection.post("/stream", payload) do |http_request|
      http_request.headers["Content-Type"] = "application/json"
      http_request.headers["Accept"] = "text/event-stream"
    end

    unless http_response.success?
      raise A2A::Errors::HTTPError, "HTTP #{http_response.status}: #{http_response.body}"
    end

    http_response.body.each_line do |raw_line|
      parsed = parse_sse_chunk(raw_line.strip)
      events << parsed if parsed
    end
  end
end