Class: A2A::Transport::SSE
- Inherits: Object
- Inheritance chain: Object, A2A::Transport::SSE
- Defined in: lib/a2a/transport/sse.rb
Overview
Server-Sent Events (SSE) transport implementation. Provides streaming responses with event parsing, connection management, and heartbeat support.
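Constant Summary
- EVENT_TYPES =
SSE event types
%w[message error heartbeat task_status_update task_artifact_update connection_established connection_closed].freeze
- DEFAULT_HEARTBEAT_INTERVAL =
Default configuration values. The heartbeat interval is in seconds (it is passed directly to sleep).
30
- DEFAULT_RECONNECT_DELAY =
Reconnect delay in milliseconds (it is divided by 1000.0 before sleeping).
3000
- DEFAULT_MAX_RECONNECT_ATTEMPTS =
10
- DEFAULT_BUFFER_SIZE =
8KB per the source comment. #buffer_event compares this value against the number of buffered events, so in practice it caps the buffer at 8192 events.
1024 * 8
- DEFAULT_TIMEOUT =
Passed as timeout: to A2A::Transport::Http.
60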
Instance Attribute Summary
- #config ⇒ Object (readonly)
Returns the value of attribute config.
- #connection_state ⇒ Object (readonly)
Returns the value of attribute connection_state.
- #event_buffer ⇒ Object (readonly)
Returns the value of attribute event_buffer.
- #last_event_id ⇒ Object (readonly)
Returns the value of attribute last_event_id.
- #url ⇒ Object (readonly)
Returns the value of attribute url.
Instance Method Summary
- #buffer_event(event) ⇒ Object (private)
Buffer event for replay.
- #buffered_events ⇒ Array<SSEEvent>
Get buffered events.
- #build_headers(additional_headers = {}) ⇒ Hash (private)
Build request headers.
- #clear_buffer! ⇒ Object
Clear event buffer.
- #connect(headers: {}) {|event| ... } ⇒ Enumerator
Connect to SSE endpoint and start streaming.
- #connect_with_callback(headers) {|event| ... } ⇒ Object (private)
Connect with callback-based handling.
- #connect_with_enumerator(headers) ⇒ Enumerator (private)
Connect with enumerator-based handling.
- #connected? ⇒ Boolean
Check whether the transport is connected.
- #default_config ⇒ Hash (private)
Build default configuration.
- #disconnect ⇒ Object
Disconnect from SSE endpoint.
- #emit_event(event) {|event| ... } ⇒ Object (private)
Emit event to listeners.
- #establish_connection(headers) {|event| ... } ⇒ Object (private)
Establish SSE connection.
- #handle_connection_error(error) ⇒ Object (private)
Handle connection errors.
- #initialize(url, config = {}) ⇒ SSE (constructor)
Initialize SSE transport.
- #off(event_type, handler = nil) ⇒ Object
Remove event listener.
- #on(event_type, &block) ⇒ Object
Add event listener for specific event type.
- #parse_sse_line(line) ⇒ SSEEvent? (private)
Parse SSE line into event.
- #process_chunk(chunk) {|event| ... } ⇒ Object (private)
Process incoming data chunk.
- #schedule_reconnection ⇒ Object (private)
Schedule reconnection attempt.
- #send_data(data, event_type: "message") ⇒ Boolean
Send data to SSE endpoint (for bidirectional communication).
- #start_heartbeat ⇒ Object (private)
Start heartbeat timer.
- #stop_heartbeat ⇒ Object (private)
Stop heartbeat timer.
Constructor Details
#initialize(url, config = {}) ⇒ SSE
Initialize SSE transport
# File 'lib/a2a/transport/sse.rb', line 47
def initialize(url, config = {})
  @url = url
  @config = default_config.merge(config)
  @connection_state = :disconnected
  @event_buffer = Concurrent::Array.new
  @last_event_id = @config[:last_event_id]
  @reconnect_attempts = 0
  @heartbeat_timer = nil
  @event_listeners = Concurrent::Hash.new { |h, k| h[k] = [] }
  @mutex = Mutex.new
end
Instance Attribute Details
#config ⇒ Object (readonly)
Returns the value of attribute config.
# File 'lib/a2a/transport/sse.rb', line 31
def config
  @config
end
#connection_state ⇒ Object (readonly)
Returns the value of attribute connection_state.
# File 'lib/a2a/transport/sse.rb', line 31
def connection_state
  @connection_state
end
#event_buffer ⇒ Object (readonly)
Returns the value of attribute event_buffer.
# File 'lib/a2a/transport/sse.rb', line 31
def event_buffer
  @event_buffer
end
#last_event_id ⇒ Object (readonly)
Returns the value of attribute last_event_id.
# File 'lib/a2a/transport/sse.rb', line 31
def last_event_id
  @last_event_id
end
#url ⇒ Object (readonly)
Returns the value of attribute url.
# File 'lib/a2a/transport/sse.rb', line 31
def url
  @url
end
Instance Method Details
#buffer_event(event) ⇒ Object (private)
Buffer event for replay
# File 'lib/a2a/transport/sse.rb', line 291
def buffer_event(event)
  @event_buffer << event

  # Limit buffer size
  @event_buffer.shift while @event_buffer.size > @config[:buffer_size]
end
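The trimming behavior of #buffer_event can be tried in isolation. The following is a minimal sketch, assuming a plain Array in place of Concurrent::Array and a free-standing method that takes the buffer and the limit as arguments (both are illustrative choices, not the library's API).

```ruby
# Hypothetical stand-alone version of the buffering step.
# A plain Array stands in for Concurrent::Array to avoid dependencies.
def buffer_event(buffer, event, limit)
  buffer << event
  # Drop the oldest entries until the buffer fits the limit again.
  buffer.shift while buffer.size > limit
  buffer
end

buffer = []
(1..5).each { |n| buffer_event(buffer, "event-#{n}", 3) }
p buffer
```

Only the most recent entries survive, so a replay through #buffered_events can return at most buffer_size events.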
#buffered_events ⇒ Array<SSEEvent>
Get buffered events
# File 'lib/a2a/transport/sse.rb', line 162
def buffered_events
  @event_buffer.to_a
end
#build_headers(additional_headers = {}) ⇒ Hash (private)
Build request headers
# File 'lib/a2a/transport/sse.rb', line 393
def build_headers(additional_headers = {})
  headers = {
    "Accept" => "text/event-stream",
    "Cache-Control" => "no-cache"
  }

  headers["Last-Event-ID"] = @last_event_id if @last_event_id

  headers.merge(@config[:headers]).merge(additional_headers)
end
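The merge order in #build_headers decides which header value wins when the same key appears more than once. A small sketch of that precedence, assuming a free-standing method with illustrative keyword arguments in place of the instance variables:

```ruby
# Hypothetical stand-alone version of the header merge.
# The keyword argument names are illustrative, not part of the library.
def build_headers(last_event_id: nil, config_headers: {}, additional_headers: {})
  headers = {
    "Accept" => "text/event-stream",
    "Cache-Control" => "no-cache"
  }
  headers["Last-Event-ID"] = last_event_id if last_event_id

  # Later merges win: per-call headers override configured headers,
  # which in turn override the defaults.
  headers.merge(config_headers).merge(additional_headers)
end

p build_headers(
  last_event_id: "42",
  config_headers: { "Authorization" => "Bearer example-token" },
  additional_headers: { "Cache-Control" => "no-store" }
)
```

Because Hash#merge compares keys exactly, a differently cased key such as "cache-control" would be added alongside the default rather than replace it.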
#clear_buffer! ⇒ Object
Clear event buffer
# File 'lib/a2a/transport/sse.rb', line 169
def clear_buffer!
  @event_buffer.clear
end
#connect(headers: {}) {|event| ... } ⇒ Enumerator
Connect to SSE endpoint and start streaming. With a block, events are delivered to the block from a background thread. Without a block, an Enumerator is returned. Returns nil if the transport is already connected.
# File 'lib/a2a/transport/sse.rb', line 67
def connect(headers: {}, &block)
  @mutex.synchronize do
    return if @connection_state == :connected

    @connection_state = :connecting
    @reconnect_attempts = 0
  end

  if block_given?
    connect_with_callback(headers, &block)
  else
    connect_with_enumerator(headers)
  end
end
#connect_with_callback(headers) {|event| ... } ⇒ Object (private)
Connect with callback-based handling
# File 'lib/a2a/transport/sse.rb', line 181
def connect_with_callback(headers)
  Thread.new do
    establish_connection(headers) do |event|
      yield(event) if block_given?
    end
  rescue StandardError => e
    handle_connection_error(e)
  end
end
#connect_with_enumerator(headers) ⇒ Enumerator (private)
Connect with enumerator-based handling
# File 'lib/a2a/transport/sse.rb', line 197
def connect_with_enumerator(headers)
  Enumerator.new do |yielder|
    establish_connection(headers) do |event|
      yielder << event
    end
  end
end
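An Enumerator built this way is lazy: its block does not run until the enumerator is consumed. The sketch below illustrates that with a hypothetical fake_connection method standing in for #establish_connection. It suggests that, in the enumerator variant, the connection would only be opened once the caller starts iterating.

```ruby
# Hypothetical event source standing in for #establish_connection.
def fake_connection(log)
  log << :connected
  %w[first second third].each { |event| yield event }
end

# Same shape as #connect_with_enumerator: events are pushed into the yielder.
def events_enumerator(log)
  Enumerator.new do |yielder|
    fake_connection(log) { |event| yielder << event }
  end
end

log = []
events = events_enumerator(log)
p log               # nothing has run yet
p events.first(2)   # consuming the enumerator runs the block
p log
```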
#connected? ⇒ Boolean
Check whether the transport is currently connected
# File 'lib/a2a/transport/sse.rb', line 153
def connected?
  @connection_state == :connected
end
#default_config ⇒ Hash (private)
Build default configuration
# File 'lib/a2a/transport/sse.rb', line 408
def default_config
  {
    heartbeat_interval: DEFAULT_HEARTBEAT_INTERVAL,
    reconnect_delay: DEFAULT_RECONNECT_DELAY,
    max_reconnect_attempts: DEFAULT_MAX_RECONNECT_ATTEMPTS,
    buffer_size: DEFAULT_BUFFER_SIZE,
    timeout: DEFAULT_TIMEOUT,
    headers: {},
    auto_reconnect: true,
    last_event_id: nil
  }
end
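The constructor combines these defaults with the caller's hash through Hash#merge. A minimal sketch of that merge, assuming a hypothetical SKETCH_DEFAULTS constant and build_config helper that copy the values listed on this page:

```ruby
# Hypothetical copy of the defaults listed above.
SKETCH_DEFAULTS = {
  heartbeat_interval: 30,
  reconnect_delay: 3000,
  max_reconnect_attempts: 10,
  buffer_size: 1024 * 8,
  timeout: 60,
  headers: {},
  auto_reconnect: true,
  last_event_id: nil
}.freeze

# Caller-supplied values replace defaults key by key (shallow merge).
def build_config(overrides = {})
  SKETCH_DEFAULTS.merge(overrides)
end

config = build_config(heartbeat_interval: 0, timeout: 5)
p config[:heartbeat_interval]
p config[:reconnect_delay]
```

The merge matches keys exactly, so options need to be passed with symbol keys. A string key such as "timeout" would be stored next to :timeout instead of overriding it.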
#disconnect ⇒ Object
Disconnect from SSE endpoint
# File 'lib/a2a/transport/sse.rb', line 85
def disconnect
  @mutex.synchronize do
    @connection_state = :disconnected
    stop_heartbeat
    @event_buffer.clear
  end

  emit_event(SSEEvent.new(type: "connection_closed", data: { reason: "manual_disconnect" }))
end
#emit_event(event) {|event| ... } ⇒ Object (private)
Emit event to listeners
# File 'lib/a2a/transport/sse.rb', line 304
def emit_event(event)
  # Call specific event listeners
  @event_listeners[event.type].each do |listener|
    listener.call(event)
  rescue StandardError => e
    # Log error but don't stop processing
    logger = if defined?(::Rails) && ::Rails.respond_to?(:logger) && ::Rails.logger
               ::Rails.logger
             else
               require "logger"
               Logger.new($stdout)
             end
    logger.debug { "Error in event listener: #{e.message}" }
  end

  # Call generic block handler
  yield(event) if block_given?
end
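The rescue inside the each block means one failing listener does not prevent the remaining listeners from running. A reduced sketch of that isolation, assuming a free-standing method that receives the listener table and a logger as arguments (an illustrative signature, not the library's):

```ruby
require "logger"
require "stringio"

# Hypothetical minimal dispatcher; listeners and logger are passed in
# explicitly instead of living in instance variables.
def emit_event(listeners, event_type, payload, logger)
  listeners[event_type].each do |listener|
    listener.call(payload)
  rescue StandardError => e
    # A failing listener is logged and skipped; the rest still run.
    logger.debug { "Error in event listener: #{e.message}" }
  end
end

log_output = StringIO.new
listeners = Hash.new { |hash, key| hash[key] = [] }
received = []
listeners["message"] << ->(_payload) { raise "boom" }
listeners["message"] << ->(payload) { received << payload }

emit_event(listeners, "message", "hello", Logger.new(log_output))
p received
```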
#establish_connection(headers) {|event| ... } ⇒ Object (private)
Establish SSE connection
# File 'lib/a2a/transport/sse.rb', line 211
def establish_connection(headers, &block)
  request_headers = build_headers(headers)

  # Use HTTP transport for the underlying connection
  http = A2A::Transport::Http.new(@url, timeout: @config[:timeout])

  http.get(headers: request_headers) do |req|
    req.options.on_data = proc do |chunk, _size|
      process_chunk(chunk, &block)
    end
  end

  @connection_state = :connected
  start_heartbeat
  emit_event(SSEEvent.new(type: "connection_established", data: { url: @url }))
rescue StandardError => e
  handle_connection_error(e)
end
#handle_connection_error(error) ⇒ Object (private)
Handle connection errors
# File 'lib/a2a/transport/sse.rb', line 328
def handle_connection_error(error)
  @connection_state = :disconnected
  stop_heartbeat

  error_event = SSEEvent.new(
    type: "error",
    data: {
      error: error.message,
      reconnect_attempts: @reconnect_attempts
    }
  )
  emit_event(error_event)

  # Attempt reconnection if enabled
  return unless @config[:auto_reconnect] && @reconnect_attempts < @config[:max_reconnect_attempts]

  schedule_reconnection
end
#off(event_type, handler = nil) ⇒ Object
Remove event listener
# File 'lib/a2a/transport/sse.rb', line 111
def off(event_type, handler = nil)
  if handler
    @event_listeners[event_type.to_s].delete(handler)
  else
    @event_listeners[event_type.to_s].clear
  end
end
#on(event_type, &block) ⇒ Object
Add event listener for specific event type
# File 'lib/a2a/transport/sse.rb', line 101
def on(event_type, &block)
  @event_listeners[event_type.to_s] << block if block_given?
end
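#on and #off both normalize the event type with to_s, so symbols and strings address the same listener list. Removing one specific listener requires the same Proc object that was registered. The sketch below assumes a hypothetical ListenerRegistry class that reproduces only these two methods plus a count helper added for inspection.

```ruby
# Hypothetical registry mirroring the listener table: a Hash whose
# default block creates an empty list per event type.
class ListenerRegistry
  def initialize
    @listeners = Hash.new { |hash, key| hash[key] = [] }
  end

  def on(event_type, &block)
    @listeners[event_type.to_s] << block if block_given?
  end

  def off(event_type, handler = nil)
    if handler
      @listeners[event_type.to_s].delete(handler)
    else
      @listeners[event_type.to_s].clear
    end
  end

  # Helper for this sketch only.
  def count(event_type)
    @listeners[event_type.to_s].size
  end
end

registry = ListenerRegistry.new
handler = proc { |event| puts event }
registry.on(:message, &handler)          # keep a reference for later removal
registry.on("message") { |event| event } # anonymous listener
registry.off(:message, handler)          # removes only the first listener
p registry.count(:message)
```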
#parse_sse_line(line) ⇒ SSEEvent? (private)
Parse SSE line into event
# File 'lib/a2a/transport/sse.rb', line 255
def parse_sse_line(line)
  return nil if line.empty? || line.start_with?(":")

  if line.start_with?("data: ")
    data_content = line[6..]
    begin
      data = JSON.parse(data_content)
      SSEEvent.new(
        type: data["type"] || "message",
        data: data["data"] || data,
        id: data["id"],
        retry_interval: data["retry"]
      )
    rescue JSON::ParserError
      SSEEvent.new(type: "message", data: data_content)
    end
  elsif line.start_with?("event: ")
    # Store event type for next data line (simplified parsing)
    nil
  elsif line.start_with?("id: ")
    # Store event ID for next data line (simplified parsing)
    nil
  elsif line.start_with?("retry: ")
    # Update reconnection delay
    @config[:reconnect_delay] = line[7..].to_i
    nil
  else
    nil
  end
end
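The data-line branch treats the payload as a JSON envelope carrying type, data, and id, and falls back to a plain "message" event when the payload is not valid JSON. A stand-alone sketch of that branch follows. SketchEvent is a hypothetical stand-in for SSEEvent, and the Hash check is an assumption added in this sketch so that JSON scalars and arrays are passed through as data.

```ruby
require "json"

# Hypothetical stand-in for the library's SSEEvent class.
SketchEvent = Struct.new(:type, :data, :id, keyword_init: true)

def parse_data_line(line)
  return nil if line.empty? || line.start_with?(":") # blank line or comment
  return nil unless line.start_with?("data: ")       # other fields yield no event

  content = line[6..]
  begin
    parsed = JSON.parse(content)
  rescue JSON::ParserError
    # Not JSON: deliver the raw text as a plain message.
    return SketchEvent.new(type: "message", data: content)
  end

  # Assumption of this sketch: only JSON objects are unpacked as envelopes.
  return SketchEvent.new(type: "message", data: parsed) unless parsed.is_a?(Hash)

  SketchEvent.new(
    type: parsed["type"] || "message",
    data: parsed["data"] || parsed,
    id: parsed["id"]
  )
end

event = parse_data_line('data: {"type":"task_status_update","data":{"state":"working"},"id":"7"}')
p event.type
p event.data
```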
#process_chunk(chunk) {|event| ... } ⇒ Object (private)
Process incoming data chunk
# File 'lib/a2a/transport/sse.rb', line 236
def process_chunk(chunk, &block)
  lines = chunk.split("\n")

  lines.each do |line|
    event = parse_sse_line(line.strip)
    next unless event

    @last_event_id = event.id if event.id
    buffer_event(event)
    emit_event(event, &block)
  end
end
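As listed, #process_chunk splits each chunk on newlines independently, so a line that straddles two chunks would reach #parse_sse_line as two fragments. One common remedy, which is not something this page shows the library doing, is to carry the unfinished tail over to the next chunk. The LineBuffer class below is a hypothetical helper sketching that idea.

```ruby
# Hypothetical helper, not part of A2A::Transport::SSE: accumulates raw
# chunks and hands back only complete lines.
class LineBuffer
  def initialize
    @pending = +""
  end

  # Appends a chunk and returns the lines that it completed.
  def feed(chunk)
    @pending << chunk
    lines = []
    while (index = @pending.index("\n"))
      # Cut the finished line (including its newline) off the front.
      lines << @pending.slice!(0..index).chomp
    end
    lines
  end
end

buffer = LineBuffer.new
p buffer.feed("data: hel")
p buffer.feed("lo\ndata: wor")
p buffer.feed("ld\n\n")
```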
#schedule_reconnection ⇒ Object (private)
Schedule reconnection attempt
# File 'lib/a2a/transport/sse.rb', line 350
def schedule_reconnection
  @reconnect_attempts += 1
  @connection_state = :reconnecting

  Thread.new do
    sleep(@config[:reconnect_delay] / 1000.0)
    connect if @connection_state == :reconnecting
  end
end
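Two small pieces of arithmetic drive reconnection: the delay is stored in milliseconds and converted to seconds for sleep, and #handle_connection_error only schedules a retry while the attempt count is below the configured maximum. The helper names below are hypothetical and exist only to make those two rules testable in isolation. Note that, as listed, #connect resets @reconnect_attempts to 0, so the counter appears to restart with every reconnection.

```ruby
# Converts a delay in milliseconds to the seconds value expected by sleep.
def reconnect_sleep_seconds(delay_ms)
  delay_ms / 1000.0
end

# Mirrors the guard in #handle_connection_error.
def reconnect?(auto_reconnect:, attempts:, max_attempts:)
  auto_reconnect && attempts < max_attempts
end

p reconnect_sleep_seconds(3000)
p reconnect?(auto_reconnect: true, attempts: 9, max_attempts: 10)
p reconnect?(auto_reconnect: true, attempts: 10, max_attempts: 10)
```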
#send_data(data, event_type: "message") ⇒ Boolean
Send data to SSE endpoint (for bidirectional communication)
# File 'lib/a2a/transport/sse.rb', line 126
def send_data(data, event_type: "message")
  return false unless @connection_state == :connected

  # This would typically use a separate HTTP connection for sending
  # as SSE is primarily unidirectional from server to client
  begin
    http_transport = A2A::Transport::Http.new(@url.gsub("/events", ""))
    response = http_transport.post(
      "/events/send",
      body: {
        type: event_type,
        data: data,
        last_event_id: @last_event_id
      }
    )

    response.status == 200
  rescue StandardError => e
    emit_event(SSEEvent.new(type: "error", data: { error: e.message }))
    false
  end
end
#start_heartbeat ⇒ Object (private)
Start heartbeat timer
# File 'lib/a2a/transport/sse.rb', line 363
def start_heartbeat
  return unless @config[:heartbeat_interval].positive?

  @heartbeat_timer = Thread.new do
    loop do
      sleep(@config[:heartbeat_interval])
      break unless @connection_state == :connected

      emit_event(SSEEvent.new(
                   type: "heartbeat",
                   data: { timestamp: Time.now.iso8601 }
                 ))
    end
  end
end
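The heartbeat is a background thread that sleeps for the interval, checks the connection state, and emits a local "heartbeat" event. Nothing in the listing sends data to the server. A reduced sketch of the same loop, assuming a free-standing method where the running callable stands in for the connection-state check and the block stands in for #emit_event:

```ruby
# Hypothetical heartbeat loop. interval is in seconds; running is a
# callable that reports whether the connection is still up.
def start_heartbeat(interval, running, &on_beat)
  # A zero or negative interval disables the heartbeat.
  return nil unless interval.positive?

  Thread.new do
    loop do
      sleep(interval)
      break unless running.call

      on_beat.call(Time.now)
    end
  end
end

beats = Queue.new
active = true
timer = start_heartbeat(0.01, -> { active }) { |timestamp| beats << timestamp }

beats.pop        # blocks until the first heartbeat arrives
active = false   # the loop exits after its next sleep
timer.join(1)
```

Setting heartbeat_interval to 0 in the configuration takes the early-return path, which is the listed way to switch heartbeats off.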
#stop_heartbeat ⇒ Object (private)
Stop heartbeat timer
# File 'lib/a2a/transport/sse.rb', line 382
def stop_heartbeat
  @heartbeat_timer&.kill
  @heartbeat_timer = nil
end