Class: A2A::Transport::Grpc
- Inherits:
-
Object
- Object
- A2A::Transport::Grpc
- Defined in:
- lib/a2a/transport/grpc.rb
Overview
gRPC transport implementation with an optional dependency on the 'grpc' gem. Provides bidirectional streaming support and gRPC-specific error mapping.
Constant Summary collapse
- GRPC_AVAILABLE =
Check if gRPC is available
defined?(::GRPC) && ::GRPC.const_defined?(:ClientStub)
- DEFAULT_TIMEOUT =
Default configuration values
30
- DEFAULT_DEADLINE =
60
- DEFAULT_MAX_RECEIVE_MESSAGE_SIZE =
4MB
4 * 1024 * 1024
- DEFAULT_MAX_SEND_MESSAGE_SIZE =
4MB
4 * 1024 * 1024
- DEFAULT_KEEPALIVE_TIME =
30
- DEFAULT_KEEPALIVE_TIMEOUT =
5
Instance Attribute Summary collapse
-
#config ⇒ Object
readonly
Returns the value of attribute config.
-
#credentials ⇒ Object
readonly
Returns the value of attribute credentials.
-
#endpoint ⇒ Object
readonly
Returns the value of attribute endpoint.
-
#stub ⇒ Object
readonly
Returns the value of attribute stub.
Instance Method Summary collapse
-
#bidi_streaming_call(method, requests, metadata: {}, timeout: nil) ⇒ Enumerator
Send bidirectional streaming gRPC request.
-
#build_a2a_request(message) ⇒ Object
private
Build A2A message request.
-
#build_call_options ⇒ Hash
private
Build call options.
-
#build_channel_args ⇒ Hash
private
Build channel arguments.
-
#build_credentials ⇒ Object
private
Build gRPC credentials.
-
#build_empty_request ⇒ Object
private
Build empty request.
-
#build_stub ⇒ Object
private
Build gRPC stub.
-
#build_task_request(task_id) ⇒ Object
private
Build task request.
-
#cancel_a2a_task(task_id, metadata: {}) ⇒ Object
Cancel A2A task via gRPC.
-
#client_streaming_call(method, requests, metadata: {}, timeout: nil) ⇒ Object
Send client streaming gRPC request.
-
#connect ⇒ Boolean
Connect to gRPC service.
-
#connected? ⇒ Boolean
Check if connected.
-
#default_config ⇒ Hash
private
Build default configuration.
-
#disconnect ⇒ Object
Disconnect from gRPC service.
-
#ensure_connected! ⇒ Object
private
Ensure connection is established.
-
#get_a2a_task(task_id, metadata: {}) ⇒ Object
Get A2A task via gRPC.
-
#get_agent_card(metadata: {}) ⇒ Object
Get agent card via gRPC.
-
#initialize(endpoint, config = {}) ⇒ Grpc
constructor
Initialize gRPC transport.
-
#map_grpc_error(grpc_error) ⇒ A2A::Errors::A2AError
private
Map gRPC error to A2A error.
-
#send_a2a_message(message, streaming: false, metadata: {}) ⇒ Object, Enumerator
Send A2A message via gRPC.
-
#server_streaming_call(method, request, metadata: {}, timeout: nil) ⇒ Enumerator
Send server streaming gRPC request.
-
#unary_call(method, request, metadata: {}, timeout: nil) ⇒ Object
Send unary gRPC request.
Constructor Details
#initialize(endpoint, config = {}) ⇒ Grpc
Initialize gRPC transport
72 73 74 75 76 77 78 79 80 81 82 83 |
# File 'lib/a2a/transport/grpc.rb', line 72
# Initialize gRPC transport.
#
# @param endpoint [Object] stored as-is and later handed to the stub
# @param config [Hash] overrides merged on top of default_config
# @raise [A2A::Errors::TransportError] when GRPC_AVAILABLE is falsy
def initialize(endpoint, config = {})
  unless GRPC_AVAILABLE
    raise A2A::Errors::TransportError,
          "gRPC is not available. Install the 'grpc' gem to use gRPC transport."
  end

  @endpoint = endpoint
  @config = default_config.merge(config)
  @credentials = build_credentials
  @stub = nil
  # NOTE(review): both sides of this assignment were blank in the rendered
  # listing; the names are reconstructed from the `<blank>.dup` pattern in the
  # call methods — confirm against lib/a2a/transport/grpc.rb. If the options
  # really are cached here, the :deadline inside them is computed only once.
  @call_options = build_call_options
end
Instance Attribute Details
#config ⇒ Object (readonly)
Returns the value of attribute config.
52 53 54 |
# File 'lib/a2a/transport/grpc.rb', line 52
# Read-only accessor for the config attribute.
#
# @return [Object] the current value of @config
def config
  @config
end
#credentials ⇒ Object (readonly)
Returns the value of attribute credentials.
52 53 54 |
# File 'lib/a2a/transport/grpc.rb', line 52
# Read-only accessor for the credentials attribute.
#
# @return [Object] the current value of @credentials
def credentials
  @credentials
end
#endpoint ⇒ Object (readonly)
Returns the value of attribute endpoint.
52 53 54 |
# File 'lib/a2a/transport/grpc.rb', line 52
# Read-only accessor for the endpoint attribute.
#
# @return [Object] the current value of @endpoint
def endpoint
  @endpoint
end
#stub ⇒ Object (readonly)
Returns the value of attribute stub.
52 53 54 |
# File 'lib/a2a/transport/grpc.rb', line 52
# Read-only accessor for the stub attribute.
#
# @return [Object, nil] the current value of @stub
def stub
  @stub
end
Instance Method Details
#bidi_streaming_call(method, requests, metadata: {}, timeout: nil) ⇒ Enumerator
Send bidirectional streaming gRPC request
210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 |
# File 'lib/a2a/transport/grpc.rb', line 210
# Send bidirectional streaming gRPC request.
#
# @param method [Symbol] name of the stub method to invoke
# @param requests [Object] request stream forwarded unchanged to the stub
# @param metadata [Hash] per-call metadata merged over @config[:metadata]
# @param timeout [Object, nil] overrides the :timeout option when truthy
# @return [Object] whatever the stub method returns
# @raise [A2A::Errors::A2AError] result of map_grpc_error for ::GRPC::BadStatus
# @raise [A2A::Errors::TransportError] for any other StandardError
def bidi_streaming_call(method, requests, metadata: {}, timeout: nil)
  ensure_connected!

  # NOTE(review): `call_options` / `@call_options` were blank in the rendered
  # listing and are reconstructed — confirm against the real source file.
  call_options = @call_options.dup
  call_options[:timeout] = timeout if timeout
  call_options[:metadata] = @config[:metadata].merge(metadata)

  begin
    @stub.public_send(method, requests, call_options)
  rescue ::GRPC::BadStatus => e
    raise map_grpc_error(e)
  rescue StandardError => e
    raise A2A::Errors::TransportError, "gRPC bidirectional streaming call failed: #{e.message}"
  end
end
#build_a2a_request(message) ⇒ Object (private)
Build A2A message request
418 419 420 421 422 423 424 |
# File 'lib/a2a/transport/grpc.rb', line 418
# Build A2A message request.
#
# @param message [#to_h] message to wrap
# @return [Hash] a hash with the single key :message holding message.to_h
def build_a2a_request(message)
  # This would convert A2A::Types::Message to protobuf message
  # For now, return a hash representation
  {
    message: message.to_h
  }
end
#build_call_options ⇒ Hash (private)
Build call options
373 374 375 376 377 378 379 |
# File 'lib/a2a/transport/grpc.rb', line 373
# Build call options.
#
# Reads :timeout, :deadline and :metadata from @config. The :deadline entry
# is an absolute Time: the moment of this call plus @config[:deadline].
#
# @return [Hash] hash with the keys :timeout, :deadline and :metadata
def build_call_options
  {
    timeout: @config[:timeout],
    deadline: Time.now + @config[:deadline],
    metadata: @config[:metadata]
  }
end
#build_channel_args ⇒ Hash (private)
Build channel arguments
355 356 357 358 359 360 361 362 363 364 365 366 |
# File 'lib/a2a/transport/grpc.rb', line 355
# Build channel arguments.
#
# Keepalive settings are read from @config in seconds and emitted in
# milliseconds. The result is rounded to an Integer so that fractional
# second settings (e.g. 1.5) do not produce Float channel-arg values.
#
# @return [Hash{String => Integer}] channel arguments keyed by gRPC arg name
def build_channel_args
  to_ms = ->(seconds) { (seconds * 1000).round }

  {
    "grpc.keepalive_time_ms" => to_ms.call(@config[:keepalive_time]),
    "grpc.keepalive_timeout_ms" => to_ms.call(@config[:keepalive_timeout]),
    "grpc.keepalive_permit_without_calls" => 1,
    "grpc.http2.max_pings_without_data" => 0,
    "grpc.http2.min_time_between_pings_ms" => 10_000,
    "grpc.http2.min_ping_interval_without_data_ms" => 300_000,
    "grpc.max_receive_message_length" => @config[:max_receive_message_size],
    "grpc.max_send_message_length" => @config[:max_send_message_size]
  }
end
#build_credentials ⇒ Object (private)
Build gRPC credentials
318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 |
# File 'lib/a2a/transport/grpc.rb', line 318
# Build gRPC credentials.
#
# Precedence: an explicit @config[:credentials] value wins; otherwise TLS
# credentials are built when @config[:use_tls] is truthy (from the configured
# PEM files if any is set, else the library default); otherwise the insecure
# marker symbol is returned.
#
# @return [Object] credentials object or :this_channel_is_insecure
def build_credentials
  explicit = @config[:credentials]
  return explicit if explicit

  # Insecure credentials
  return :this_channel_is_insecure unless @config[:use_tls]

  pem_keys = %i[ca_file cert_file key_file]

  # Default TLS credentials
  return ::GRPC::Core::ChannelCredentials.new if @config.values_at(*pem_keys).none?

  # Custom TLS credentials
  ca_cert, cert, key = pem_keys.map do |name|
    path = @config[name]
    path ? File.read(path) : nil
  end
  ::GRPC::Core::ChannelCredentials.new(ca_cert, key, cert)
end
#build_empty_request ⇒ Object (private)
Build empty request
443 444 445 |
# File 'lib/a2a/transport/grpc.rb', line 443
# Build empty request.
#
# @return [Hash] a new, empty hash on every call
def build_empty_request
  Hash.new
end
#build_stub ⇒ Object (private)
Build gRPC stub
344 345 346 347 348 |
# File 'lib/a2a/transport/grpc.rb', line 344
# Build gRPC stub.
#
# @return [A2AServiceStub] stub constructed from @endpoint, @credentials and
#   the channel arguments returned by build_channel_args
def build_stub
  # This would be replaced with actual A2A gRPC service stub
  # For now, we'll create a generic stub interface
  channel_args = build_channel_args
  A2AServiceStub.new(@endpoint, @credentials, channel_args: channel_args)
end
#build_task_request(task_id) ⇒ Object (private)
Build task request
432 433 434 435 436 |
# File 'lib/a2a/transport/grpc.rb', line 432
# Build task request.
#
# @param task_id [Object] identifier placed under the :task_id key
# @return [Hash] a hash with the single key :task_id
def build_task_request(task_id)
  request = {}
  request[:task_id] = task_id
  request
end
#cancel_a2a_task(task_id, metadata: {}) ⇒ Object
Cancel A2A task via gRPC
263 264 265 266 |
# File 'lib/a2a/transport/grpc.rb', line 263
# Cancel A2A task via gRPC.
#
# @param task_id [Object] identifier passed to build_task_request
# @param metadata [Hash] per-call metadata forwarded to unary_call
# @return [Object] the result of the :cancel_task unary call
def cancel_a2a_task(task_id, metadata: {})
  request = build_task_request(task_id)
  unary_call(:cancel_task, request, metadata: metadata)
end
#client_streaming_call(method, requests, metadata: {}, timeout: nil) ⇒ Object
Send client streaming gRPC request
158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 |
# File 'lib/a2a/transport/grpc.rb', line 158
# Send client streaming gRPC request.
#
# @param method [Symbol] name of the stub method to invoke
# @param requests [Object] request stream forwarded unchanged to the stub
# @param metadata [Hash] per-call metadata merged over @config[:metadata]
# @param timeout [Object, nil] overrides the :timeout option when truthy
# @return [Object] whatever the stub method returns
# @raise [A2A::Errors::A2AError] result of map_grpc_error for ::GRPC::BadStatus
# @raise [A2A::Errors::TransportError] for any other StandardError
def client_streaming_call(method, requests, metadata: {}, timeout: nil)
  ensure_connected!

  # NOTE(review): `call_options` / `@call_options` were blank in the rendered
  # listing and are reconstructed — confirm against the real source file.
  call_options = @call_options.dup
  call_options[:timeout] = timeout if timeout
  call_options[:metadata] = @config[:metadata].merge(metadata)

  begin
    @stub.public_send(method, requests, call_options)
  rescue ::GRPC::BadStatus => e
    raise map_grpc_error(e)
  rescue StandardError => e
    raise A2A::Errors::TransportError, "gRPC streaming call failed: #{e.message}"
  end
end
#connect ⇒ Boolean
Connect to gRPC service
91 92 93 94 95 96 97 98 99 100 101 102 103 104 |
# File 'lib/a2a/transport/grpc.rb', line 91
# Connect to gRPC service.
#
# Builds the stub and runs a simplified probe against it. The stub is only
# stored once the probe has succeeded, so a failed attempt never leaves the
# transport reporting connected? == true.
#
# @return [Boolean] true when already connected or when the stub was built
# @raise [A2A::Errors::TransportError] when building or probing the stub raises
def connect
  return true if connected?

  begin
    candidate = build_stub
    # Test connection with a simple call
    # This is a simplified connection test: it only touches the stub class's
    # rpc_descs table and does not contact the remote service.
    candidate.class.rpc_descs.keys.first
    @stub = candidate
    true
  rescue StandardError => e
    @stub = nil
    raise A2A::Errors::TransportError, "Failed to connect to gRPC service: #{e.message}"
  end
end
#connected? ⇒ Boolean
Check if connected
118 119 120 |
# File 'lib/a2a/transport/grpc.rb', line 118
# Check if connected.
#
# @return [Boolean] false when @stub is nil, true otherwise
def connected?
  return false if @stub.nil?

  true
end
#default_config ⇒ Hash (private)
Build default configuration
296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 |
# File 'lib/a2a/transport/grpc.rb', line 296
# Build default configuration.
#
# @return [Hash] a new hash of defaults: call timing and message-size limits
#   from the DEFAULT_* constants, TLS enabled with no PEM files, empty
#   metadata and no explicit credentials
def default_config
  call_defaults = {
    timeout: DEFAULT_TIMEOUT,
    deadline: DEFAULT_DEADLINE,
    max_receive_message_size: DEFAULT_MAX_RECEIVE_MESSAGE_SIZE,
    max_send_message_size: DEFAULT_MAX_SEND_MESSAGE_SIZE,
    keepalive_time: DEFAULT_KEEPALIVE_TIME,
    keepalive_timeout: DEFAULT_KEEPALIVE_TIMEOUT
  }
  tls_defaults = { use_tls: true, ca_file: nil, cert_file: nil, key_file: nil }
  auth_defaults = { metadata: {}, credentials: nil }

  call_defaults.merge(tls_defaults).merge(auth_defaults)
end
#disconnect ⇒ Object
Disconnect from gRPC service
109 110 111 |
# File 'lib/a2a/transport/grpc.rb', line 109
# Disconnect from gRPC service.
#
# Drops the reference to the current stub so that connected? becomes false.
#
# @return [nil]
def disconnect
  @stub = nil
end
#ensure_connected! ⇒ Object (private)
Ensure connection is established
286 287 288 289 |
# File 'lib/a2a/transport/grpc.rb', line 286
# Ensure connection is established.
#
# Attempts connect when not yet connected, then verifies the outcome.
#
# @return [nil] when connected
# @raise [A2A::Errors::TransportError] when still not connected afterwards
def ensure_connected!
  connect unless connected?
  return if connected?

  raise A2A::Errors::TransportError, "Not connected to gRPC service"
end
#get_a2a_task(task_id, metadata: {}) ⇒ Object
Get A2A task via gRPC
251 252 253 254 |
# File 'lib/a2a/transport/grpc.rb', line 251
# Get A2A task via gRPC.
#
# @param task_id [Object] identifier passed to build_task_request
# @param metadata [Hash] per-call metadata forwarded to unary_call
# @return [Object] the result of the :get_task unary call
def get_a2a_task(task_id, metadata: {})
  request = build_task_request(task_id)
  unary_call(:get_task, request, metadata: metadata)
end
#get_agent_card(metadata: {}) ⇒ Object
Get agent card via gRPC
274 275 276 277 |
# File 'lib/a2a/transport/grpc.rb', line 274
# Get agent card via gRPC.
#
# @param metadata [Hash] per-call metadata forwarded to unary_call
# @return [Object] the result of the :get_agent_card unary call
def get_agent_card(metadata: {})
  request = build_empty_request
  unary_call(:get_agent_card, request, metadata: metadata)
end
#map_grpc_error(grpc_error) ⇒ A2A::Errors::A2AError (private)
Map gRPC error to A2A error
387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 |
# File 'lib/a2a/transport/grpc.rb', line 387
# Map gRPC error to A2A error.
#
# Chooses an A2A error class and a message prefix from the gRPC status code,
# then builds (but does not raise) the error with the gRPC details appended.
# Unlisted codes fall back to TransportError with the numeric code in the
# message.
#
# @param grpc_error [#code, #details] the gRPC error to translate
# @return [A2A::Errors::A2AError] a new, unraised error instance
def map_grpc_error(grpc_error)
  status = ::GRPC::Core::StatusCodes
  code = grpc_error.code

  error_class, summary =
    case code
    when status::CANCELLED then [A2A::Errors::TaskNotCancelable, "Request cancelled"]
    when status::INVALID_ARGUMENT then [A2A::Errors::InvalidParams, "Invalid argument"]
    when status::DEADLINE_EXCEEDED then [A2A::Errors::TimeoutError, "Deadline exceeded"]
    when status::NOT_FOUND then [A2A::Errors::TaskNotFound, "Not found"]
    when status::PERMISSION_DENIED then [A2A::Errors::AuthorizationFailed, "Permission denied"]
    when status::RESOURCE_EXHAUSTED then [A2A::Errors::ResourceExhausted, "Resource exhausted"]
    when status::UNIMPLEMENTED then [A2A::Errors::CapabilityNotSupported, "Unimplemented"]
    when status::UNAVAILABLE then [A2A::Errors::AgentUnavailable, "Service unavailable"]
    when status::UNAUTHENTICATED then [A2A::Errors::AuthenticationRequired, "Unauthenticated"]
    else [A2A::Errors::TransportError, "gRPC error (#{code})"]
    end

  error_class.new("#{summary}: #{grpc_error.details}")
end
#send_a2a_message(message, streaming: false, metadata: {}) ⇒ Object, Enumerator
Send A2A message via gRPC
234 235 236 237 238 239 240 241 242 |
# File 'lib/a2a/transport/grpc.rb', line 234
# Send A2A message via gRPC.
#
# @param message [Object] message passed to build_a2a_request
# @param streaming [Boolean] when truthy, use the :send_message_stream
#   server-streaming call instead of the :send_message unary call
# @param metadata [Hash] per-call metadata forwarded to the underlying call
# @return [Object, Enumerator] result of the underlying unary or streaming call
def send_a2a_message(message, streaming: false, metadata: {})
  request = build_a2a_request(message)

  if streaming
    server_streaming_call(:send_message_stream, request, metadata: metadata)
  else
    unary_call(:send_message, request, metadata: metadata)
  end
end
#server_streaming_call(method, request, metadata: {}, timeout: nil) ⇒ Enumerator
Send server streaming gRPC request
184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 |
# File 'lib/a2a/transport/grpc.rb', line 184
# Send server streaming gRPC request.
#
# @param method [Symbol] name of the stub method to invoke
# @param request [Object] request forwarded unchanged to the stub
# @param metadata [Hash] per-call metadata merged over @config[:metadata]
# @param timeout [Object, nil] overrides the :timeout option when truthy
# @return [Object] whatever the stub method returns
# @raise [A2A::Errors::A2AError] result of map_grpc_error for ::GRPC::BadStatus
# @raise [A2A::Errors::TransportError] for any other StandardError
def server_streaming_call(method, request, metadata: {}, timeout: nil)
  ensure_connected!

  # NOTE(review): `call_options` / `@call_options` were blank in the rendered
  # listing and are reconstructed — confirm against the real source file.
  call_options = @call_options.dup
  call_options[:timeout] = timeout if timeout
  call_options[:metadata] = @config[:metadata].merge(metadata)

  begin
    # NOTE(review): the rescue only covers this invocation; presumably errors
    # raised later while the returned stream is consumed are not mapped — confirm.
    @stub.public_send(method, request, call_options)
  rescue ::GRPC::BadStatus => e
    raise map_grpc_error(e)
  rescue StandardError => e
    raise A2A::Errors::TransportError, "gRPC streaming call failed: #{e.message}"
  end
end
#unary_call(method, request, metadata: {}, timeout: nil) ⇒ Object
Send unary gRPC request
132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 |
# File 'lib/a2a/transport/grpc.rb', line 132
# Send unary gRPC request.
#
# @param method [Symbol] name of the stub method to invoke
# @param request [Object] request forwarded unchanged to the stub
# @param metadata [Hash] per-call metadata merged over @config[:metadata]
# @param timeout [Object, nil] overrides the :timeout option when truthy
# @return [Object] whatever the stub method returns
# @raise [A2A::Errors::A2AError] result of map_grpc_error for ::GRPC::BadStatus
# @raise [A2A::Errors::TransportError] for any other StandardError
def unary_call(method, request, metadata: {}, timeout: nil)
  ensure_connected!

  # NOTE(review): `call_options` / `@call_options` were blank in the rendered
  # listing and are reconstructed — confirm against the real source file.
  call_options = @call_options.dup
  call_options[:timeout] = timeout if timeout
  call_options[:metadata] = @config[:metadata].merge(metadata)

  begin
    @stub.public_send(method, request, call_options)
  rescue ::GRPC::BadStatus => e
    raise map_grpc_error(e)
  rescue StandardError => e
    raise A2A::Errors::TransportError, "gRPC call failed: #{e.message}"
  end
end