Module: RubyLLM::Providers::Bedrock::Streaming::Base
- Included in:
- RubyLLM::Providers::Bedrock::Streaming
- Defined in:
- lib/ruby_llm/providers/bedrock/streaming/base.rb
Overview
Base module for AWS Bedrock streaming functionality.
Class Method Summary collapse
- .included(base) ⇒ Object
Instance Method Summary collapse
- #handle_stream(&block) ⇒ Object
- #stream_response(connection, payload, additional_headers = {}, &block) ⇒ Object
- #stream_url ⇒ Object
Class Method Details
.included(base) ⇒ Object
9 10 11 12 13 14 |
# File 'lib/ruby_llm/providers/bedrock/streaming/base.rb', line 9
# Hook run when this module is included: mixes the streaming helper
# modules into the including class/module as well.
#
# Modules are included one at a time, in the listed order, so the last one
# listed ends up closest to +base+ in its ancestor chain.
#
# @param base [Module] the class or module including this module
# @return [Module] +base+ (the value returned by Module#include)
def self.included(base)
  mixins = [ContentExtraction, MessageProcessing, PayloadProcessing, PreludeHandling]
  mixins.each { |mixin| base.include(mixin) }
  base
end
Instance Method Details
#handle_stream(&block) ⇒ Object
37 38 39 40 41 42 43 44 45 46 |
# File 'lib/ruby_llm/providers/bedrock/streaming/base.rb', line 37
# Builds the callback proc that consumes raw stream chunks.
#
# The returned proc takes (chunk, bytes, env). When +env+ is present and its
# status is not 200, the chunk is handed to +handle_failed_response+ together
# with a mutable string buffer shared by every invocation of the proc.
# Otherwise (no env, or status 200) the chunk goes to +process_chunk+, which
# receives the given block.
#
# A plain (non-lambda) proc is used, so callers may omit +env+.
#
# @yield forwarded to process_chunk
# @return [Proc] the chunk handler
def handle_stream(&block)
  error_buffer = +''

  proc do |chunk, _bytes, env|
    failed = env && env.status != 200
    next process_chunk(chunk, &block) unless failed

    handle_failed_response(chunk, error_buffer, env)
  end
end
#stream_response(connection, payload, additional_headers = {}, &block) ⇒ Object
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
# File 'lib/ruby_llm/providers/bedrock/streaming/base.rb', line 20
# Posts +payload+ to the streaming endpoint and accumulates the streamed
# chunks into a single result.
#
# The request is signed for the absolute URL (connection URL prefix plus
# #stream_url). Every chunk delivered by the stream handler is added to a
# StreamAccumulator and, when a block was given, also passed to that block.
#
# @param connection the connection wrapper; must respond to +post+ and to
#   +connection.url_prefix+
# @param payload the request body, also passed to +sign_request+
# @param additional_headers [Hash] extra request headers; headers already set
#   on the request win on key conflicts
# @yield [chunk] each processed chunk as it arrives (optional)
# @return the value of +accumulator.to_message(response)+
def stream_response(connection, payload, additional_headers = {}, &block)
  signature = sign_request("#{connection.connection.url_prefix}#{stream_url}", payload:)
  accumulator = StreamAccumulator.new

  response = connection.post stream_url, payload do |req|
    req.headers.merge! build_headers(signature.headers, streaming: block_given?)
    # Merge additional headers, with existing headers taking precedence
    req.headers = additional_headers.merge(req.headers) unless additional_headers.empty?
    req.options.on_data = handle_stream do |chunk|
      accumulator.add chunk
      # The block is optional (see `streaming: block_given?` above), so only
      # forward the chunk when a consumer was actually supplied.
      block&.call(chunk)
    end
  end

  accumulator.to_message(response)
end
#stream_url ⇒ Object
16 17 18 |
# File 'lib/ruby_llm/providers/bedrock/streaming/base.rb', line 16
# Relative request path of the streaming invocation endpoint, built from the
# model identifier held in @model_id.
#
# @return [String] "model/<@model_id>/invoke-with-response-stream"
def stream_url = "model/#{@model_id}/invoke-with-response-stream"