Module: RubyLLM::Streaming
- Included in:
- Provider
- Defined in:
- lib/ruby_llm/streaming.rb
Overview
Handles streaming responses from AI providers.
Defined Under Namespace
Modules: FaradayHandlers
Class Method Summary collapse
- .build_on_data_handler(&handler) ⇒ Object
- .build_stream_error_response(parsed_data, env, status) ⇒ Object
- .error_chunk?(chunk) ⇒ Boolean
- .faraday_1? ⇒ Boolean
- .handle_data(data, env) ⇒ Object
- .handle_error_chunk(chunk, env) ⇒ Object
- .handle_error_event(data, env) ⇒ Object
- .handle_failed_response(chunk, buffer, env) ⇒ Object
- .handle_json_error_chunk(chunk, env) ⇒ Object
- .handle_parsed_error(parsed_data, env) ⇒ Object
- .handle_sse(chunk, parser, env, &block) ⇒ Object
- .handle_stream(&block) ⇒ Object
- .json_error_payload?(chunk) ⇒ Boolean
- .parse_error_from_json(data, env, error_message) ⇒ Object
- .parse_streaming_error(data) ⇒ Object
- .process_stream_chunk(chunk, parser, env) ⇒ Object
- .stream_response(connection, payload, additional_headers = {}, &block) ⇒ Object
Class Method Details
.build_on_data_handler(&handler) ⇒ Object
43 44 45 46 47 48 49 50 51 52 |
# File 'lib/ruby_llm/streaming.rb', line 43 def build_on_data_handler(&handler) buffer = +'' parser = EventStreamParser::Parser.new FaradayHandlers.build( faraday_v1: faraday_1?, on_chunk: ->(chunk, env) { process_stream_chunk(chunk, parser, env, &handler) }, on_failed_response: ->(chunk, env) { handle_failed_response(chunk, buffer, env) } ) end |
.build_stream_error_response(parsed_data, env, status) ⇒ Object
136 137 138 139 140 141 142 143 144 |
# File 'lib/ruby_llm/streaming.rb', line 136 def build_stream_error_response(parsed_data, env, status) error_status = status || env&.status || 500 if faraday_1? Struct.new(:body, :status).new(parsed_data, error_status) else env.merge(body: parsed_data, status: error_status) end end |
.error_chunk?(chunk) ⇒ Boolean
66 67 68 |
# File 'lib/ruby_llm/streaming.rb', line 66 def error_chunk?(chunk) chunk.start_with?('event: error') end |
.faraday_1? ⇒ Boolean
39 40 41 |
# File 'lib/ruby_llm/streaming.rb', line 39 def faraday_1? Faraday::VERSION.start_with?('1') end |
.handle_data(data, env) ⇒ Object
102 103 104 105 106 107 108 109 |
# File 'lib/ruby_llm/streaming.rb', line 102 def handle_data(data, env) parsed = JSON.parse(data) return parsed unless parsed.is_a?(Hash) && parsed.key?('error') handle_parsed_error(parsed, env) rescue JSON::ParserError => e RubyLLM.logger.debug "Failed to parse data chunk: #{e.}" end |
.handle_error_chunk(chunk, env) ⇒ Object
78 79 80 81 |
# File 'lib/ruby_llm/streaming.rb', line 78 def handle_error_chunk(chunk, env) error_data = chunk.split("\n")[1].delete_prefix('data: ') parse_error_from_json(error_data, env, 'Failed to parse error chunk') end |
.handle_error_event(data, env) ⇒ Object
111 112 113 |
# File 'lib/ruby_llm/streaming.rb', line 111 def handle_error_event(data, env) parse_error_from_json(data, env, 'Failed to parse error event') end |
.handle_failed_response(chunk, buffer, env) ⇒ Object
83 84 85 86 87 88 89 |
# File 'lib/ruby_llm/streaming.rb', line 83 def handle_failed_response(chunk, buffer, env) buffer << chunk error_data = JSON.parse(buffer) handle_parsed_error(error_data, env) rescue JSON::ParserError RubyLLM.logger.debug "Accumulating error chunk: #{chunk}" end |
.handle_json_error_chunk(chunk, env) ⇒ Object
74 75 76 |
# File 'lib/ruby_llm/streaming.rb', line 74 def handle_json_error_chunk(chunk, env) parse_error_from_json(chunk, env, 'Failed to parse JSON error chunk') end |
.handle_parsed_error(parsed_data, env) ⇒ Object
123 124 125 126 127 |
# File 'lib/ruby_llm/streaming.rb', line 123 def handle_parsed_error(parsed_data, env) status, = parse_streaming_error(parsed_data.to_json) error_response = build_stream_error_response(parsed_data, env, status) ErrorMiddleware.parse_error(provider: self, response: error_response) end |
.handle_sse(chunk, parser, env, &block) ⇒ Object
91 92 93 94 95 96 97 98 99 100 |
# File 'lib/ruby_llm/streaming.rb', line 91 def handle_sse(chunk, parser, env, &block) parser.feed(chunk) do |type, data| case type.to_sym when :error handle_error_event(data, env) else yield handle_data(data, env, &block) unless data == '[DONE]' end end end |
.handle_stream(&block) ⇒ Object
31 32 33 34 35 |
# File 'lib/ruby_llm/streaming.rb', line 31 def handle_stream(&block) build_on_data_handler do |data| block.call(build_chunk(data)) if data end end |
.json_error_payload?(chunk) ⇒ Boolean
70 71 72 |
# File 'lib/ruby_llm/streaming.rb', line 70 def json_error_payload?(chunk) chunk.lstrip.start_with?('{') && chunk.include?('"error"') end |
.parse_error_from_json(data, env, error_message) ⇒ Object
129 130 131 132 133 134 |
# File 'lib/ruby_llm/streaming.rb', line 129 def parse_error_from_json(data, env, ) parsed_data = JSON.parse(data) handle_parsed_error(parsed_data, env) rescue JSON::ParserError => e RubyLLM.logger.debug "#{}: #{e.}" end |
.parse_streaming_error(data) ⇒ Object
115 116 117 118 119 120 121 |
# File 'lib/ruby_llm/streaming.rb', line 115 def parse_streaming_error(data) error_data = JSON.parse(data) [500, error_data['message'] || 'Unknown streaming error'] rescue JSON::ParserError => e RubyLLM.logger.debug "Failed to parse streaming error: #{e.}" [500, "Failed to parse error: #{data}"] end |
.process_stream_chunk(chunk, parser, env) ⇒ Object
54 55 56 57 58 59 60 61 62 63 64 |
# File 'lib/ruby_llm/streaming.rb', line 54 def process_stream_chunk(chunk, parser, env, &) RubyLLM.logger.debug "Received chunk: #{chunk}" if RubyLLM.config.log_stream_debug if error_chunk?(chunk) handle_error_chunk(chunk, env) elsif json_error_payload?(chunk) handle_json_error_chunk(chunk, env) else yield handle_sse(chunk, parser, env, &) end end |
.stream_response(connection, payload, additional_headers = {}, &block) ⇒ Object
8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
# File 'lib/ruby_llm/streaming.rb', line 8 def stream_response(connection, payload, additional_headers = {}, &block) accumulator = StreamAccumulator.new response = connection.post stream_url, payload do |req| req.headers = additional_headers.merge(req.headers) unless additional_headers.empty? if faraday_1? req.[:on_data] = handle_stream do |chunk| accumulator.add chunk block.call chunk end else req..on_data = handle_stream do |chunk| accumulator.add chunk block.call chunk end end end = accumulator.(response) RubyLLM.logger.debug "Stream completed: #{.content}" end |