Class: A2A::Transport::Grpc

Inherits:
Object
  • Object
show all
Defined in:
lib/a2a/transport/grpc.rb

Overview

gRPC transport implementation with an optional dependency on the 'grpc' gem. Provides bidirectional streaming support and gRPC-specific error mapping.

Constant Summary collapse

GRPC_AVAILABLE =

Check if gRPC is available

defined?(::GRPC) && ::GRPC.const_defined?(:ClientStub)
DEFAULT_TIMEOUT =

Default configuration values

30
DEFAULT_DEADLINE =
60
DEFAULT_MAX_RECEIVE_MESSAGE_SIZE =

4MB

4 * 1024 * 1024
DEFAULT_MAX_SEND_MESSAGE_SIZE =

4MB

4 * 1024 * 1024
DEFAULT_KEEPALIVE_TIME =
30
DEFAULT_KEEPALIVE_TIMEOUT =
5

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(endpoint, config = {}) ⇒ Grpc

Initialize gRPC transport

Parameters:

  • endpoint (String)

    gRPC endpoint (host:port)

  • config (Hash) (defaults to: {})

    Configuration options

Options Hash (config):

  • :timeout (Integer) — default: 30

    Request timeout in seconds

  • :deadline (Integer) — default: 60

    Request deadline in seconds

  • :max_receive_message_size (Integer) — default: 4MB

    Max receive message size

  • :max_send_message_size (Integer) — default: 4MB

    Max send message size

  • :keepalive_time (Integer) — default: 30

    Keepalive time in seconds

  • :keepalive_timeout (Integer) — default: 5

    Keepalive timeout in seconds

  • :use_tls (Boolean) — default: true

    Use TLS encryption

  • :ca_file (String)

    Path to CA certificate file

  • :cert_file (String)

    Path to client certificate file

  • :key_file (String)

    Path to client private key file

  • :metadata (Hash) — default: {}

    Default metadata

  • :credentials (Object)

    Custom credentials object



72
73
74
75
76
77
78
79
80
81
82
83
# File 'lib/a2a/transport/grpc.rb', line 72

# Set up the transport state; no stub is built until #connect is called.
#
# @param endpoint [String] gRPC endpoint (host:port)
# @param config [Hash] overrides merged on top of #default_config
# @raise [A2A::Errors::TransportError] when GRPC_AVAILABLE is falsy
def initialize(endpoint, config = {})
  unavailable = "gRPC is not available. Install the 'grpc' gem to use gRPC transport."
  raise A2A::Errors::TransportError, unavailable unless GRPC_AVAILABLE

  @stub = nil
  @endpoint = endpoint
  # @config must be assigned before the two builders below, which read it.
  @config = default_config.merge(config)
  @credentials = build_credentials
  @call_options = build_call_options
end

Instance Attribute Details

#config ⇒ Object (readonly)

Returns the value of attribute config.



52
53
54
# File 'lib/a2a/transport/grpc.rb', line 52

# Read-only accessor for the transport configuration.
#
# @return [Hash, nil] current value of @config
def config = @config

#credentials ⇒ Object (readonly)

Returns the value of attribute credentials.



52
53
54
# File 'lib/a2a/transport/grpc.rb', line 52

# Read-only accessor for the channel credentials.
#
# @return [Object, nil] current value of @credentials
def credentials = @credentials

#endpoint ⇒ Object (readonly)

Returns the value of attribute endpoint.



52
53
54
# File 'lib/a2a/transport/grpc.rb', line 52

# Read-only accessor for the configured endpoint.
#
# @return [String, nil] current value of @endpoint
def endpoint = @endpoint

#stub ⇒ Object (readonly)

Returns the value of attribute stub.



52
53
54
# File 'lib/a2a/transport/grpc.rb', line 52

# Read-only accessor for the gRPC stub.
#
# @return [Object, nil] current value of @stub
def stub = @stub

Instance Method Details

#bidi_streaming_call(method, requests, metadata: {}, timeout: nil) ⇒ Enumerator

Send bidirectional streaming gRPC request

Parameters:

  • method (Symbol)

    gRPC method name

  • requests (Enumerator)

    Request stream

  • metadata (Hash) (defaults to: {})

    Request metadata

  • timeout (Integer, nil) (defaults to: nil)

    Request timeout

Returns:

  • (Enumerator)

    Response stream

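Raises:

  • (A2A::Errors::A2AError)

    Mapped A2A error when the stub raises GRPC::BadStatus

  • (A2A::Errors::TransportError)

    When the call fails with any other StandardError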



210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
# File 'lib/a2a/transport/grpc.rb', line 210

# Send a bidirectional streaming gRPC request through the connected stub.
#
# Per-call options start from a copy of @call_options; +timeout+ (when given),
# a freshly computed deadline and the merged metadata override the copied
# entries. The cached @call_options hash itself is not modified.
#
# @param method [Symbol] stub method to invoke
# @param requests [Enumerator] request stream
# @param metadata [Hash] per-call metadata; wins over configured metadata on key clashes
# @param timeout [Integer, nil] per-call timeout override
# @return [Enumerator] whatever the stub method returns (the response stream)
# @raise [A2A::Errors::A2AError] result of #map_grpc_error when the stub raises GRPC::BadStatus
# @raise [A2A::Errors::TransportError] when any other StandardError is raised by the call
#
# NOTE(review): only errors raised while invoking the stub method are mapped here;
# errors raised later while the returned stream is consumed are presumably not
# covered by this rescue — confirm against the stub implementation.
def bidi_streaming_call(method, requests, metadata: {}, timeout: nil)
  ensure_connected!

  call_options = @call_options.dup
  call_options[:timeout] = timeout if timeout
  # The cached deadline is an absolute Time captured when the options were built;
  # recompute it so a call made later does not start with an expired deadline.
  call_options[:deadline] = Time.now + @config[:deadline]
  # Layer the caller's metadata over the configured defaults (an argument-less
  # merge previously discarded the per-call metadata).
  call_options[:metadata] = @config[:metadata].merge(metadata)

  begin
    @stub.public_send(method, requests, call_options)
  rescue ::GRPC::BadStatus => e
    raise map_grpc_error(e)
  rescue StandardError => e
    raise A2A::Errors::TransportError, "gRPC bidirectional streaming call failed: #{e.message}"
  end
end

#build_a2a_request(message) ⇒ Object (private)

Build A2A message request

Parameters:

  • message (A2A::Types::Message)

    A2A message

Returns:

  • (Object)

    gRPC request object



418
419
420
421
422
423
424
# File 'lib/a2a/transport/grpc.rb', line 418

# Build the request payload for an A2A message.
#
# Placeholder implementation: a real version would convert the
# A2A::Types::Message into a protobuf message; for now the message's hash
# form is wrapped under the :message key.
#
# @param message [#to_h] message to wrap
# @return [Hash] +{ message: message.to_h }+
def build_a2a_request(message)
  payload = message.to_h
  { message: payload }
end

#build_call_options ⇒ Hash (private)

Build call options

Returns:

  • (Hash)

    Call options



373
374
375
376
377
378
379
# File 'lib/a2a/transport/grpc.rb', line 373

# Build the base call options from the current configuration.
#
# The :deadline entry is an absolute Time computed at the moment this method
# runs (now + configured deadline seconds); a caller that caches the returned
# hash keeps that same instant.
#
# @return [Hash] hash with :timeout, :deadline and :metadata keys
def build_call_options
  timeout, deadline_seconds, metadata = @config.values_at(:timeout, :deadline, :metadata)

  {
    timeout: timeout,
    deadline: Time.now + deadline_seconds,
    metadata: metadata
  }
end

#build_channel_args ⇒ Hash (private)

Build channel arguments

Returns:

  • (Hash)

    Channel arguments



355
356
357
358
359
360
361
362
363
364
365
366
# File 'lib/a2a/transport/grpc.rb', line 355

# Build the channel argument hash handed to the stub.
#
# Keepalive settings are configured in seconds and converted to milliseconds
# here; message size limits are passed through unchanged.
#
# @return [Hash{String => Integer}] channel arguments keyed by "grpc.*" names
def build_channel_args
  keepalive_ms = @config[:keepalive_time] * 1000
  keepalive_timeout_ms = @config[:keepalive_timeout] * 1000

  keepalive = {
    "grpc.keepalive_time_ms" => keepalive_ms,
    "grpc.keepalive_timeout_ms" => keepalive_timeout_ms,
    "grpc.keepalive_permit_without_calls" => 1
  }
  http2_pings = {
    "grpc.http2.max_pings_without_data" => 0,
    "grpc.http2.min_time_between_pings_ms" => 10_000,
    "grpc.http2.min_ping_interval_without_data_ms" => 300_000
  }
  message_limits = {
    "grpc.max_receive_message_length" => @config[:max_receive_message_size],
    "grpc.max_send_message_length" => @config[:max_send_message_size]
  }

  keepalive.merge(http2_pings, message_limits)
end

#build_credentials ⇒ Object (private)

Build gRPC credentials

Returns:

  • (Object)

    gRPC credentials



318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
# File 'lib/a2a/transport/grpc.rb', line 318

# Choose the credentials used for the gRPC channel.
#
# Precedence: an explicit :credentials object, then the insecure marker when
# :use_tls is falsy, then TLS credentials built from any configured PEM files,
# and finally argument-less TLS credentials.
#
# @return [Object] the configured credentials object,
#   a ::GRPC::Core::ChannelCredentials instance, or :this_channel_is_insecure
def build_credentials
  custom = @config[:credentials]
  return custom if custom
  return :this_channel_is_insecure unless @config[:use_tls]

  paths = @config.values_at(:ca_file, :cert_file, :key_file)
  return ::GRPC::Core::ChannelCredentials.new if paths.none?

  # Files are read in ca, cert, key order; unset paths yield nil.
  ca_pem, cert_pem, key_pem = paths.map { |path| path ? File.read(path) : nil }

  # Constructor arguments are passed as (ca, key, cert).
  ::GRPC::Core::ChannelCredentials.new(ca_pem, key_pem, cert_pem)
end

#build_empty_request ⇒ Object (private)

Build empty request

Returns:

  • (Object)

    gRPC request object



443
444
445
# File 'lib/a2a/transport/grpc.rb', line 443

# Build the request used by calls that carry no payload.
#
# @return [Hash] a new empty hash on every call
def build_empty_request = {}

#build_stub ⇒ Object (private)

Build gRPC stub

Returns:

  • (Object)

    gRPC stub instance



344
345
346
347
348
# File 'lib/a2a/transport/grpc.rb', line 344

# Instantiate the service stub for the configured endpoint.
#
# Placeholder: A2AServiceStub stands in for the actual A2A gRPC service stub
# and acts as a generic stub interface for now.
#
# @return [Object] new A2AServiceStub built from @endpoint, @credentials and
#   the channel arguments
def build_stub
  channel_args = build_channel_args
  A2AServiceStub.new(@endpoint, @credentials, channel_args: channel_args)
end

#build_task_request(task_id) ⇒ Object (private)

Build task request

Parameters:

  • task_id (String)

    Task ID

Returns:

  • (Object)

    gRPC request object



432
433
434
435
436
# File 'lib/a2a/transport/grpc.rb', line 432

# Build the request payload for task-scoped calls.
#
# @param task_id [String] task ID
# @return [Hash] +{ task_id: task_id }+
def build_task_request(task_id)
  request = {}
  request[:task_id] = task_id
  request
end

#cancel_a2a_task(task_id, metadata: {}) ⇒ Object

Cancel A2A task via gRPC

Parameters:

  • task_id (String)

    Task ID

  • metadata (Hash) (defaults to: {})

    Request metadata

Returns:

  • (Object)

    Cancellation response



263
264
265
266
# File 'lib/a2a/transport/grpc.rb', line 263

# Cancel an A2A task via a unary :cancel_task call.
#
# @param task_id [String] task ID
# @param metadata [Hash] request metadata forwarded to #unary_call
# @return [Object] whatever #unary_call returns (the cancellation response)
def cancel_a2a_task(task_id, metadata: {})
  request = build_task_request(task_id)
  # Pass the value explicitly; the bare `metadata:` form has no value and only
  # parses as implicit-value shorthand on Ruby >= 3.1.
  unary_call(:cancel_task, request, metadata: metadata)
end

#client_streaming_call(method, requests, metadata: {}, timeout: nil) ⇒ Object

Send client streaming gRPC request

Parameters:

  • method (Symbol)

    gRPC method name

  • requests (Enumerator)

    Request stream

  • metadata (Hash) (defaults to: {})

    Request metadata

  • timeout (Integer, nil) (defaults to: nil)

    Request timeout

Returns:

  • (Object)

    Response message

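Raises:

  • (A2A::Errors::A2AError)

    Mapped A2A error when the stub raises GRPC::BadStatus

  • (A2A::Errors::TransportError)

    When the call fails with any other StandardError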



158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
# File 'lib/a2a/transport/grpc.rb', line 158

# Send a client streaming gRPC request through the connected stub.
#
# Per-call options start from a copy of @call_options; +timeout+ (when given),
# a freshly computed deadline and the merged metadata override the copied
# entries. The cached @call_options hash itself is not modified.
#
# @param method [Symbol] stub method to invoke
# @param requests [Enumerator] request stream
# @param metadata [Hash] per-call metadata; wins over configured metadata on key clashes
# @param timeout [Integer, nil] per-call timeout override
# @return [Object] whatever the stub method returns (the response message)
# @raise [A2A::Errors::A2AError] result of #map_grpc_error when the stub raises GRPC::BadStatus
# @raise [A2A::Errors::TransportError] when any other StandardError is raised by the call
def client_streaming_call(method, requests, metadata: {}, timeout: nil)
  ensure_connected!

  call_options = @call_options.dup
  call_options[:timeout] = timeout if timeout
  # The cached deadline is an absolute Time captured when the options were built;
  # recompute it so a call made later does not start with an expired deadline.
  call_options[:deadline] = Time.now + @config[:deadline]
  # Layer the caller's metadata over the configured defaults (an argument-less
  # merge previously discarded the per-call metadata).
  call_options[:metadata] = @config[:metadata].merge(metadata)

  begin
    @stub.public_send(method, requests, call_options)
  rescue ::GRPC::BadStatus => e
    raise map_grpc_error(e)
  rescue StandardError => e
    raise A2A::Errors::TransportError, "gRPC streaming call failed: #{e.message}"
  end
end

#connect ⇒ Boolean

Connect to gRPC service

Returns:

  • (Boolean)

    Connection success

Raises:

  • (A2A::Errors::TransportError)

    When building or probing the stub raises a StandardError



91
92
93
94
95
96
97
98
99
100
101
102
103
104
# File 'lib/a2a/transport/grpc.rb', line 91

# Build the gRPC stub unless one is already in place.
#
# The stub is stored in @stub only after it has been built AND probed, so a
# failed attempt cannot leave a half-initialised stub behind.
#
# @return [Boolean] true when already connected or when the stub was built
# @raise [A2A::Errors::TransportError] when building or probing the stub raises
#   a StandardError
def connect
  return true if connected?

  begin
    candidate = build_stub
    # Simplified connection test: no RPC is sent. This only checks that the
    # stub's class exposes its RPC descriptors.
    candidate.class.rpc_descs.keys.first
    @stub = candidate
    true
  rescue StandardError => e
    raise A2A::Errors::TransportError, "Failed to connect to gRPC service: #{e.message}"
  end
end

#connected? ⇒ Boolean

Check if connected

Returns:

  • (Boolean)

    Connection status



118
119
120
# File 'lib/a2a/transport/grpc.rb', line 118

# Report whether a stub is currently held.
#
# @return [Boolean] false when @stub is nil, true otherwise
def connected?
  return false if @stub.nil?

  true
end

#default_config ⇒ Hash (private)

Build default configuration

Returns:

  • (Hash)

    Default configuration



296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
# File 'lib/a2a/transport/grpc.rb', line 296

# Assemble the default configuration.
#
# Numeric defaults come from the DEFAULT_* constants; TLS is enabled by
# default with no certificate files, empty metadata and no custom credentials.
#
# @return [Hash] a new defaults hash on every call
def default_config
  limits = {
    timeout: DEFAULT_TIMEOUT,
    deadline: DEFAULT_DEADLINE,
    max_receive_message_size: DEFAULT_MAX_RECEIVE_MESSAGE_SIZE,
    max_send_message_size: DEFAULT_MAX_SEND_MESSAGE_SIZE
  }
  keepalive = {
    keepalive_time: DEFAULT_KEEPALIVE_TIME,
    keepalive_timeout: DEFAULT_KEEPALIVE_TIMEOUT
  }
  tls = { use_tls: true, ca_file: nil, cert_file: nil, key_file: nil }

  limits.merge(keepalive, tls, metadata: {}, credentials: nil)
end

#disconnect ⇒ Object

Disconnect from gRPC service



109
110
111
# File 'lib/a2a/transport/grpc.rb', line 109

# Drop the reference to the current stub.
#
# Only clears @stub; nothing is invoked on the stub itself.
#
# @return [nil]
def disconnect
  @stub = nil
  nil
end

#ensure_connected! ⇒ Object (private)

Ensure connection is established

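Raises:

  • (A2A::Errors::TransportError)

    When the transport is still not connected after attempting to connect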



286
287
288
289
# File 'lib/a2a/transport/grpc.rb', line 286

# Make sure a connection exists, connecting on demand.
#
# @return [nil]
# @raise [A2A::Errors::TransportError] when still not connected after the
#   connection attempt
def ensure_connected!
  connect unless connected?
  return if connected?

  raise A2A::Errors::TransportError, "Not connected to gRPC service"
end

#get_a2a_task(task_id, metadata: {}) ⇒ Object

Get A2A task via gRPC

Parameters:

  • task_id (String)

    Task ID

  • metadata (Hash) (defaults to: {})

    Request metadata

Returns:

  • (Object)

    Task response



251
252
253
254
# File 'lib/a2a/transport/grpc.rb', line 251

# Fetch an A2A task via a unary :get_task call.
#
# @param task_id [String] task ID
# @param metadata [Hash] request metadata forwarded to #unary_call
# @return [Object] whatever #unary_call returns (the task response)
def get_a2a_task(task_id, metadata: {})
  request = build_task_request(task_id)
  # Pass the value explicitly; the bare `metadata:` form has no value and only
  # parses as implicit-value shorthand on Ruby >= 3.1.
  unary_call(:get_task, request, metadata: metadata)
end

#get_agent_card(metadata: {}) ⇒ Object

Get agent card via gRPC

Parameters:

  • metadata (Hash) (defaults to: {})

    Request metadata

Returns:

  • (Object)

    Agent card response



274
275
276
277
# File 'lib/a2a/transport/grpc.rb', line 274

# Fetch the agent card via a unary :get_agent_card call with an empty request.
#
# @param metadata [Hash] request metadata forwarded to #unary_call
# @return [Object] whatever #unary_call returns (the agent card response)
def get_agent_card(metadata: {})
  request = build_empty_request
  # Pass the value explicitly; the bare `metadata:` form has no value and only
  # parses as implicit-value shorthand on Ruby >= 3.1.
  unary_call(:get_agent_card, request, metadata: metadata)
end

#map_grpc_error(grpc_error) ⇒ A2A::Errors::A2AError (private)

Map gRPC error to A2A error

Parameters:

  • grpc_error (GRPC::BadStatus)

    gRPC error

Returns:

  • (A2A::Errors::A2AError)

    A2A error corresponding to the gRPC status code



387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
# File 'lib/a2a/transport/grpc.rb', line 387

# Translate a gRPC status error into the corresponding A2A error object.
#
# The error is built and returned, not raised. Unrecognised status codes fall
# back to a TransportError whose message includes the numeric code.
#
# NOTE(review): CANCELLED is mapped to TaskNotCancelable although the message
# says the request was cancelled — confirm this is the intended error class.
#
# @param grpc_error [GRPC::BadStatus] error exposing #code and #details
# @return [A2A::Errors::A2AError] new error with a "<prefix>: <details>" message
def map_grpc_error(grpc_error)
  statuses = ::GRPC::Core::StatusCodes
  code = grpc_error.code
  details = grpc_error.details

  error_class, prefix =
    case code
    when statuses::CANCELLED then [A2A::Errors::TaskNotCancelable, "Request cancelled"]
    when statuses::INVALID_ARGUMENT then [A2A::Errors::InvalidParams, "Invalid argument"]
    when statuses::DEADLINE_EXCEEDED then [A2A::Errors::TimeoutError, "Deadline exceeded"]
    when statuses::NOT_FOUND then [A2A::Errors::TaskNotFound, "Not found"]
    when statuses::PERMISSION_DENIED then [A2A::Errors::AuthorizationFailed, "Permission denied"]
    when statuses::RESOURCE_EXHAUSTED then [A2A::Errors::ResourceExhausted, "Resource exhausted"]
    when statuses::UNIMPLEMENTED then [A2A::Errors::CapabilityNotSupported, "Unimplemented"]
    when statuses::UNAVAILABLE then [A2A::Errors::AgentUnavailable, "Service unavailable"]
    when statuses::UNAUTHENTICATED then [A2A::Errors::AuthenticationRequired, "Unauthenticated"]
    else [A2A::Errors::TransportError, "gRPC error (#{code})"]
    end

  error_class.new("#{prefix}: #{details}")
end

#send_a2a_message(message, streaming: false, metadata: {}) ⇒ Object, Enumerator

Send A2A message via gRPC

Parameters:

  • message (A2A::Types::Message)

    A2A message

  • streaming (Boolean) (defaults to: false)

    Use streaming response

  • metadata (Hash) (defaults to: {})

    Request metadata

Returns:

  • (Object, Enumerator)

    Response or response stream



234
235
236
237
238
239
240
241
242
# File 'lib/a2a/transport/grpc.rb', line 234

# Send an A2A message, either as a unary call or as a server-streaming call.
#
# @param message [A2A::Types::Message] message to send (wrapped via #build_a2a_request)
# @param streaming [Boolean] when truthy, use :send_message_stream and return a stream
# @param metadata [Hash] request metadata forwarded to the underlying call
# @return [Object, Enumerator] whatever the underlying call returns
def send_a2a_message(message, streaming: false, metadata: {})
  request = build_a2a_request(message)

  # Pass metadata explicitly; the bare `metadata:` form has no value and only
  # parses as implicit-value shorthand on Ruby >= 3.1.
  if streaming
    server_streaming_call(:send_message_stream, request, metadata: metadata)
  else
    unary_call(:send_message, request, metadata: metadata)
  end
end

#server_streaming_call(method, request, metadata: {}, timeout: nil) ⇒ Enumerator

Send server streaming gRPC request

Parameters:

  • method (Symbol)

    gRPC method name

  • request (Object)

    Request message

  • metadata (Hash) (defaults to: {})

    Request metadata

  • timeout (Integer, nil) (defaults to: nil)

    Request timeout

Returns:

  • (Enumerator)

    Response stream

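Raises:

  • (A2A::Errors::A2AError)

    Mapped A2A error when the stub raises GRPC::BadStatus

  • (A2A::Errors::TransportError)

    When the call fails with any other StandardError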



184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
# File 'lib/a2a/transport/grpc.rb', line 184

# Send a server streaming gRPC request through the connected stub.
#
# Per-call options start from a copy of @call_options; +timeout+ (when given),
# a freshly computed deadline and the merged metadata override the copied
# entries. The cached @call_options hash itself is not modified.
#
# @param method [Symbol] stub method to invoke
# @param request [Object] request message
# @param metadata [Hash] per-call metadata; wins over configured metadata on key clashes
# @param timeout [Integer, nil] per-call timeout override
# @return [Enumerator] whatever the stub method returns (the response stream)
# @raise [A2A::Errors::A2AError] result of #map_grpc_error when the stub raises GRPC::BadStatus
# @raise [A2A::Errors::TransportError] when any other StandardError is raised by the call
#
# NOTE(review): only errors raised while invoking the stub method are mapped here;
# errors raised later while the returned stream is consumed are presumably not
# covered by this rescue — confirm against the stub implementation.
def server_streaming_call(method, request, metadata: {}, timeout: nil)
  ensure_connected!

  call_options = @call_options.dup
  call_options[:timeout] = timeout if timeout
  # The cached deadline is an absolute Time captured when the options were built;
  # recompute it so a call made later does not start with an expired deadline.
  call_options[:deadline] = Time.now + @config[:deadline]
  # Layer the caller's metadata over the configured defaults (an argument-less
  # merge previously discarded the per-call metadata).
  call_options[:metadata] = @config[:metadata].merge(metadata)

  begin
    @stub.public_send(method, request, call_options)
  rescue ::GRPC::BadStatus => e
    raise map_grpc_error(e)
  rescue StandardError => e
    raise A2A::Errors::TransportError, "gRPC streaming call failed: #{e.message}"
  end
end

#unary_call(method, request, metadata: {}, timeout: nil) ⇒ Object

Send unary gRPC request

Parameters:

  • method (Symbol)

    gRPC method name

  • request (Object)

    Request message

  • metadata (Hash) (defaults to: {})

    Request metadata

  • timeout (Integer, nil) (defaults to: nil)

    Request timeout

Returns:

  • (Object)

    Response message

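Raises:

  • (A2A::Errors::A2AError)

    Mapped A2A error when the stub raises GRPC::BadStatus

  • (A2A::Errors::TransportError)

    When the call fails with any other StandardError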



132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
# File 'lib/a2a/transport/grpc.rb', line 132

# Send a unary gRPC request through the connected stub.
#
# Per-call options start from a copy of @call_options; +timeout+ (when given),
# a freshly computed deadline and the merged metadata override the copied
# entries. The cached @call_options hash itself is not modified.
#
# @param method [Symbol] stub method to invoke
# @param request [Object] request message
# @param metadata [Hash] per-call metadata; wins over configured metadata on key clashes
# @param timeout [Integer, nil] per-call timeout override
# @return [Object] whatever the stub method returns (the response message)
# @raise [A2A::Errors::A2AError] result of #map_grpc_error when the stub raises GRPC::BadStatus
# @raise [A2A::Errors::TransportError] when any other StandardError is raised by the call
def unary_call(method, request, metadata: {}, timeout: nil)
  ensure_connected!

  call_options = @call_options.dup
  call_options[:timeout] = timeout if timeout
  # The cached deadline is an absolute Time captured when the options were built;
  # recompute it so a call made later does not start with an expired deadline.
  call_options[:deadline] = Time.now + @config[:deadline]
  # Layer the caller's metadata over the configured defaults (an argument-less
  # merge previously discarded the per-call metadata).
  call_options[:metadata] = @config[:metadata].merge(metadata)

  begin
    @stub.public_send(method, request, call_options)
  rescue ::GRPC::BadStatus => e
    raise map_grpc_error(e)
  rescue StandardError => e
    raise A2A::Errors::TransportError, "gRPC call failed: #{e.message}"
  end
end