Module: RubyLLM::Providers::Bedrock::Streaming::Base

Included in:
RubyLLM::Providers::Bedrock::Streaming
Defined in:
lib/ruby_llm/providers/bedrock/streaming/base.rb

Overview

Base module for AWS Bedrock streaming functionality.

Class Method Summary

Instance Method Summary

Class Method Details

.included(base) ⇒ Object



9
10
11
12
13
14
# File 'lib/ruby_llm/providers/bedrock/streaming/base.rb', line 9

# Inclusion hook: mixes the streaming helper modules into whatever includes
# this module, so the includer gets all of them with a single `include`.
#
# The helpers are included one at a time, in the listed order, which keeps
# the resulting ancestor chain the same as four separate `include` calls.
#
# @param base [Module] the class or module that is including this module
# @return [Module] +base+, as returned by Module#include
def self.included(base)
  [ContentExtraction, MessageProcessing, PayloadProcessing, PreludeHandling].each do |helper|
    base.include helper
  end
  base
end

Instance Method Details

#handle_stream(&block) ⇒ Object



37
38
39
40
41
42
43
44
45
46
# File 'lib/ruby_llm/providers/bedrock/streaming/base.rb', line 37

# Builds the callback that receives streamed response data.
#
# The returned proc accepts +(chunk, bytes, env)+. When +env+ is present and
# its status is not 200, the chunk is handed to +handle_failed_response+
# together with a string buffer that is shared by every invocation of the
# proc. In all other cases (including a missing +env+) the chunk goes to
# +process_chunk+, which receives the caller's block.
#
# A plain proc (not a lambda) is used, so the callback can be invoked with
# fewer than three arguments.
#
# @yield forwarded to +process_chunk+
# @return [Proc] the data callback
def handle_stream(&block)
  failure_buffer = +''
  proc do |chunk, _bytes, env|
    succeeded = !env || env.status == 200
    next process_chunk(chunk, &block) if succeeded

    handle_failed_response(chunk, failure_buffer, env)
  end
end

#stream_response(connection, payload, additional_headers = {}, &block) ⇒ Object



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# File 'lib/ruby_llm/providers/bedrock/streaming/base.rb', line 20

# Posts +payload+ to the streaming endpoint and assembles the streamed chunks
# into a single message.
#
# The request is signed via +sign_request+ using the connection's URL prefix
# joined with +stream_url+. Every chunk produced by the stream handler is
# added to a StreamAccumulator and, when a block was given, also passed to
# that block.
#
# The block is optional: the headers are already built with
# +streaming: block_given?+, so a call without a block must not fail when
# chunks arrive. The block is therefore invoked with safe navigation.
#
# @param connection [Object] responds to +post+ and exposes the underlying
#   connection through +connection+ (used for +url_prefix+)
# @param payload [Object] request body; also passed to +sign_request+
# @param additional_headers [Hash] extra request headers; headers already set
#   on the request win on conflict
# @yield [chunk] each chunk emitted by the stream handler (optional)
# @return [Object] the result of +StreamAccumulator#to_message+ for the response
def stream_response(connection, payload, additional_headers = {}, &block)
  signature = sign_request("#{connection.connection.url_prefix}#{stream_url}", payload:)
  accumulator = StreamAccumulator.new

  response = connection.post stream_url, payload do |req|
    req.headers.merge! build_headers(signature.headers, streaming: block_given?)
    # Merge additional headers, with existing headers taking precedence
    req.headers = additional_headers.merge(req.headers) unless additional_headers.empty?
    req.options.on_data = handle_stream do |chunk|
      accumulator.add chunk
      # No block means nobody is consuming chunks live; just accumulate.
      block&.call(chunk)
    end
  end

  accumulator.to_message(response)
end

#stream_url ⇒ Object



16
17
18
# File 'lib/ruby_llm/providers/bedrock/streaming/base.rb', line 16

# Relative path of the streaming invocation endpoint for the current model.
#
# @return [String] "model/<@model_id>/invoke-with-response-stream"
def stream_url
  format('model/%s/invoke-with-response-stream', @model_id)
end