Class: ClickhouseRuby::StreamingResult

Inherits:
Object
Includes:
Enumerable
Defined in:
lib/clickhouse_ruby/streaming_result.rb

Overview

Memory-efficient streaming result for large queries

StreamingResult provides an Enumerable interface for processing ClickHouse query results without loading all rows into memory. Rows are parsed line-by-line as they arrive from the server.
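To make "parsed line-by-line as they arrive" concrete, here is a minimal sketch. It assumes the response body arrives as a sequence of String chunks in JSONEachRow format (one JSON object per line). The helper name `each_json_row` is hypothetical and not part of ClickhouseRuby. Chunk boundaries can fall in the middle of a row, so an incomplete tail is buffered until its newline arrives.

```ruby
require "json"

# Hypothetical helper: yields one parsed row (a Hash) per complete line.
# `chunks` is any Enumerable of Strings, e.g. pieces of an HTTP body.
def each_json_row(chunks)
  buffer = +""
  chunks.each do |chunk|
    buffer << chunk
    # A chunk may contain several rows, or only part of one.
    while (newline = buffer.index("\n"))
      line = buffer.slice!(0..newline).strip
      yield JSON.parse(line) unless line.empty?
    end
  end
  # A last row without a trailing newline is still a complete row.
  tail = buffer.strip
  yield JSON.parse(tail) unless tail.empty?
end

# The first row is split across two chunks.
chunks = ["{\"id\":1,\"na", "me\":\"a\"}\n{\"id\":2,", "\"name\":\"b\"}\n"]
each_json_row(chunks) { |row| p row }
```

Only the unfinished tail of the body is held in the buffer, so memory use stays proportional to the longest row rather than the whole result.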

Features:

  • Enumerable interface for chainable operations

  • Lazy evaluation (no data loaded until iterated)

  • Support for gzip decompression

  • Progress callbacks

  • Batch processing
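Gzip support and line-by-line parsing can coexist because zlib can inflate a stream incrementally. The sketch below assumes gzip-compressed body chunks as input and uses Ruby's standard `Zlib::Inflate`. The helper name `each_inflated_chunk` is hypothetical, and the library may implement decompression differently.

```ruby
require "zlib"

# Hypothetical helper: inflates a gzip body chunk by chunk and yields the
# decompressed pieces, so neither the compressed nor the decompressed body
# has to be held in memory in full.
def each_inflated_chunk(compressed_chunks)
  # MAX_WBITS + 32 lets zlib auto-detect a gzip or zlib header.
  inflater = Zlib::Inflate.new(Zlib::MAX_WBITS + 32)
  compressed_chunks.each do |chunk|
    piece = inflater.inflate(chunk)
    yield piece unless piece.empty?
  end
  rest = inflater.finish
  yield rest unless rest.empty?
ensure
  inflater&.close
end

body = Zlib.gzip("{\"id\":1}\n{\"id\":2}\n")
# Simulate network reads of 8 bytes each.
chunks = body.bytes.each_slice(8).map { |bytes| bytes.pack("C*") }
each_inflated_chunk(chunks) { |piece| print piece }
```

Decompressed pieces still split rows at arbitrary points, so they need the same newline buffering as an uncompressed body.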

Examples:

Basic usage

client.stream_execute('SELECT * FROM huge_table').each do |row|
  process(row)
end

Lazy enumeration with filtering

result = client.stream_execute('SELECT * FROM huge_table')
  .lazy
  .select { |row| row['active'] == 1 }
  .take(100)
  .to_a

Batch processing

client.stream_execute('SELECT * FROM huge_table').each_batch(size: 1000) do |batch|
  insert_into_cache(batch)
end


Constructor Details

#initialize(connection, sql, format: "JSONEachRow", compression: nil) ⇒ StreamingResult

Creates a new streaming result

Parameters:

  • connection

    the ClickHouse connection

  • sql (String)

    the SQL query to execute

  • format (String) (defaults to: "JSONEachRow")

    response format

  • compression (String, nil) (defaults to: nil)

    compression algorithm ('gzip' or nil)



# File 'lib/clickhouse_ruby/streaming_result.rb', line 48

def initialize(connection, sql, format: "JSONEachRow", compression: nil)
  @connection = connection
  @sql = sql
  @format = format
  @compression = compression
  @progress_callback = nil
end
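The constructor only stores its options. As a hedged illustration of what `format:` and `compression:` could translate to on ClickHouse's HTTP interface, this sketch builds (but does not send) a `Net::HTTP::Post` request. `default_format`, `enable_http_compression` and `send_progress_in_http_headers` are parameters the ClickHouse HTTP interface accepts; the helper `build_stream_request`, and the assumption that StreamingResult passes its options this way, are hypothetical.

```ruby
require "net/http"
require "uri"

# Hypothetical helper: one way the constructor's options could map onto a
# ClickHouse HTTP request. The request is only built here, never sent.
def build_stream_request(base_url, sql, format: "JSONEachRow", compression: nil)
  params = {
    "default_format" => format,             # used when the SQL has no FORMAT clause
    "send_progress_in_http_headers" => "1"  # makes ClickHouse emit X-ClickHouse-Progress
  }
  params["enable_http_compression"] = "1" if compression == "gzip"

  uri = URI(base_url)
  uri.query = URI.encode_www_form(params)

  request = Net::HTTP::Post.new(uri)
  request["Content-Type"] = "text/plain"
  request["Accept-Encoding"] = "gzip" if compression == "gzip"
  request.body = sql
  request
end

request = build_stream_request("http://localhost:8123/", "SELECT * FROM huge_table",
                               compression: "gzip")
request.path # → "/?default_format=JSONEachRow&send_progress_in_http_headers=1&enable_http_compression=1"
```

An explicit FORMAT clause in the SQL takes precedence over `default_format`. Net::HTTP may also decompress gzip responses on its own, so a real implementation should make sure the body is inflated only once.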

Instance Method Details

#each {|Hash| ... } ⇒ Enumerator

Iterates over each row in the result

Returns an Enumerator if no block is given, allowing for lazy evaluation.

Examples:

With block

result.each { |row| puts row['id'] }

Returns enumerator without block

enumerator = result.each

Yields:

  • (Hash)

    each row as a parsed JSON object

Returns:

  • (Enumerator, nil) an Enumerator if no block is given, otherwise nil



# File 'lib/clickhouse_ruby/streaming_result.rb', line 85

def each(&block) # rubocop:disable Naming/BlockForwarding
  return enum_for(__method__) unless block_given?

  stream_query(&block) # rubocop:disable Naming/BlockForwarding
end
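Because `each` returns an Enumerator when called without a block, and the class includes Enumerable, `lazy` can stop the stream as soon as enough rows are found. The sketch uses a hypothetical in-memory stand-in, `FakeStream`, in place of a real connection, and counts how many rows were actually produced.

```ruby
# Hypothetical stand-in for StreamingResult: rows are generated in memory
# instead of read from ClickHouse, and `produced` counts how many were made.
class FakeStream
  include Enumerable

  attr_reader :produced

  def initialize(total)
    @total = total
    @produced = 0
  end

  # Same shape as StreamingResult#each: no block, so return an Enumerator.
  def each
    return enum_for(__method__) unless block_given?

    @total.times do |i|
      @produced += 1
      yield({ "id" => i, "active" => i.even? ? 1 : 0 })
    end
  end
end

stream = FakeStream.new(1_000_000)
stream.each.class # → Enumerator
active = stream.lazy.select { |row| row["active"] == 1 }.first(3)
active.map { |row| row["id"] } # → [0, 2, 4]
stream.produced                # → 5
```

Only five of the million rows are generated, because the lazy chain breaks out of `each` once it has three matches.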

#each_batch(size: 1000) {|Array<Hash>| ... } ⇒ Enumerator

Iterates over rows in batches

Useful for batch processing (e.g., bulk database operations). The final batch may be smaller than the specified size.

Examples:

result.each_batch(size: 500) do |batch|
  insert_batch(batch)
end

Yields:

  • (Array<Hash>)

    each batch of rows

Parameters:

  • size (Integer) (defaults to: 1000)

    number of rows per batch

Returns:

  • (Enumerator, nil) an Enumerator if no block is given, otherwise nil



# File 'lib/clickhouse_ruby/streaming_result.rb', line 104

def each_batch(size: 1000)
  return enum_for(__method__, size: size) unless block_given?

  batch = []
  each do |row|
    batch << row
    if batch.size >= size
      yield batch
      batch = []
    end
  end
  yield batch if batch.any?
end
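The batching logic above groups rows the same way Ruby's built-in `Enumerable#each_slice` does. This standalone copy (a hypothetical free function taking any Enumerable, rather than the real instance method) shows the batch sizes for 2,500 rows.

```ruby
# Standalone copy of the each_batch logic, taking any Enumerable as the row
# source (the real method iterates over the streaming result itself).
def each_batch(rows, size: 1000)
  return enum_for(__method__, rows, size: size) unless block_given?

  batch = []
  rows.each do |row|
    batch << row
    if batch.size >= size
      yield batch
      batch = [] # a fresh array, so batches already yielded stay intact
    end
  end
  yield batch if batch.any? # the final, possibly smaller batch
end

rows = (1..2_500).map { |i| { "id" => i } }
each_batch(rows, size: 1000).map(&:size) # → [1000, 1000, 500]
rows.each_slice(1000).map(&:size)        # → [1000, 1000, 500]
```

Starting a new array with `batch = []` instead of calling `batch.clear` keeps yielded batches safe to retain, for example when collecting them with `to_a`. Since StreamingResult includes Enumerable, `each_slice` is also available on it directly.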

#on_progress {|Hash| ... } ⇒ self

Sets a callback for progress updates

ClickHouse sends progress headers during execution (when the send_progress_in_http_headers setting is enabled), for example:

    X-ClickHouse-Progress: {"read_rows":"1000","read_bytes":"50000"}

Examples:

result.on_progress do |progress|
  puts "Processed #{progress['read_rows']} rows"
end.each { |row| process(row) }

Yields:

  • (Hash)

    progress data

Returns:

  • (self) the receiver, for method chaining



# File 'lib/clickhouse_ruby/streaming_result.rb', line 68

def on_progress(&block)
  @progress_callback = block
  self
end
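The source shown only stores the callback. A minimal sketch of the other half, assuming the reader has access to each raw X-ClickHouse-Progress header value: parse the JSON and pass the resulting Hash to the stored block. `dispatch_progress` is a hypothetical name. ClickHouse encodes the counters as JSON strings, so convert them with `to_i` before doing arithmetic.

```ruby
require "json"

# Hypothetical helper: hands one X-ClickHouse-Progress header value to the
# callback registered with on_progress (if any).
def dispatch_progress(header_value, callback)
  return if callback.nil? || header_value.nil?

  progress = JSON.parse(header_value) # e.g. {"read_rows"=>"1000", ...}
  callback.call(progress)
end

seen = []
callback = ->(progress) { seen << progress["read_rows"].to_i }
dispatch_progress('{"read_rows":"1000","read_bytes":"50000"}', callback)
dispatch_progress('{"read_rows":"2000","read_bytes":"100000"}', callback)
seen # → [1000, 2000]
```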