Class: MCP::Server::Transports::StreamableHTTPTransport
- Defined in:
- lib/mcp/server/transports/streamable_http_transport.rb
Constant Summary collapse
- SSE_HEADERS =
{ "Content-Type" => "text/event-stream", "Cache-Control" => "no-cache", "Connection" => "keep-alive", }.freeze
- REQUIRED_POST_ACCEPT_TYPES =
["application/json", "text/event-stream"].freeze
- REQUIRED_GET_ACCEPT_TYPES =
["text/event-stream"].freeze
- STREAM_WRITE_ERRORS =
[IOError, Errno::EPIPE, Errno::ECONNRESET].freeze
- SESSION_REAP_INTERVAL =
60
Instance Method Summary collapse
- #close ⇒ Object
- #handle_request(request) ⇒ Object
-
#initialize(server, stateless: false, session_idle_timeout: nil) ⇒ StreamableHTTPTransport
constructor
A new instance of StreamableHTTPTransport.
- #send_notification(method, params = nil, session_id: nil, related_request_id: nil) ⇒ Object
-
#send_request(method, params = nil, session_id: nil, related_request_id: nil) ⇒ Object
Sends a server-to-client JSON-RPC request (e.g.,
sampling/createMessage) and blocks until the client responds.
Methods inherited from Transport
#handle_json_request, #open, #send_response
Constructor Details
#initialize(server, stateless: false, session_idle_timeout: nil) ⇒ StreamableHTTPTransport
Returns a new instance of StreamableHTTPTransport.
16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
# File 'lib/mcp/server/transports/streamable_http_transport.rb', line 16

# Builds a new transport bound to +server+.
#
# @param server [Object] forwarded unchanged to the parent Transport constructor
# @param stateless [Boolean] when true, the transport runs in stateless mode
# @param session_idle_timeout [Numeric, nil] idle timeout for sessions; nil disables
#   the reaper thread entirely
# @raise [ArgumentError] if +session_idle_timeout+ is combined with stateless mode,
#   or is zero or negative
def initialize(server, stateless: false, session_idle_timeout: nil)
  super(server)

  @stateless = stateless
  @session_idle_timeout = session_idle_timeout
  @mutex = Mutex.new
  # Maps `session_id` to `{ get_sse_stream: stream_object, server_session: ServerSession, last_active_at: float_from_monotonic_clock }`.
  @sessions = {}
  @pending_responses = {}

  # Everything below only applies when an idle timeout was requested.
  return unless @session_idle_timeout

  raise ArgumentError, "session_idle_timeout is not supported in stateless mode." if @stateless
  raise ArgumentError, "session_idle_timeout must be a positive number." if @session_idle_timeout <= 0

  start_reaper_thread
end
Instance Method Details
#close ⇒ Object
55 56 57 58 59 60 61 62 63 64 65 66 67 |
# File 'lib/mcp/server/transports/streamable_http_transport.rb', line 55

# Shuts the transport down: stops the reaper thread, removes every session,
# and closes the streams that belonged to the removed sessions.
#
# @return [Array] the session entries that were removed
def close
  @reaper_thread&.kill
  @reaper_thread = nil

  # Session bookkeeping happens under the mutex; only the entries that
  # cleanup_session_unsafe actually returned are kept for closing.
  detached = []
  @mutex.synchronize do
    @sessions.each_key do |sid|
      entry = cleanup_session_unsafe(sid)
      detached << entry if entry
    end
  end

  # Streams are closed after the mutex has been released.
  detached.each do |entry|
    close_stream_safely(entry[:get_sse_stream])
    close_post_request_streams(entry)
  end
end
#handle_request(request) ⇒ Object
42 43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/mcp/server/transports/streamable_http_transport.rb', line 42

# Routes an incoming HTTP request to the handler for its HTTP method.
#
# @param request [#env] request object whose +env+ hash carries "REQUEST_METHOD"
# @return [Object] whatever the selected handler returns; for any method other
#   than POST, GET or DELETE, the result of +method_not_allowed_response+
def handle_request(request)
  http_verb = request.env["REQUEST_METHOD"]

  return handle_post(request) if http_verb == "POST"
  return handle_get(request) if http_verb == "GET"
  return handle_delete(request) if http_verb == "DELETE"

  method_not_allowed_response
end
#send_notification(method, params = nil, session_id: nil, related_request_id: nil) ⇒ Object
69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 |
# File 'lib/mcp/server/transports/streamable_http_transport.rb', line 69

# Sends a JSON-RPC notification to one session, or broadcasts it to every
# session that has a GET SSE stream.
#
# @param method [String] JSON-RPC method name of the notification
# @param params [Object, nil] optional params; the key is omitted when nil
# @param session_id [String, nil] target session; nil broadcasts to all sessions
# @param related_request_id [Object, nil] id of the client request this notification
#   relates to; passed to +active_stream+ to pick the stream, and used on write
#   failure to drop only that request's POST stream
# @return [Boolean, Integer] with +session_id+: true if the notification was written,
#   false otherwise; without +session_id+: the number of sessions written to
# @raise [RuntimeError] in stateless mode
def send_notification(method, params = nil, session_id: nil, related_request_id: nil)
  # Stateless mode doesn't support notifications
  raise "Stateless mode does not support notifications" if @stateless

  notification = {
    jsonrpc: "2.0",
    method: method,
  }
  notification[:params] = params if params

  # Streams are only collected while the mutex is held; they are closed
  # after it has been released (see the end of the method).
  streams_to_close = []

  result = @mutex.synchronize do
    if session_id
      # Send to specific session
      if (session = @sessions[session_id])
        stream = active_stream(session, related_request_id: related_request_id)
      end
      next false unless stream

      if session_expired?(session)
        cleanup_and_collect_stream(session_id, streams_to_close)
        next false
      end

      begin
        send_to_stream(stream, notification)
        true
      rescue *STREAM_WRITE_ERRORS => e
        MCP.configuration.exception_reporter.call(
          e,
          { session_id: session_id, error: "Failed to send notification" },
        )

        if related_request_id && session[:post_request_streams]&.key?(related_request_id)
          # Only the POST stream tied to this request is dropped; the session
          # itself stays registered.
          session[:post_request_streams].delete(related_request_id)
          streams_to_close << stream
        else
          cleanup_and_collect_stream(session_id, streams_to_close)
        end

        false
      end
    else
      # Broadcast to all connected SSE sessions
      sent_count = 0
      failed_sessions = []

      @sessions.each do |sid, session|
        next unless (stream = session[:get_sse_stream])

        if session_expired?(session)
          failed_sessions << sid
          next
        end

        begin
          send_to_stream(stream, notification)
          sent_count += 1
        rescue *STREAM_WRITE_ERRORS => e
          MCP.configuration.exception_reporter.call(
            e,
            { session_id: sid, error: "Failed to send notification" },
          )
          failed_sessions << sid
        end
      end

      # Clean up failed sessions once iteration over @sessions has finished
      failed_sessions.each { |sid| cleanup_and_collect_stream(sid, streams_to_close) }

      sent_count
    end
  end

  streams_to_close.each do |stream|
    close_stream_safely(stream)
  end

  result
end
#send_request(method, params = nil, session_id: nil, related_request_id: nil) ⇒ Object
Sends a server-to-client JSON-RPC request (e.g., sampling/createMessage) and
blocks until the client responds.
Uses a Queue for cross-thread synchronization. This method creates a Queue,
sends the request via SSE stream, then blocks on queue.pop.
When the client POSTs a response, handle_response matches it by request_id
and pushes the result onto the queue, unblocking this thread.
156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 |
# File 'lib/mcp/server/transports/streamable_http_transport.rb', line 156

# Sends a server-to-client JSON-RPC request (e.g., sampling/createMessage) and
# blocks until the client responds.
#
# A Queue is registered in @pending_responses under the generated request id,
# the request is written to the session's stream, and the calling thread then
# blocks on +queue.pop+ until a response is pushed onto that queue.
#
# @param method [String] JSON-RPC method name
# @param params [Object, nil] optional params; the key is omitted when nil
# @param session_id [String] session to send the request to (required)
# @param related_request_id [Object, nil] id of the client request this request
#   relates to; passed to +active_stream+ to pick the stream, and used on write
#   failure to drop only that request's POST stream
# @return [Object] the value pushed onto the queue for this request
# @raise [RuntimeError] in stateless mode, when +session_id+ is missing or unknown,
#   when the request could not be written to any stream, or when the session was
#   closed while waiting
# @raise [StandardError] when the client answered with a JSON-RPC error
def send_request(method, params = nil, session_id: nil, related_request_id: nil)
  if @stateless
    raise "Stateless mode does not support server-to-client requests."
  end

  unless session_id
    raise "session_id is required for server-to-client requests."
  end

  request_id = generate_request_id
  queue = Queue.new

  request = { jsonrpc: "2.0", id: request_id, method: method }
  request[:params] = params if params

  sent = false
  # Streams are only collected while the mutex is held and closed afterwards,
  # the same way send_notification does it.
  streams_to_close = []

  @mutex.synchronize do
    unless (session = @sessions[session_id])
      raise "Session not found: #{session_id}."
    end

    # Register before writing so a response can be matched as soon as the
    # request is on the wire.
    @pending_responses[request_id] = { queue: queue, session_id: session_id }

    if (stream = active_stream(session, related_request_id: related_request_id))
      begin
        send_to_stream(stream, request)
        sent = true
      rescue *STREAM_WRITE_ERRORS
        if related_request_id && session[:post_request_streams]&.key?(related_request_id)
          # Only the POST stream tied to this request is dropped; the session
          # itself stays registered.
          session[:post_request_streams].delete(related_request_id)
          streams_to_close << stream
        else
          # NOTE(review): previously called cleanup_session_unsafe and discarded
          # its result, so the removed session's streams were never closed here.
          # Assumes cleanup_and_collect_stream removes the session and collects
          # its streams, as its use in send_notification suggests - confirm.
          cleanup_and_collect_stream(session_id, streams_to_close)
        end
      end
    end
  end

  streams_to_close.each { |stream| close_stream_safely(stream) }

  # TODO: Replace with event store + replay when resumability is implemented.
  # Resumability is a separate MCP specification feature (SSE event IDs, Last-Event-ID replay,
  # event store management) independent of sampling.
  # See: https://modelcontextprotocol.io/specification/latest/basic/transports#resumability-and-redelivery
  #
  # The TypeScript and Python SDKs buffer messages and replay on reconnect.
  # Until then, raise to prevent queue.pop from blocking indefinitely.
  unless sent
    raise "No active stream for #{method} request."
  end

  response = queue.pop

  if response.is_a?(Hash) && response.key?(:error)
    raise StandardError, "Client returned an error for #{method} request (code: #{response[:error][:code]}): #{response[:error][:message]}"
  end

  if response == :session_closed
    raise "SSE session closed while waiting for #{method} response."
  end

  response
ensure
  # request_id is nil when one of the argument checks raised before it was generated.
  if request_id
    @mutex.synchronize do
      @pending_responses.delete(request_id)
    end
  end
end