Class: A2A::Transport::SSE

Inherits:
Object
  • Object
show all
Defined in:
lib/a2a/transport/sse.rb

Overview

Server-Sent Events (SSE) transport implementation. Provides streaming responses with event parsing, connection management, and heartbeat support.

Constant Summary collapse

EVENT_TYPES =

SSE event types

%w[
  message
  error
  heartbeat
  task_status_update
  task_artifact_update
  connection_established
  connection_closed
].freeze
DEFAULT_HEARTBEAT_INTERVAL =

Default configuration values

30
DEFAULT_RECONNECT_DELAY =
3000
DEFAULT_MAX_RECONNECT_ATTEMPTS =
10
DEFAULT_BUFFER_SIZE =

8KB

1024 * 8
DEFAULT_TIMEOUT =
60

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(url, config = {}) ⇒ SSE

Initialize SSE transport

Parameters:

  • SSE endpoint URL

  • (defaults to: {})

    Configuration options

Options Hash (config):

  • :heartbeat_interval (Integer) — default: 30

    Heartbeat interval in seconds

  • :reconnect_delay (Integer) — default: 3000

    Reconnection delay in milliseconds

  • :max_reconnect_attempts (Integer) — default: 10

    Maximum reconnection attempts

  • :buffer_size (Integer) — default: 8192

    Maximum number of events kept in the replay buffer (the limit is compared against the event count, not a byte size)

  • :timeout (Integer) — default: 60

    Connection timeout in seconds

  • :headers (Hash) — default: {}

    Default headers

  • :auto_reconnect (Boolean) — default: true

    Enable automatic reconnection

  • :last_event_id (String)

    Last received event ID for replay



47
48
49
50
51
52
53
54
55
56
57
# File 'lib/a2a/transport/sse.rb', line 47

# Set up a new SSE transport. Only instance state is prepared here; nothing
# is connected until #connect is called.
#
# @param url [String] SSE endpoint URL
# @param config [Hash] overrides merged on top of #default_config
def initialize(url, config = {})
  @url = url
  @config = default_config.merge(config)
  @last_event_id = @config[:last_event_id]

  # Connection bookkeeping
  @connection_state = :disconnected
  @reconnect_attempts = 0
  @heartbeat_timer = nil
  @mutex = Mutex.new

  # Event storage: replay buffer plus one listener list per event type
  @event_buffer = Concurrent::Array.new
  @event_listeners = Concurrent::Hash.new { |listeners, type| listeners[type] = [] }
end

Instance Attribute Details

#configObject (readonly)

Returns the value of attribute config.



31
32
33
# File 'lib/a2a/transport/sse.rb', line 31

# Read-only accessor.
#
# @return [Object] the current value of @config
def config
  @config
end

#connection_stateObject (readonly)

Returns the value of attribute connection_state.



31
32
33
# File 'lib/a2a/transport/sse.rb', line 31

# Read-only accessor.
#
# @return [Object] the current value of @connection_state
def connection_state
  @connection_state
end

#event_bufferObject (readonly)

Returns the value of attribute event_buffer.



31
32
33
# File 'lib/a2a/transport/sse.rb', line 31

# Read-only accessor. Returns the buffer object itself, not a copy.
#
# @return [Object] the current value of @event_buffer
def event_buffer
  @event_buffer
end

#last_event_idObject (readonly)

Returns the value of attribute last_event_id.



31
32
33
# File 'lib/a2a/transport/sse.rb', line 31

# Read-only accessor.
#
# @return [Object] the current value of @last_event_id
def last_event_id
  @last_event_id
end

#urlObject (readonly)

Returns the value of attribute url.



31
32
33
# File 'lib/a2a/transport/sse.rb', line 31

# Read-only accessor.
#
# @return [Object] the current value of @url
def url
  @url
end

Instance Method Details

#buffer_event(event) ⇒ Object (private)

Buffer event for replay

Parameters:

  • Event to buffer



291
292
293
294
295
296
# File 'lib/a2a/transport/sse.rb', line 291

# Append an event to the replay buffer, then drop the oldest entries once the
# buffer holds more than @config[:buffer_size] events.
#
# The excess is removed with a single bounded shift, so a negative
# :buffer_size simply empties the buffer instead of looping forever.
#
# @param event [Object] event to buffer
# @return [nil]
def buffer_event(event)
  @event_buffer << event

  overflow = @event_buffer.size - @config[:buffer_size]
  @event_buffer.shift(overflow) if overflow.positive?
  nil
end

#buffered_eventsArray<SSEEvent>

Get buffered events

Returns:

  • Buffered events



162
163
164
# File 'lib/a2a/transport/sse.rb', line 162

# Return the buffered events by calling #to_a on the event buffer.
#
# @return [Array] result of @event_buffer.to_a
def buffered_events
  @event_buffer.to_a
end

#build_headers(additional_headers = {}) ⇒ Hash (private)

Build request headers

Parameters:

  • (defaults to: {})

    Additional headers

Returns:

  • Complete headers



393
394
395
396
397
398
399
400
401
# File 'lib/a2a/transport/sse.rb', line 393

# Assemble the request headers for the event stream.
#
# Precedence, lowest to highest: built-in SSE headers (plus Last-Event-ID when
# an id is known), then @config[:headers], then additional_headers.
#
# @param additional_headers [Hash] per-request headers
# @return [Hash] a new hash with all headers merged
def build_headers(additional_headers = {})
  base = { "Accept" => "text/event-stream", "Cache-Control" => "no-cache" }
  resume = @last_event_id ? { "Last-Event-ID" => @last_event_id } : {}

  base
    .merge(resume)
    .merge(@config[:headers])
    .merge(additional_headers)
end

#clear_buffer!Object

Clear event buffer



169
170
171
# File 'lib/a2a/transport/sse.rb', line 169

# Remove every event from the event buffer.
#
# @return [Object] whatever @event_buffer.clear returns
def clear_buffer!
  @event_buffer.clear
end

#connect(headers: {}) {|event| ... } ⇒ Enumerator

Connect to SSE endpoint and start streaming

Parameters:

  • (defaults to: {})

    Additional headers

Yields:

  • (event)

    Block to handle incoming events

Yield Parameters:

  • event (SSEEvent)

    Received SSE event

Returns:

  • Event stream enumerator



67
68
69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/a2a/transport/sse.rb', line 67

# Connect to the SSE endpoint and start streaming.
#
# A fresh connect resets the reconnect counter. When the transport is in the
# :reconnecting state (a retry after an error) the counter is kept, so
# :max_reconnect_attempts can actually be reached, and the headers and handler
# block of the previous connect are reused if none are supplied, so the retry
# keeps delivering events to the same handler.
#
# @param headers [Hash] additional request headers
# @yieldparam event [SSEEvent] received SSE event
# @return [Object, nil] the result of #connect_with_callback when a handler is
#   available, otherwise the result of #connect_with_enumerator; nil when
#   already connected
def connect(headers: {}, &block)
  @mutex.synchronize do
    return if @connection_state == :connected

    if @connection_state == :reconnecting
      # Retry path: keep the attempt counter and fall back to the previous
      # headers/handler when the caller did not pass any.
      headers = @connect_headers if headers.empty? && @connect_headers
      block ||= @connect_handler
    else
      @reconnect_attempts = 0
    end

    @connect_headers = headers
    @connect_handler = block
    @connection_state = :connecting
  end

  if block
    connect_with_callback(headers, &block)
  else
    connect_with_enumerator(headers)
  end
end

#connect_with_callback(headers) {|event| ... } ⇒ Object (private)

Connect with callback-based handling

Parameters:

  • Request headers

Yields:

  • (event)

    Event handler block



181
182
183
184
185
186
187
188
189
# File 'lib/a2a/transport/sse.rb', line 181

# Run the connection on a new thread and hand every event to the caller's
# block. Errors raised while streaming are passed to #handle_connection_error.
#
# @param headers [Hash] request headers
# @yieldparam event [SSEEvent] received event
# @return [Thread] the thread running the connection
def connect_with_callback(headers)
  Thread.new do
    begin
      establish_connection(headers) { |event| yield(event) if block_given? }
    rescue StandardError => error
      handle_connection_error(error)
    end
  end
end

#connect_with_enumerator(headers) ⇒ Enumerator (private)

Connect with enumerator-based handling

Parameters:

  • Request headers

Returns:

  • Event stream enumerator



197
198
199
200
201
202
203
# File 'lib/a2a/transport/sse.rb', line 197

# Wrap the connection in an Enumerator. The connection is only established
# once the enumerator is iterated; each received event becomes one element.
#
# @param headers [Hash] request headers
# @return [Enumerator] event stream enumerator
def connect_with_enumerator(headers)
  Enumerator.new do |stream|
    establish_connection(headers) { |event| stream.yield(event) }
  end
end

#connected? ⇒ Boolean

Check whether the transport is currently connected

Returns:

  • true if the connection state is :connected, false otherwise



153
154
155
# File 'lib/a2a/transport/sse.rb', line 153

# Report whether the transport is connected.
#
# @return [Boolean] true only when @connection_state is :connected
def connected?
  @connection_state == :connected
end

#default_configHash (private)

Build default configuration

Returns:

  • Default configuration



408
409
410
411
412
413
414
415
416
417
418
419
# File 'lib/a2a/transport/sse.rb', line 408

# Build the baseline configuration: the DEFAULT_* constants for the numeric
# settings plus empty headers, auto-reconnect enabled and no last event id.
# A new hash is built on every call.
#
# @return [Hash] default configuration
def default_config
  tunables = {
    heartbeat_interval: DEFAULT_HEARTBEAT_INTERVAL,
    reconnect_delay: DEFAULT_RECONNECT_DELAY,
    max_reconnect_attempts: DEFAULT_MAX_RECONNECT_ATTEMPTS,
    buffer_size: DEFAULT_BUFFER_SIZE,
    timeout: DEFAULT_TIMEOUT
  }

  tunables.merge(headers: {}, auto_reconnect: true, last_event_id: nil)
end

#disconnectObject

Disconnect from SSE endpoint



85
86
87
88
89
90
91
92
93
# File 'lib/a2a/transport/sse.rb', line 85

# Disconnect from the SSE endpoint.
#
# Under the mutex: mark the transport :disconnected, stop the heartbeat and
# empty the event buffer. Afterwards, outside the lock, a "connection_closed"
# event is emitted to the listeners.
def disconnect
  @mutex.synchronize do
    @connection_state = :disconnected
    stop_heartbeat
    @event_buffer.clear
  end

  closed = SSEEvent.new(type: "connection_closed", data: { reason: "manual_disconnect" })
  emit_event(closed)
end

#emit_event(event) {|event| ... } ⇒ Object (private)

Emit event to listeners

Parameters:

  • Event to emit

Yields:

  • (event)

    Event handler block



304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
# File 'lib/a2a/transport/sse.rb', line 304

# Deliver an event to the listeners registered for its type, then to the
# optional block.
#
# Listeners are looked up by the string form of the event type (the same key
# #on and #off use) and are called from a snapshot of the list, so a listener
# may add or remove listeners, including itself, without causing others to be
# skipped. An exception raised by one listener is logged at debug level and
# does not stop the remaining listeners.
#
# @param event [SSEEvent] event to emit
# @yieldparam event [SSEEvent] the same event, passed to the generic handler
def emit_event(event)
  @event_listeners[event.type.to_s].dup.each do |listener|
    listener.call(event)
  rescue StandardError => e
    # Log the failure but keep dispatching
    listener_error_logger.debug { "Error in event listener: #{e.message}" }
  end

  # Call generic block handler
  yield(event) if block_given?
end

# Logger used for listener failures (private helper): Rails.logger when it is
# available, otherwise a single stdout Logger that is created on first use and
# then reused.
def listener_error_logger
  return ::Rails.logger if defined?(::Rails) && ::Rails.respond_to?(:logger) && ::Rails.logger

  @listener_error_logger ||= begin
    require "logger" # loaded lazily; only needed on the error path
    Logger.new($stdout)
  end
end

#establish_connection(headers) {|event| ... } ⇒ Object (private)

Establish SSE connection

Parameters:

  • Request headers

Yields:

  • (event)

    Event handler block



211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
# File 'lib/a2a/transport/sse.rb', line 211

# Open the HTTP request for the event stream and feed every received chunk to
# #process_chunk.
#
# The transport is marked :connected (heartbeat started and
# "connection_established" emitted) when the first chunk arrives, so the state
# is correct while the stream is still being read. If the request returns
# without delivering any chunk, the same steps run once after it returns.
# Any StandardError is handed to #handle_connection_error.
#
# NOTE(review): this relies on `get` not returning until the response is
# finished, which is how an `on_data` streaming callback normally behaves —
# confirm against A2A::Transport::Http.
#
# @param headers [Hash] request headers
# @yieldparam event [SSEEvent] received event
def establish_connection(headers, &block)
  request_headers = build_headers(headers)

  # Use HTTP transport for the underlying connection
  http = A2A::Transport::Http.new(@url, timeout: @config[:timeout])

  announced = false
  announce = lambda do
    next if announced

    announced = true
    @connection_state = :connected
    start_heartbeat
    emit_event(SSEEvent.new(type: "connection_established", data: { url: @url }))
  end

  http.get(headers: request_headers) do |req|
    req.options.on_data = proc do |chunk, _size|
      announce.call
      process_chunk(chunk, &block)
    end
  end

  announce.call
rescue StandardError => e
  handle_connection_error(e)
end

#handle_connection_error(error) ⇒ Object (private)

Handle connection errors

Parameters:

  • Connection error



328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
# File 'lib/a2a/transport/sse.rb', line 328

# React to a connection failure: mark the transport :disconnected, stop the
# heartbeat, emit an "error" event carrying the message and the current
# attempt count, and schedule a reconnection when auto-reconnect is enabled
# and the attempt limit has not been reached.
#
# @param error [StandardError] connection error
def handle_connection_error(error)
  @connection_state = :disconnected
  stop_heartbeat

  details = { error: error.message, reconnect_attempts: @reconnect_attempts }
  emit_event(SSEEvent.new(type: "error", data: details))

  retry_allowed = @config[:auto_reconnect] && @reconnect_attempts < @config[:max_reconnect_attempts]
  schedule_reconnection if retry_allowed
end

#off(event_type, handler = nil) ⇒ Object

Remove event listener

Parameters:

  • Event type

  • (defaults to: nil)

    Handler to remove (optional, removes all if nil)



111
112
113
114
115
116
117
# File 'lib/a2a/transport/sse.rb', line 111

# Remove listeners for an event type.
#
# @param event_type [#to_s] event type
# @param handler [Object, nil] listener to remove; when omitted, every
#   listener for the type is removed
def off(event_type, handler = nil)
  listeners = @event_listeners[event_type.to_s]
  return listeners.clear unless handler

  listeners.delete(handler)
end

#on(event_type, &block) ⇒ Object

Add event listener for specific event type

Parameters:

  • Event type to listen for

  • Event handler block



101
102
103
# File 'lib/a2a/transport/sse.rb', line 101

# Register a listener for an event type. Does nothing when no block is given.
#
# @param event_type [#to_s] event type to listen for (stored under its string form)
# @param block [Proc] event handler
def on(event_type, &block)
  return unless block

  @event_listeners[event_type.to_s] << block
end

#parse_sse_line(line) ⇒ SSEEvent? (private)

Parse SSE line into event

Parameters:

  • SSE line

Returns:

  • Parsed event or nil



255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
# File 'lib/a2a/transport/sse.rb', line 255

# Parse one line of the event stream (simplified, line-at-a-time parsing).
#
# * blank lines and comment lines (starting with ":") are ignored
# * "data" lines produce an event, see #build_data_event
# * "retry" lines update @config[:reconnect_delay] when the value consists of
#   digits only; anything else is ignored
# * "event", "id" and unknown fields are ignored
#
# A single space after the colon is optional, so "data:x" and "data: x" are
# treated the same. A data line with an empty value produces no event.
#
# @param line [String] SSE line without its line terminator
# @return [SSEEvent, nil] parsed event or nil
def parse_sse_line(line)
  return nil if line.empty? || line.start_with?(":")

  field, value = line.split(":", 2)
  value = value.to_s.delete_prefix(" ")

  case field
  when "data"
    value.empty? ? nil : build_data_event(value)
  when "retry"
    # Update reconnection delay; a non-numeric value must not reset it to 0
    @config[:reconnect_delay] = value.to_i if value.match?(/\A\d+\z/)
    nil
  end
end

# Build an event from the payload of a data line (private helper).
#
# A JSON object supplies the event fields ("type", "data", "id", "retry").
# Any other JSON value (number, array, string, null, ...) becomes the data of
# a plain "message" event. Text that is not valid JSON is passed through
# unchanged as the data of a "message" event.
def build_data_event(payload)
  parsed = JSON.parse(payload)
  return SSEEvent.new(type: "message", data: parsed) unless parsed.is_a?(Hash)

  SSEEvent.new(
    type: parsed["type"] || "message",
    data: parsed["data"] || parsed,
    id: parsed["id"],
    retry_interval: parsed["retry"]
  )
rescue JSON::ParserError
  SSEEvent.new(type: "message", data: payload)
end

#process_chunk(chunk) {|event| ... } ⇒ Object (private)

Process incoming data chunk

Parameters:

  • Data chunk

Yields:

  • (event)

    Event handler block



236
237
238
239
240
241
242
243
244
245
246
247
# File 'lib/a2a/transport/sse.rb', line 236

# Process an incoming data chunk.
#
# Chunks are not guaranteed to end on a line boundary, so the text after the
# last newline is kept in @partial_line and prepended to the next chunk; only
# complete lines are parsed. Each parsed event updates @last_event_id (when
# the event has an id), is buffered, and is emitted.
#
# NOTE(review): @partial_line is not cleared by this method; if a stream is
# dropped mid-line and a new one is started, the stale fragment should be
# reset by whoever opens the new stream.
#
# @param chunk [String] data chunk
# @yieldparam event [SSEEvent] parsed event
def process_chunk(chunk, &block)
  pending = @partial_line.to_s.empty? ? chunk : @partial_line + chunk

  # limit -1 keeps the trailing empty string when the chunk ends with "\n",
  # so the last element is always the (possibly empty) incomplete remainder.
  lines = pending.split("\n", -1)
  @partial_line = lines.pop

  lines.each do |line|
    event = parse_sse_line(line.strip)
    next unless event

    @last_event_id = event.id if event.id
    buffer_event(event)
    emit_event(event, &block)
  end
end

#schedule_reconnectionObject (private)

Schedule reconnection attempt



350
351
352
353
354
355
356
357
358
# File 'lib/a2a/transport/sse.rb', line 350

# Schedule a reconnection attempt.
#
# Increments the attempt counter, switches to :reconnecting, and starts a
# thread that sleeps for @config[:reconnect_delay] milliseconds and then calls
# #connect, provided the state is still :reconnecting at that point.
#
# @return [Thread] the thread performing the delayed reconnect
def schedule_reconnection
  @reconnect_attempts += 1
  @connection_state = :reconnecting

  Thread.new do
    delay_seconds = @config[:reconnect_delay] / 1000.0
    sleep(delay_seconds)
    next unless @connection_state == :reconnecting

    connect
  end
end

#send_data(data, event_type: "message") ⇒ Boolean

Send data to SSE endpoint (for bidirectional communication)

Parameters:

  • Data to send

  • (defaults to: "message")

    Event type

Returns:

  • Success status



126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
# File 'lib/a2a/transport/sse.rb', line 126

# Send data to the server over a separate HTTP request (SSE itself only flows
# from server to client).
#
# The request is a POST to "/events/send" on the endpoint URL with every
# "/events" occurrence removed. The body carries the event type, the data and
# the last received event id. Any StandardError is reported as an "error"
# event and results in false.
#
# @param data [Object] data to send
# @param event_type [String] event type
# @return [Boolean] true only when connected and the response status is 200
def send_data(data, event_type: "message")
  return false if @connection_state != :connected

  base_url = @url.gsub("/events", "")
  payload = { type: event_type, data: data, last_event_id: @last_event_id }
  reply = A2A::Transport::Http.new(base_url).post("/events/send", body: payload)
  reply.status == 200
rescue StandardError => e
  emit_event(SSEEvent.new(type: "error", data: { error: e.message }))
  false
end

#start_heartbeatObject (private)

Start heartbeat timer



363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
# File 'lib/a2a/transport/sse.rb', line 363

# Start the heartbeat timer.
#
# Nothing is started unless @config[:heartbeat_interval] is positive.
# Otherwise a thread is stored in @heartbeat_timer; it sleeps for the interval
# and emits a "heartbeat" event with the current ISO 8601 timestamp, repeating
# until the state is no longer :connected.
#
# @return [Thread, nil] the heartbeat thread, or nil when disabled
def start_heartbeat
  heartbeat_enabled = @config[:heartbeat_interval].positive?
  return unless heartbeat_enabled

  @heartbeat_timer = Thread.new do
    loop do
      sleep(@config[:heartbeat_interval])
      break if @connection_state != :connected

      beat = SSEEvent.new(type: "heartbeat", data: { timestamp: Time.now.iso8601 })
      emit_event(beat)
    end
  end
end

#stop_heartbeatObject (private)

Stop heartbeat timer



382
383
384
385
# File 'lib/a2a/transport/sse.rb', line 382

# Stop the heartbeat timer: kill the heartbeat thread if there is one, then
# clear the reference.
#
# @return [nil]
def stop_heartbeat
  timer = @heartbeat_timer
  timer.kill unless timer.nil?
  @heartbeat_timer = nil
end