Module: RubyLLM::Streaming

Included in:
Provider
Defined in:
lib/ruby_llm/streaming.rb

Overview

Handles streaming responses from AI providers.

Defined Under Namespace

Modules: FaradayHandlers

Class Method Summary collapse

Class Method Details

.build_on_data_handler(&handler) ⇒ Object



43
44
45
46
47
48
49
50
51
52
# File 'lib/ruby_llm/streaming.rb', line 43

# Assembles the streaming callbacks and hands them to FaradayHandlers.build.
#
# One mutable String buffer and one EventStreamParser::Parser are created per
# call and captured by the lambdas below, so both are shared by every
# invocation of the callbacks produced here.
#
# @yield forwarded to process_stream_chunk as its block
# @return [Object] whatever FaradayHandlers.build returns
def build_on_data_handler(&handler)
  error_buffer = ''.dup
  sse_parser = EventStreamParser::Parser.new

  # Regular chunks go through the SSE/error dispatch in process_stream_chunk.
  chunk_callback = lambda do |chunk, env|
    process_stream_chunk(chunk, sse_parser, env, &handler)
  end

  # Failed responses are accumulated in error_buffer by handle_failed_response.
  failure_callback = lambda do |chunk, env|
    handle_failed_response(chunk, error_buffer, env)
  end

  FaradayHandlers.build(
    faraday_v1: faraday_1?,
    on_chunk: chunk_callback,
    on_failed_response: failure_callback
  )
end

.build_stream_error_response(parsed_data, env, status) ⇒ Object



136
137
138
139
140
141
142
143
144
# File 'lib/ruby_llm/streaming.rb', line 136

# Wraps a parsed error payload in a response-like object exposing #body and
# #status.
#
# The status is the first non-nil of: the explicit +status+, +env.status+,
# and 500.
#
# @param parsed_data [Object] parsed error payload, used as the body
# @param env [#status, #merge, nil] request environment; may be nil
# @param status [Integer, nil] explicit status overriding env's
# @return [Object] a two-field Struct (body, status) under Faraday 1 or when
#   env is nil; otherwise the result of env.merge(body:, status:)
def build_stream_error_response(parsed_data, env, status)
  error_status = status || env&.status || 500

  # The status lookup above already tolerates a nil env, so the merge branch
  # must too: without an env there is nothing to merge into, and the plain
  # struct carries the same two fields.
  return Struct.new(:body, :status).new(parsed_data, error_status) if faraday_1? || env.nil?

  env.merge(body: parsed_data, status: error_status)
end

.error_chunk?(chunk) ⇒ Boolean

Returns:

  • (Boolean)


66
67
68
# File 'lib/ruby_llm/streaming.rb', line 66

# Reports whether a raw chunk begins with the literal text "event: error".
#
# @param chunk [String] raw chunk text
# @return [Boolean]
def error_chunk?(chunk)
  error_event_prefix = 'event: error'
  chunk.start_with?(error_event_prefix)
end

.faraday_1? ⇒ Boolean

Returns:

  • (Boolean)


39
40
41
# File 'lib/ruby_llm/streaming.rb', line 39

# Reports whether the loaded Faraday is a 1.x release.
#
# The check matches on "1." rather than "1" so that a two-digit major
# version such as "10.0.0" is not mistaken for Faraday 1.
#
# @return [Boolean]
def faraday_1?
  Faraday::VERSION.start_with?('1.')
end

.handle_data(data, env) ⇒ Object



102
103
104
105
106
107
108
109
# File 'lib/ruby_llm/streaming.rb', line 102

# Parses one data payload as JSON.
#
# A parsed Hash containing an "error" key is routed to handle_parsed_error;
# any other parsed value is returned as-is. Input that is not valid JSON is
# logged at debug level and yields nil.
#
# @param data [String] raw payload text
# @param env [Object] request environment passed through to error handling
# @return [Object, nil] the parsed JSON, the result of handle_parsed_error,
#   or nil when +data+ is not valid JSON
def handle_data(data, env)
  parsed = JSON.parse(data)
  return parsed unless parsed.is_a?(Hash) && parsed.key?('error')

  handle_parsed_error(parsed, env)
rescue JSON::ParserError => e
  RubyLLM.logger.debug "Failed to parse data chunk: #{e.message}"
  # Return nil explicitly: the logger call's own return value would otherwise
  # leak out as this method's result and be treated as parsed data by callers.
  nil
end

.handle_error_chunk(chunk, env) ⇒ Object



78
79
80
81
# File 'lib/ruby_llm/streaming.rb', line 78

# Extracts the JSON payload from an "event: error" chunk and hands it to
# parse_error_from_json.
#
# The payload is taken from the first line that starts with "data:". LF and
# CRLF line endings are both accepted, and the space after the colon is
# optional. When the chunk has no data line, an empty string is passed on so
# that parse_error_from_json reports the failure through its own
# JSON::ParserError handling instead of this method raising NoMethodError.
#
# @param chunk [String] raw chunk text
# @param env [Object] request environment passed through to error handling
# @return [Object] whatever parse_error_from_json returns
def handle_error_chunk(chunk, env)
  data_line = chunk.split(/\r?\n/).find { |line| line.start_with?('data:') }
  error_data = data_line.to_s.delete_prefix('data:').lstrip
  parse_error_from_json(error_data, env, 'Failed to parse error chunk')
end

.handle_error_event(data, env) ⇒ Object



111
112
113
# File 'lib/ruby_llm/streaming.rb', line 111

# Handles the data of an error-typed event by delegating to
# parse_error_from_json with an event-specific failure label.
#
# @param data [String] event data, expected to be JSON text
# @param env [Object] request environment passed through to error handling
# @return [Object] whatever parse_error_from_json returns
def handle_error_event(data, env)
  failure_label = 'Failed to parse error event'
  parse_error_from_json(data, env, failure_label)
end

.handle_failed_response(chunk, buffer, env) ⇒ Object



83
84
85
86
87
88
89
# File 'lib/ruby_llm/streaming.rb', line 83

# Accumulates the body of a failed response and handles it once it parses.
#
# Each chunk is appended to +buffer+ (mutated in place). While the
# accumulated text is not yet valid JSON, the chunk is logged at debug level;
# once it parses, the result is passed to handle_parsed_error.
#
# @param chunk [String] newest piece of the error body
# @param buffer [String] mutable accumulator supplied by the caller
# @param env [Object] request environment passed through to error handling
# @return [Object] result of handle_parsed_error, or of the debug log call
#   while the buffer is still incomplete
def handle_failed_response(chunk, buffer, env)
  buffer.concat(chunk)
  handle_parsed_error(JSON.parse(buffer), env)
rescue JSON::ParserError
  RubyLLM.logger.debug("Accumulating error chunk: #{chunk}")
end

.handle_json_error_chunk(chunk, env) ⇒ Object



74
75
76
# File 'lib/ruby_llm/streaming.rb', line 74

# Treats the whole chunk as a JSON error document by delegating to
# parse_error_from_json with a chunk-specific failure label.
#
# @param chunk [String] raw chunk text, expected to be JSON
# @param env [Object] request environment passed through to error handling
# @return [Object] whatever parse_error_from_json returns
def handle_json_error_chunk(chunk, env)
  failure_label = 'Failed to parse JSON error chunk'
  parse_error_from_json(chunk, env, failure_label)
end

.handle_parsed_error(parsed_data, env) ⇒ Object



123
124
125
126
127
# File 'lib/ruby_llm/streaming.rb', line 123

# Turns a parsed error payload into an error response and passes it to
# ErrorMiddleware.parse_error.
#
# The payload is re-serialised to JSON for parse_streaming_error, whose first
# return element is used as the status; the message element is ignored here.
#
# @param parsed_data [#to_json] parsed error payload
# @param env [Object] request environment used to build the response
# @return [Object] whatever ErrorMiddleware.parse_error returns
#   (NOTE(review): presumably raises a provider error — confirm)
def handle_parsed_error(parsed_data, env)
  serialized = parsed_data.to_json
  status, = parse_streaming_error(serialized)

  ErrorMiddleware.parse_error(
    provider: self,
    response: build_stream_error_response(parsed_data, env, status)
  )
end

.handle_sse(chunk, parser, env, &block) ⇒ Object



91
92
93
94
95
96
97
98
99
100
# File 'lib/ruby_llm/streaming.rb', line 91

# Feeds a chunk to the event-stream parser and dispatches each event it emits.
#
# Events whose type converts to :error go to handle_error_event. For every
# other event, data other than the literal "[DONE]" is passed through
# handle_data and the result is yielded to the caller's block.
#
# @param chunk [String] raw chunk text
# @param parser [#feed] parser yielding (type, data) pairs
# @param env [Object] request environment passed through to the handlers
# @yield [Object] the return value of handle_data for each data event
# @return [Object] whatever parser.feed returns
def handle_sse(chunk, parser, env, &block)
  parser.feed(chunk) do |type, data|
    if type.to_sym == :error
      handle_error_event(data, env)
    elsif data != '[DONE]'
      yield handle_data(data, env, &block)
    end
  end
end

.handle_stream(&block) ⇒ Object



31
32
33
34
35
# File 'lib/ruby_llm/streaming.rb', line 31

# Builds the on-data handler for a stream. For each truthy piece of data the
# handler receives, the data is converted with build_chunk and the result is
# passed to +block+; falsy data is skipped.
#
# @yield [Object] the value returned by build_chunk
# @return [Object] whatever build_on_data_handler returns
def handle_stream(&block)
  build_on_data_handler do |data|
    next unless data

    block.call(build_chunk(data))
  end
end

.json_error_payload?(chunk) ⇒ Boolean

Returns:

  • (Boolean)


70
71
72
# File 'lib/ruby_llm/streaming.rb', line 70

# Reports whether a chunk looks like a bare JSON error document: after
# leading whitespace it starts with "{" and it contains the text "\"error\""
# somewhere.
#
# @param chunk [String] raw chunk text
# @return [Boolean]
def json_error_payload?(chunk)
  return false unless chunk.lstrip.start_with?('{')

  chunk.include?('"error"')
end

.parse_error_from_json(data, env, error_message) ⇒ Object



129
130
131
132
133
134
# File 'lib/ruby_llm/streaming.rb', line 129

# Parses +data+ as JSON and forwards the result to handle_parsed_error.
#
# A JSON::ParserError raised anywhere in the method body is caught and logged
# at debug level, prefixed with +error_message+.
#
# @param data [String] JSON text
# @param env [Object] request environment passed through to error handling
# @param error_message [String] prefix for the debug log line
# @return [Object] result of handle_parsed_error, or of the debug log call
#   when parsing fails
def parse_error_from_json(data, env, error_message)
  handle_parsed_error(JSON.parse(data), env)
rescue JSON::ParserError => parse_failure
  RubyLLM.logger.debug("#{error_message}: #{parse_failure.message}")
end

.parse_streaming_error(data) ⇒ Object



115
116
117
118
119
120
121
# File 'lib/ruby_llm/streaming.rb', line 115

# Derives a [status, message] pair from a streaming error payload.
#
# The status is always 500 here. The message is the payload's top-level
# "message" value when the payload parses to a Hash that has one, and
# "Unknown streaming error" otherwise. Valid JSON that is not an object (for
# example an array) also gets the fallback message instead of raising on the
# key lookup.
#
# @param data [String] JSON text describing the error
# @return [Array(Integer, String)] status and message; when +data+ is not
#   valid JSON the message embeds the raw data
def parse_streaming_error(data)
  error_data = JSON.parse(data)
  # Only a Hash can be indexed by the "message" key.
  message = error_data['message'] if error_data.is_a?(Hash)
  [500, message || 'Unknown streaming error']
rescue JSON::ParserError => e
  RubyLLM.logger.debug "Failed to parse streaming error: #{e.message}"
  [500, "Failed to parse error: #{data}"]
end

.process_stream_chunk(chunk, parser, env, &) ⇒ Object



54
55
56
57
58
59
60
61
62
63
64
# File 'lib/ruby_llm/streaming.rb', line 54

# Routes one raw chunk from the stream to the matching handler.
#
# The chunk is logged at debug level when RubyLLM.config.log_stream_debug is
# set. A chunk recognised by error_chunk? goes to handle_error_chunk, one
# recognised by json_error_payload? goes to handle_json_error_chunk, and
# anything else goes through handle_sse. In that last case the block is both
# forwarded to handle_sse and yielded handle_sse's return value afterwards.
#
# @param chunk [String] raw chunk text
# @param parser [Object] event-stream parser handed to handle_sse
# @param env [Object] request environment passed through to the handlers
# @yield [Object] the return value of handle_sse
# @return [Object] the result of whichever branch ran
def process_stream_chunk(chunk, parser, env, &)
  RubyLLM.logger.debug "Received chunk: #{chunk}" if RubyLLM.config.log_stream_debug

  return handle_error_chunk(chunk, env) if error_chunk?(chunk)
  return handle_json_error_chunk(chunk, env) if json_error_payload?(chunk)

  yield handle_sse(chunk, parser, env, &)
end

.stream_response(connection, payload, additional_headers = {}, &block) ⇒ Object



8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# File 'lib/ruby_llm/streaming.rb', line 8

# Posts +payload+ to stream_url and relays the streamed chunks.
#
# Every chunk produced by the handle_stream handler is added to a
# StreamAccumulator and then passed to +block+. After the request returns,
# the accumulator builds the final message from the response.
#
# @param connection [#post] connection used to issue the request
# @param payload [Object] request body
# @param additional_headers [Hash] extra headers; on a key clash the header
#   already set on the request wins
# @yield [Object] each chunk as it arrives
# @return [Object] the message built by StreamAccumulator#to_message
def stream_response(connection, payload, additional_headers = {}, &block)
  accumulator = StreamAccumulator.new

  response = connection.post(stream_url, payload) do |req|
    req.headers = additional_headers.merge(req.headers) unless additional_headers.empty?

    on_data = handle_stream do |chunk|
      accumulator.add chunk
      block.call chunk
    end

    # Faraday 1 takes the callback via hash-style assignment on the options;
    # later versions take it via the on_data writer.
    if faraday_1?
      req.options[:on_data] = on_data
    else
      req.options.on_data = on_data
    end
  end

  message = accumulator.to_message(response)
  RubyLLM.logger.debug "Stream completed: #{message.content}"
  message
end