Class: A2A::Transport::Http
- Inherits:
-
Object
- Object
- A2A::Transport::Http
- Defined in:
- lib/a2a/transport/http.rb
Constant Summary collapse
Default configuration values:
- DEFAULT_TIMEOUT = 30
- DEFAULT_OPEN_TIMEOUT = 10
- DEFAULT_READ_TIMEOUT = 30
- DEFAULT_WRITE_TIMEOUT = 30
- DEFAULT_POOL_SIZE = 5
- DEFAULT_POOL_TIMEOUT = 5
- DEFAULT_RETRY_COUNT = 3
- DEFAULT_RETRY_DELAY = 1.0
Instance Attribute Summary collapse
-
#base_url ⇒ Object
readonly
Returns the value of attribute base_url.
-
#config ⇒ Object
readonly
Returns the value of attribute config.
-
#connection ⇒ Object
readonly
Returns the value of attribute connection.
Instance Method Summary collapse
-
#build_connection ⇒ Faraday::Connection
private
Build Faraday connection with configuration.
-
#close ⇒ Object
Close connection and cleanup resources.
-
#default_config ⇒ Hash
private
Build default configuration.
-
#default_logger ⇒ Logger
private
Get default logger.
-
#delete(path = "", params: {}, headers: {}) ⇒ Faraday::Response
Send DELETE request.
-
#get(path = "", params: {}, headers: {}) ⇒ Faraday::Response
Send GET request.
-
#handle_response(response) ⇒ Faraday::Response
private
Handle HTTP response.
-
#initialize(base_url, config = {}) ⇒ Http
constructor
Initialize HTTP transport.
-
#json_rpc_request(rpc_request, headers: {}) ⇒ Hash
Send JSON-RPC request.
-
#metrics ⇒ Hash
Get connection metrics.
-
#parse_json_response(response) ⇒ Hash
private
Parse JSON response.
-
#post(path = "", body: nil, params: {}, headers: {}) ⇒ Faraday::Response
Send POST request.
-
#prepare_body(body) ⇒ String
private
Prepare request body.
-
#put(path = "", body: nil, params: {}, headers: {}) ⇒ Faraday::Response
Send PUT request.
-
#record_metrics(method, status, duration) ⇒ Object
private
Record request metrics.
-
#request(method, path = "", params: {}, headers: {}, body: nil) ⇒ Faraday::Response
Send HTTP request.
-
#reset_metrics! ⇒ Object
Reset connection metrics.
Constructor Details
#initialize(base_url, config = {}) ⇒ Http
Initialize HTTP transport
60 61 62 63 64 65 |
# File 'lib/a2a/transport/http.rb', line 60

# Initialize HTTP transport.
#
# @param base_url [Object] base URL; stored as-is and passed to Faraday by
#   #build_connection
# @param config [Hash] overrides merged (shallowly) over #default_config
def initialize(base_url, config = {})
  @base_url = base_url
  @config = default_config.merge(config)
  # Must run after @base_url and @config are set: build_connection reads both.
  @connection = build_connection
  # Default value 0 lets counters be bumped with `+=` before they exist.
  @metrics = Concurrent::Hash.new(0)
end
Instance Attribute Details
#base_url ⇒ Object (readonly)
Returns the value of attribute base_url.
37 38 39 |
# File 'lib/a2a/transport/http.rb', line 37

# @return [Object] the value of attribute base_url
def base_url
  @base_url
end
#config ⇒ Object (readonly)
Returns the value of attribute config.
37 38 39 |
# File 'lib/a2a/transport/http.rb', line 37

# @return [Object] the value of attribute config
def config
  @config
end
#connection ⇒ Object (readonly)
Returns the value of attribute connection.
37 38 39 |
# File 'lib/a2a/transport/http.rb', line 37

# @return [Object] the value of attribute connection
def connection
  @connection
end
Instance Method Details
#build_connection ⇒ Faraday::Connection (private)
Build Faraday connection with configuration
273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 |
# File 'lib/a2a/transport/http.rb', line 273

# Build Faraday connection with configuration.
#
# Reads @base_url and @config; the middleware, timeouts, SSL settings, proxy
# and default headers are all taken from @config.
#
# @return [Faraday::Connection]
def build_connection
  Faraday.new(@base_url) do |conn|
    # Request middleware
    conn.request :json
    conn.request :multipart if MULTIPART_AVAILABLE
    conn.request :url_encoded

    # NOTE: Retry middleware requires faraday-retry gem
    # Uncomment when faraday-retry is available:
    # conn.request :retry,
    #   max: @config[:retry_count],
    #   interval: @config[:retry_delay],
    #   backoff_factor: 2,
    #   retry_statuses: [429, 500, 502, 503, 504],
    #   methods: [:get, :post, :put, :delete]

    # Logging middleware (only when logging is enabled)
    conn.response :logger, @config[:logger] || default_logger if @config[:logging]

    # Response middleware
    conn.response :json, content_type: /\bjson$/

    # NOTE: Not using :raise_error to handle errors manually

    # Adapter (use default net_http adapter)
    # Note: net_http_persistent requires separate gem for connection pooling
    conn.adapter Faraday.default_adapter

    # Configure timeouts on the connection's request options
    conn.options.timeout = @config[:timeout]
    conn.options.open_timeout = @config[:open_timeout]
    conn.options.read_timeout = @config[:read_timeout]
    conn.options.write_timeout = @config[:write_timeout]

    # Configure SSL
    conn.ssl.verify = @config[:ssl_verify]
    conn.ssl.ca_file = @config[:ssl_ca_file] if @config[:ssl_ca_file]
    conn.ssl.ca_path = @config[:ssl_ca_path] if @config[:ssl_ca_path]

    # Configure proxy
    conn.proxy = @config[:proxy] if @config[:proxy]

    # Set default headers; `&.` tolerates a config that sets headers to nil
    default_headers = @config[:headers]
    conn.headers.update(default_headers) if default_headers&.any?
  end
end
#close ⇒ Object
Close connection and cleanup resources
239 240 241 |
# File 'lib/a2a/transport/http.rb', line 239

# Close connection and cleanup resources.
#
# Does nothing when @connection is nil.
#
# @return [Object, nil] whatever the connection's #close returns, or nil
def close
  @connection&.close
end
#default_config ⇒ Hash (private)
Build default configuration
250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 |
# File 'lib/a2a/transport/http.rb', line 250

# Build default configuration.
#
# A new Hash is built on every call, so callers may merge into or mutate the
# result freely.
#
# @return [Hash] default settings keyed by Symbol
def default_config
  {
    timeout: DEFAULT_TIMEOUT,
    open_timeout: DEFAULT_OPEN_TIMEOUT,
    read_timeout: DEFAULT_READ_TIMEOUT,
    write_timeout: DEFAULT_WRITE_TIMEOUT,
    pool_size: DEFAULT_POOL_SIZE,
    pool_timeout: DEFAULT_POOL_TIMEOUT,
    retry_count: DEFAULT_RETRY_COUNT,
    retry_delay: DEFAULT_RETRY_DELAY,
    ssl_verify: true,
    headers: {},
    logging: false,
    logger: nil,
    proxy: nil
  }
end
#default_logger ⇒ Logger (private)
Get default logger
405 406 407 408 409 410 411 412 |
# File 'lib/a2a/transport/http.rb', line 405

# Get default logger.
#
# Builds a new Logger writing to $stdout at INFO level, with lines formatted
# as "[datetime] SEVERITY -- progname: message".
#
# @return [Logger]
def default_logger
  logger = Logger.new($stdout)
  logger.level = Logger::INFO
  logger.formatter = proc do |severity, datetime, progname, msg|
    "[#{datetime}] #{severity} -- #{progname}: #{msg}\n"
  end
  logger
end
#delete(path = "", params: {}, headers: {}) ⇒ Faraday::Response
Send DELETE request
194 195 196 |
# File 'lib/a2a/transport/http.rb', line 194

# Send DELETE request.
#
# @param path [String] request path
# @param params [Hash] query parameters
# @param headers [Hash] request headers
# @return [Faraday::Response]
def delete(path = "", params: {}, headers: {})
  request(:delete, path, params: params, headers: headers)
end
#get(path = "", params: {}, headers: {}) ⇒ Faraday::Response
Send GET request
156 157 158 |
# File 'lib/a2a/transport/http.rb', line 156

# Send GET request.
#
# @param path [String] request path
# @param params [Hash] query parameters
# @param headers [Hash] request headers
# @return [Faraday::Response]
def get(path = "", params: {}, headers: {})
  request(:get, path, params: params, headers: headers)
end
#handle_response(response) ⇒ Faraday::Response (private)
Handle HTTP response
343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 |
# File 'lib/a2a/transport/http.rb', line 343

# Handle HTTP response.
#
# Returns the response untouched for 2xx statuses and raises for everything
# else, labelling the error by status class.
#
# @param response [Faraday::Response] object responding to #status and #body
# @return [Faraday::Response] the same response, when status is 200..299
# @raise [A2A::Errors::HTTPError] for any status outside 200..299
def handle_response(response)
  status = response.status

  label =
    case status
    when 200..299 then return response
    when 400..499 then "Client error"
    when 500..599 then "Server error"
    else "Unexpected status" # e.g. 1xx / 3xx
    end

  raise A2A::Errors::HTTPError.new(
    "#{label}: #{status}",
    status_code: status,
    response_body: response.body
  )
end
#json_rpc_request(rpc_request, headers: {}) ⇒ Hash
Send JSON-RPC request
206 207 208 209 210 211 212 213 214 215 216 217 218 |
# File 'lib/a2a/transport/http.rb', line 206

# Send JSON-RPC request.
#
# POSTs the serialized request to the default path with JSON Content-Type and
# Accept headers, then parses the response body.
#
# @param rpc_request [#to_json, String] the JSON-RPC request; a String is
#   sent as-is (treated as already-serialized JSON) instead of being encoded
#   a second time as a JSON string literal
# @param headers [Hash] extra headers; these override the JSON defaults
# @return [Hash] parsed response body
def json_rpc_request(rpc_request, headers: {})
  default_headers = {
    "Content-Type" => "application/json",
    "Accept" => "application/json"
  }

  payload = rpc_request.is_a?(String) ? rpc_request : rpc_request.to_json

  response = post(body: payload, headers: default_headers.merge(headers))
  parse_json_response(response)
end
#metrics ⇒ Hash
Get connection metrics
225 226 227 |
# File 'lib/a2a/transport/http.rb', line 225

# Get connection metrics.
#
# @return [Hash] the recorded metrics, converted with #to_h
def metrics
  @metrics.to_h
end
#parse_json_response(response) ⇒ Hash (private)
Parse JSON response
375 376 377 378 379 380 381 |
# File 'lib/a2a/transport/http.rb', line 375

# Parse JSON response.
#
# @param response [Faraday::Response] object responding to #body
# @return [Hash] the body itself when it is already a Hash, otherwise the
#   result of JSON.parse on the body
# @raise [A2A::Errors::JSONError] when the body cannot be parsed as JSON
def parse_json_response(response)
  payload = response.body
  return payload if payload.is_a?(Hash)

  JSON.parse(payload)
rescue JSON::ParserError, TypeError => e
  # TypeError is what JSON.parse raises for a non-String body (e.g. nil);
  # report it the same way as malformed JSON.
  raise A2A::Errors::JSONError, "Invalid JSON response: #{e.message}"
end
#post(path = "", body: nil, params: {}, headers: {}) ⇒ Faraday::Response
Send POST request
169 170 171 |
# File 'lib/a2a/transport/http.rb', line 169

# Send POST request.
#
# @param path [String] request path
# @param body [Object, nil] request body
# @param params [Hash] query parameters
# @param headers [Hash] request headers
# @return [Faraday::Response]
def post(path = "", body: nil, params: {}, headers: {})
  request(:post, path, params: params, headers: headers, body: body)
end
#prepare_body(body) ⇒ String (private)
Prepare request body
325 326 327 328 329 330 331 332 333 334 |
# File 'lib/a2a/transport/http.rb', line 325

# Prepare request body.
#
# @param body [Object] the body to send
# @return [String] the body unchanged when it is a String; its JSON encoding
#   when it is a Hash or Array or otherwise responds to #to_json; else #to_s
def prepare_body(body)
  case body
  when String
    body
  when Hash, Array
    body.to_json
  else
    body.respond_to?(:to_json) ? body.to_json : body.to_s
  end
end
#put(path = "", body: nil, params: {}, headers: {}) ⇒ Faraday::Response
Send PUT request
182 183 184 |
# File 'lib/a2a/transport/http.rb', line 182

# Send PUT request.
#
# @param path [String] request path
# @param body [Object, nil] request body
# @param params [Hash] query parameters
# @param headers [Hash] request headers
# @return [Faraday::Response]
def put(path = "", body: nil, params: {}, headers: {})
  request(:put, path, params: params, headers: headers, body: body)
end
#record_metrics(method, status, duration) ⇒ Object (private)
Record request metrics
390 391 392 393 394 395 396 397 398 |
# File 'lib/a2a/transport/http.rb', line 390

# Record request metrics.
#
# Updates these String keys in @metrics: "<method>_requests",
# "<method>_<status>", "total_requests", "total_duration" and
# "average_duration". Relies on @metrics returning 0 for missing keys.
#
# @param method [Symbol, String] HTTP method, used as a key prefix
# @param status [Integer, Symbol] response status or an error tag
# @param duration [Numeric] elapsed time for the request
# @return [Numeric] the updated average duration
def record_metrics(method, status, duration)
  @metrics["#{method}_requests"] += 1
  @metrics["#{method}_#{status}"] += 1
  @metrics["total_requests"] += 1
  @metrics["total_duration"] += duration

  # Track average duration (recomputed from the running totals)
  @metrics["average_duration"] = @metrics["total_duration"] / @metrics["total_requests"]
end
#request(method, path = "", params: {}, headers: {}, body: nil) ⇒ Faraday::Response
Send HTTP request
80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 |
# File 'lib/a2a/transport/http.rb', line 80

# Send HTTP request.
#
# Dispatches to the connection method named by +method+, records metrics for
# the attempt, and translates failures into A2A error classes.
#
# @param method [Symbol] connection method to call (:get, :post, :put, :delete)
# @param path [String] request path
# @param params [Hash] query parameters, applied only when non-empty
# @param headers [Hash] request headers, applied only when non-empty
# @param body [Object, nil] request body, passed through #prepare_body when set
# @return [Faraday::Response] the response, for 2xx statuses
# @raise [A2A::Errors::HTTPError] for non-2xx responses
# @raise [A2A::Errors::TimeoutError] for timeouts and timeout-like failures
# @raise [A2A::Errors::TransportError] for connection, SSL and other failures
def request(method, path = "", params: {}, headers: {}, body: nil)
  # Monotonic clock: durations are unaffected by wall-clock adjustments.
  started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  elapsed = -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at }
  timeout_like = lambda do |error|
    error.message.include?("timeout") || error.message.include?("execution expired")
  end
  # Set once the response has been counted, so the HTTPError raised by
  # handle_response is not counted a second time in the rescue below.
  recorded = false

  begin
    response = @connection.public_send(method, path) do |req|
      req.params.update(params) if params.any?
      req.headers.update(headers) if headers.any?
      req.body = prepare_body(body) if body
    end

    record_metrics(method, response.status, elapsed.call)
    recorded = true
    handle_response(response)
    response
  rescue A2A::Errors::HTTPError => e
    # Re-raise A2A HTTP errors (normally from handle_response)
    record_metrics(method, e.status_code || :http_error, elapsed.call) unless recorded
    raise
  rescue Faraday::TimeoutError => e
    record_metrics(method, :timeout, elapsed.call)
    raise A2A::Errors::TimeoutError, "Request timeout: #{e.message}"
  rescue Faraday::ConnectionFailed => e
    record_metrics(method, :connection_failed, elapsed.call)
    # Some connection failures are really timeouts; report them as such
    raise A2A::Errors::TimeoutError, "Request timeout: #{e.message}" if timeout_like.call(e)

    raise A2A::Errors::TransportError, "Connection failed: #{e.message}"
  rescue Faraday::SSLError => e
    record_metrics(method, :ssl_error, elapsed.call)
    raise A2A::Errors::TransportError, "SSL error: #{e.message}"
  rescue Faraday::ClientError => e
    record_metrics(method, :client_error, elapsed.call)
    # Handle HTTP status errors from Faraday
    status = e.response && e.response[:status]
    raise A2A::Errors::TransportError, "Client error: #{e.message}" unless status

    label =
      case status
      when 400..499 then "Client error"
      when 500..599 then "Server error"
      else "HTTP error"
      end

    raise A2A::Errors::HTTPError.new(
      "#{label}: #{status}",
      status_code: status,
      response_body: e.response[:body]
    )
  rescue StandardError => e
    record_metrics(method, :error, elapsed.call)
    # Check if it's a timeout-like error
    raise A2A::Errors::TimeoutError, "Request timeout: #{e.message}" if timeout_like.call(e)

    raise A2A::Errors::TransportError, "Transport error: #{e.message}"
  end
end
#reset_metrics! ⇒ Object
Reset connection metrics
232 233 234 |
# File 'lib/a2a/transport/http.rb', line 232

# Reset connection metrics.
#
# Empties @metrics in place.
#
# @return [Object] the emptied metrics container
def reset_metrics!
  @metrics.clear
end