Class: ClickhouseRuby::Client

Inherits:
Object
Defined in:
lib/clickhouse_ruby/client.rb

Overview

Main HTTP client for ClickHouse communication

The Client provides a high-level interface for executing queries and inserting data into ClickHouse. It handles:

  • Connection pooling for performance

  • Automatic format handling (JSONCompact for queries)

  • Proper error handling with rich context

  • Bulk inserts with JSONEachRow format
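Connection pooling here means reusing a fixed set of open connections instead of opening a new one per request. The gem's ConnectionPool is not shown on this page. As a rough sketch of the idea only, assuming a fixed-size pool backed by Ruby's thread-safe Queue (MiniPool and its methods are hypothetical names), checking a connection out and returning it around a block might look like this:

```ruby
# Hypothetical fixed-size pool; NOT the gem's ConnectionPool implementation.
class MiniPool
  def initialize(size, &factory)
    @available = Queue.new # Thread::Queue is thread-safe and blocks on pop
    size.times { @available << factory.call }
  end

  # Check a connection out, yield it, and always put it back,
  # even if the block raises.
  def with_connection
    conn = @available.pop # waits until a connection is free
    yield conn
  ensure
    @available << conn if conn
  end

  # Number of idle connections currently in the pool.
  def available
    @available.size
  end
end

pool = MiniPool.new(2) { Object.new } # stand-in objects instead of HTTP connections
pool.with_connection { |conn| conn.object_id }
```

A production pool also needs a checkout timeout (the client's #ping rescues ClickhouseRuby::PoolTimeout, which suggests the real pool has one) and some form of health checking; this sketch omits both.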

CRITICAL: This client ALWAYS checks HTTP status codes before parsing response bodies. This prevents the silent error bug found in clickhouse-activerecord (issue #230).
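In practice, "check the status before parsing" matters because a ClickHouse error arrives as a non-2xx HTTP response whose body is a plain-text message, not JSON. Handing that body straight to a JSON parser either fails with a confusing parse error or, if that error is rescued and ignored, disappears silently. A minimal sketch of the ordering, assuming a bare status code and body string (parse_clickhouse_response and SketchQueryError are hypothetical; the gem has its own error classes):

```ruby
require "json"

# Hypothetical error class; the gem defines its own error hierarchy.
class SketchQueryError < StandardError; end

# Check the HTTP status before touching the body as JSON.
def parse_clickhouse_response(status, body)
  unless (200..299).cover?(status)
    # On failure ClickHouse sends a plain-text message, not JSON,
    # so raise with that text instead of trying to parse it.
    raise SketchQueryError, "HTTP #{status}: #{body.strip}"
  end

  JSON.parse(body)
end
```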

Examples:

Basic usage

config = ClickhouseRuby::Configuration.new
config.host = 'localhost'
client = ClickhouseRuby::Client.new(config)

result = client.execute('SELECT * FROM users LIMIT 10')
result.each { |row| puts row['name'] }

With settings

result = client.execute(
  'SELECT * FROM large_table',
  settings: { max_execution_time: 120 }
)

Bulk insert

client.insert('events', [
  { id: 1, event: 'click', timestamp: Time.now },
  { id: 2, event: 'view', timestamp: Time.now }
])

Constant Summary

DEFAULT_FORMAT = "JSONCompact"

Default response format for queries

INSERT_FORMAT = "JSONEachRow"

Format for bulk inserts (5x faster than VALUES)

Constructor Details

#initialize(config) ⇒ Client

Creates a new Client

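Parameters:

  • config (Configuration)

    the client configuration; config.validate! is called on it before the connection pool is created

Raises:
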
# File 'lib/clickhouse_ruby/client.rb', line 61

def initialize(config)
  @config = config
  @config.validate!
  @pool = ConnectionPool.new(config)
  @logger = config.logger
  @default_settings = config.default_settings || {}
  @retry_handler = RetryHandler.new(
    max_attempts: config.max_retries,
    initial_backoff: config.initial_backoff,
    max_backoff: config.max_backoff,
    multiplier: config.backoff_multiplier,
    jitter: config.retry_jitter,
  )
end
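The retry settings passed to RetryHandler (initial backoff, maximum backoff, multiplier, jitter) suggest capped exponential backoff. RetryHandler's actual formula is not shown on this page, so the following is only a sketch under that assumption, treating jitter as a boolean that enables "full jitter" (a uniform random delay between zero and the computed backoff); backoff_delay is a hypothetical helper:

```ruby
# Hypothetical capped exponential backoff; RetryHandler's real formula may differ.
# attempt is 1-based: the first retry waits `initial` seconds.
def backoff_delay(attempt, initial:, max:, multiplier:, jitter: false)
  base = [initial * (multiplier**(attempt - 1)), max].min
  return base unless jitter

  # "Full jitter": a uniform delay in [0, base) spreads out retries from many clients.
  rand * base
end

(1..5).map { |n| backoff_delay(n, initial: 0.5, max: 4.0, multiplier: 2.0) }
# => [0.5, 1.0, 2.0, 4.0, 4.0]
```

Capping at max keeps late retries from waiting unboundedly, and jitter prevents many clients that failed at the same moment from retrying in lockstep.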

Instance Attribute Details

#config ⇒ Configuration (readonly)

Returns the client configuration.

Returns:

  • (Configuration)

    the client configuration

# File 'lib/clickhouse_ruby/client.rb', line 49

def config
  @config
end

#pool ⇒ ConnectionPool (readonly)

Returns the connection pool.

Returns:

  • (ConnectionPool)

    the connection pool

# File 'lib/clickhouse_ruby/client.rb', line 52

def pool
  @pool
end

#retry_handler ⇒ RetryHandler (readonly)

Returns the retry handler.

Returns:

  • (RetryHandler)

    the retry handler

# File 'lib/clickhouse_ruby/client.rb', line 55

def retry_handler
  @retry_handler
end

Instance Method Details

#close ⇒ void

Also known as: disconnect

This method returns an undefined value.

Closes all connections in the pool

Call this when shutting down to clean up resources.

# File 'lib/clickhouse_ruby/client.rb', line 260

def close
  @pool.shutdown
end

#command(sql, settings: {}) ⇒ Boolean

Executes a command (INSERT, CREATE, DROP, etc.) that doesn’t return data

Examples:

client.command('CREATE TABLE test (id UInt64) ENGINE = Memory')
client.command('DROP TABLE test')

Parameters:

  • sql (String)

    the SQL command to execute

  • settings (Hash) (defaults to: {})

    ClickHouse settings

Returns:

  • (Boolean)

    true if successful

Raises:

# File 'lib/clickhouse_ruby/client.rb', line 110

def command(sql, settings: {})
  params = build_query_params(settings)
  execute_request(sql, params)
  true
end

#each_batch(sql, batch_size: 1000, settings: {}) {|Array<Hash>| ... } ⇒ Enumerator

Convenience method for batch processing

Equivalent to stream_execute(sql, settings: settings).each_batch(size: batch_size, &block)

Examples:

client.each_batch('SELECT * FROM events', batch_size: 500) do |batch|
  insert_into_cache(batch)
end

Parameters:

  • sql (String)

    the SQL query to execute

  • batch_size (Integer) (defaults to: 1000)

    number of rows per batch

  • settings (Hash) (defaults to: {})

    ClickHouse settings

Yields:

  • (Array<Hash>)

    each batch of rows

Returns:

  • (Enumerator)

    if no block is given; otherwise nil

# File 'lib/clickhouse_ruby/client.rb', line 333

def each_batch(sql, batch_size: 1000, settings: {}, &block)
  stream_execute(sql, settings: settings).each_batch(size: batch_size, &block)
end
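Batching over a stream comes down to grouping rows into fixed-size arrays while still returning an Enumerator when no block is given. StreamingResult's internals are not shown here; a minimal sketch of that contract over any enumerable source, using a hypothetical BatchingSketch class as a stand-in for a streaming result:

```ruby
# Hypothetical stand-in for a streaming result; wraps any Enumerable of rows.
class BatchingSketch
  def initialize(rows)
    @rows = rows
  end

  # Yields arrays of up to `size` rows. Without a block, returns an
  # Enumerator over the batches; with a block, returns nil.
  def each_batch(size: 1000, &block)
    unless block
      return Enumerator.new { |y| each_batch(size: size) { |batch| y << batch } }
    end

    @rows.each_slice(size, &block)
    nil
  end
end

BatchingSketch.new(1..5).each_batch(size: 2).to_a
# => [[1, 2], [3, 4], [5]]
```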

#each_row(sql, settings: {}) {|Hash| ... } ⇒ Enumerator

Convenience method for iterating over rows one at a time

Equivalent to stream_execute(sql, settings: settings).each(&block)

Examples:

client.each_row('SELECT * FROM events') do |row|
  process(row)
end

Parameters:

  • sql (String)

    the SQL query to execute

  • settings (Hash) (defaults to: {})

    ClickHouse settings

Yields:

  • (Hash)

    each row

Returns:

  • (Enumerator)

    if no block is given; otherwise nil

# File 'lib/clickhouse_ruby/client.rb', line 315

def each_row(sql, settings: {}, &block)
  stream_execute(sql, settings: settings).each(&block)
end

#execute(sql, settings: {}, format: DEFAULT_FORMAT) ⇒ Result

Executes a SQL query and returns results

Examples:

result = client.execute('SELECT count() FROM users')
puts result.first['count()']

With settings

result = client.execute(
  'SELECT * FROM events',
  settings: { max_rows_to_read: 1_000_000 }
)

Parameters:

  • sql (String)

    the SQL query to execute

  • settings (Hash) (defaults to: {})

    ClickHouse settings for this query

  • format (String) (defaults to: DEFAULT_FORMAT)

    response format (default: JSONCompact)

Returns:

  • (Result)

    the query result

Raises:

# File 'lib/clickhouse_ruby/client.rb', line 94

def execute(sql, settings: {}, format: DEFAULT_FORMAT)
  @retry_handler.with_retry(idempotent: true) do
    execute_internal(sql, settings: settings, format: format)
  end
end
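With the default JSONCompact format, ClickHouse returns the column metadata once under "meta" and each row as a positional array under "data"; zipping the two is what lets a row be addressed by column name, such as 'count()'. A sketch of that decoding step, assuming a body already known to be a successful response (decode_json_compact is a hypothetical helper; the gem's Result class may do more, for example type conversion):

```ruby
require "json"

# Hypothetical decoder: turns a JSONCompact body into an array of row hashes.
def decode_json_compact(body)
  payload = JSON.parse(body)
  names = payload.fetch("meta").map { |column| column.fetch("name") }
  # Each entry in "data" is a positional array in the same order as "meta".
  payload.fetch("data").map { |values| names.zip(values).to_h }
end

body = <<~JSON
  {"meta":[{"name":"id","type":"UInt32"},{"name":"name","type":"String"}],
   "data":[[1,"alice"],[2,"bob"]],
   "rows":2}
JSON
decode_json_compact(body).first["name"] # => "alice"
```

ClickHouse quotes 64-bit integers in its JSON output formats by default (controlled by the output_format_json_quote_64bit_integers setting), so a UInt64 value such as count() may arrive as a string like "42" unless the Result class converts it.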

#explain(sql, type: :plan, settings: {}) ⇒ Array<Hash>

Returns the query execution plan for a SQL query

Uses EXPLAIN to show how ClickHouse will execute the query.

Examples:

Basic explain

client.explain('SELECT * FROM events WHERE date > today()')
# => [{"explain" => "Expression ..."}]

Pipeline explain

client.explain('SELECT count() FROM events', type: :pipeline)

Estimate query cost

client.explain('SELECT * FROM events', type: :estimate)

Parameters:

  • sql (String)

    the SQL query to explain

  • type (Symbol) (defaults to: :plan)

    type of explain (:plan, :pipeline, :estimate, :ast, :syntax)

  • settings (Hash) (defaults to: {})

    ClickHouse settings

Returns:

  • (Array<Hash>)

    the query plan rows

# File 'lib/clickhouse_ruby/client.rb', line 188

def explain(sql, type: :plan, settings: {})
  explain_keyword = case type
                    when :plan then "EXPLAIN"
                    when :pipeline then "EXPLAIN PIPELINE"
                    when :estimate then "EXPLAIN ESTIMATE"
                    when :ast then "EXPLAIN AST"
                    when :syntax then "EXPLAIN SYNTAX"
                    else
                      raise ArgumentError, "Unknown explain type: #{type}. Valid types: :plan, :pipeline, :estimate, :ast, :syntax"
                    end

  explain_sql = "#{explain_keyword} #{sql.strip}"
  execute(explain_sql, settings: settings).to_a
end
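The case expression above maps each explain type to a keyword prefix. An alternative, table-driven sketch of the same mapping (EXPLAIN_KEYWORDS and build_explain_sql are hypothetical names, not part of the gem) keeps the list of valid types in one place, so the error message cannot drift from the supported types:

```ruby
# Hypothetical table-driven variant of the explain keyword mapping.
EXPLAIN_KEYWORDS = {
  plan: "EXPLAIN",
  pipeline: "EXPLAIN PIPELINE",
  estimate: "EXPLAIN ESTIMATE",
  ast: "EXPLAIN AST",
  syntax: "EXPLAIN SYNTAX",
}.freeze

def build_explain_sql(sql, type = :plan)
  keyword = EXPLAIN_KEYWORDS.fetch(type) do
    valid = EXPLAIN_KEYWORDS.keys.map(&:inspect).join(", ")
    raise ArgumentError, "Unknown explain type: #{type}. Valid types: #{valid}"
  end
  "#{keyword} #{sql.strip}"
end

build_explain_sql("SELECT count() FROM events", :pipeline)
# => "EXPLAIN PIPELINE SELECT count() FROM events"
```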

#health_checkHash

Returns detailed health status of the client and server

Provides comprehensive health information including server status, connection pool state, and server metrics.

Examples:

client.health_check
# => {
#   status: :healthy,
#   server_reachable: true,
#   server_version: "24.1.0",
#   current_database: "default",
#   server_uptime_seconds: 3600,
#   pool: { available: 3, in_use: 2, total: 5 },
#   check_duration_ms: 4.2
# }

Returns:

  • (Hash)

    health status

# File 'lib/clickhouse_ruby/client.rb', line 219

def health_check
  started_at = Instrumentation.monotonic_time
  server_reachable = ping
  version = nil
  server_metrics = {}

  if server_reachable
    begin
      version = server_version

      # Get server uptime and metrics
      metrics_result = execute(<<~SQL, settings: { max_execution_time: 5 })
        SELECT
          uptime() AS uptime_seconds,
          currentDatabase() AS current_database
      SQL
      server_metrics = metrics_result.first if metrics_result.any?
    rescue StandardError
      # Ignore errors fetching extended info
    end
  end

  pool_health = @pool.health_check
  check_duration_ms = Instrumentation.duration_ms(started_at)

  {
    status: server_reachable ? :healthy : :unhealthy,
    server_reachable: server_reachable,
    server_version: version,
    current_database: server_metrics["current_database"],
    server_uptime_seconds: server_metrics["uptime_seconds"],
    pool: pool_health,
    check_duration_ms: check_duration_ms.round(2),
  }
end
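health_check times itself with Instrumentation.monotonic_time and Instrumentation.duration_ms, whose implementations are not shown here. A plausible sketch, assuming they wrap Ruby's monotonic clock (SketchInstrumentation is a hypothetical module), follows. A monotonic clock suits durations because, unlike Time.now, it does not jump when the system clock is adjusted.

```ruby
# Hypothetical equivalent of Instrumentation.monotonic_time / duration_ms.
module SketchInstrumentation
  module_function

  # Seconds from an arbitrary fixed point; only differences are meaningful.
  def monotonic_time
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end

  # Milliseconds elapsed since a value previously returned by monotonic_time.
  def duration_ms(started_at)
    (monotonic_time - started_at) * 1000.0
  end
end

started_at = SketchInstrumentation.monotonic_time
sleep 0.01
SketchInstrumentation.duration_ms(started_at).round(2)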

#insert(table, rows, columns: nil, settings: {}, format: :json_each_row) ⇒ Boolean

Inserts multiple rows using bulk insert (JSONEachRow format)

This is significantly faster than INSERT … VALUES for large datasets. The data is sent in JSONEachRow format, which ClickHouse can parse efficiently.
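In JSONEachRow the request body is simply one JSON object per line, while the INSERT statement names the table, the columns, and the format. A rough sketch of building both pieces, assuming row hashes with symbol or string keys and a trusted table name (build_json_each_row_insert is hypothetical; the gem's insert_internal is not shown and may quote identifiers or send the statement differently):

```ruby
require "json"

# Hypothetical helper: returns [sql, body] for a JSONEachRow insert.
# Assumes `table` is a trusted identifier (it is interpolated, not quoted).
def build_json_each_row_insert(table, rows, columns: nil)
  raise ArgumentError, "rows cannot be empty" if rows.nil? || rows.empty?

  # Normalize keys to strings so symbol and string keys behave the same.
  rows = rows.map { |row| row.transform_keys(&:to_s) }
  columns = (columns || rows.first.keys).map(&:to_s)

  sql = "INSERT INTO #{table} (#{columns.join(', ')}) FORMAT JSONEachRow"
  # One JSON object per line; keys not listed in `columns` are dropped.
  body = rows.map { |row| JSON.generate(row.slice(*columns)) }.join("\n")
  [sql, body]
end

sql, body = build_json_each_row_insert(
  "events",
  [{ id: 1, name: "click", extra: "ignored" }, { id: 2, name: "view" }],
  columns: %w[id name]
)
```

When columns is nil the sketch takes them from the first row, matching the documented default for the columns parameter.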

Examples:

client.insert('events', [
  { id: 1, name: 'click' },
  { id: 2, name: 'view' }
])

With explicit columns

client.insert('events', [
  { id: 1, name: 'click', extra: 'ignored' },
], columns: ['id', 'name'])

Parameters:

  • table (String)

    the table name

  • rows (Array<Hash>)

    array of row hashes

  • columns (Array<String>, nil) (defaults to: nil)

    column names (inferred from first row if nil)

  • settings (Hash) (defaults to: {})

    ClickHouse settings

  • format (Symbol) (defaults to: :json_each_row)

    insert format (:json_each_row is default and recommended)

Returns:

  • (Boolean)

    true if successful

Raises:

  • (QueryError)

    if insert fails

  • (ArgumentError)

    if rows is empty

# File 'lib/clickhouse_ruby/client.rb', line 141

def insert(table, rows, columns: nil, settings: {}, format: :json_each_row)
  raise ArgumentError, "rows cannot be empty" if rows.nil? || rows.empty?

  @retry_handler.with_retry(idempotent: false) do |query_id|
    settings_with_id = settings.merge(query_id: query_id)
    insert_internal(table, rows, columns: columns, settings: settings_with_id, format: format)
  end
end

#ping ⇒ Boolean

Checks if the ClickHouse server is reachable

Returns:

  • (Boolean)

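    true if the server responds to ping; false on connection, socket, or timeout errors (including a timeout waiting for a pooled connection)
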
# File 'lib/clickhouse_ruby/client.rb', line 153

def ping
  @pool.with_connection(&:ping)
rescue ClickhouseRuby::ConnectionError, ClickhouseRuby::ConnectionTimeout,
       ClickhouseRuby::PoolTimeout, SystemCallError, SocketError,
       Net::OpenTimeout, Net::ReadTimeout
  false
end
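ClickHouse's HTTP interface exposes a /ping endpoint that answers "Ok." when the server is up. A standalone sketch of an equivalent reachability check using only Net::HTTP, assuming the default HTTP port 8123, plain HTTP, and no credentials on /ping (clickhouse_ping? is hypothetical; Connection#ping's implementation is not shown here), follows the same rescue-to-false approach as #ping above:

```ruby
require "net/http"

# Hypothetical standalone check against ClickHouse's HTTP /ping endpoint.
# Assumes plain HTTP (no TLS) and that /ping needs no credentials.
def clickhouse_ping?(host, port = 8123, timeout: 1)
  Net::HTTP.start(host, port, open_timeout: timeout, read_timeout: timeout) do |http|
    response = http.get("/ping")
    response.is_a?(Net::HTTPSuccess) && response.body.to_s.strip == "Ok."
  end
rescue SystemCallError, SocketError, IOError, Net::OpenTimeout, Net::ReadTimeout
  # Network-level failures mean "not reachable" rather than an exception.
  false
end

# clickhouse_ping?("localhost") is true only if a ClickHouse server answers on 8123.
```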

#pool_stats ⇒ Hash

Returns pool statistics

Returns:

  • (Hash)

    pool stats

# File 'lib/clickhouse_ruby/client.rb', line 268

def pool_stats
  @pool.stats
end

#server_version ⇒ String

Returns the ClickHouse server version

Returns:

  • (String)

    version string

Raises:

# File 'lib/clickhouse_ruby/client.rb', line 165

def server_version
  result = execute("SELECT version() AS version")
  result.first["version"]
end
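server_version returns a plain string such as "24.1.0", and comparing version strings lexically goes wrong as soon as a component reaches two digits ("23.10" sorts before "23.8"). A small sketch of a feature gate using RubyGems' Gem::Version, which compares dot-separated numeric segments (server_at_least? is a hypothetical helper):

```ruby
require "rubygems" # usually already loaded; makes Gem::Version explicit

# Hypothetical helper: true if the server version is at least `minimum`.
def server_at_least?(version_string, minimum)
  Gem::Version.new(version_string) >= Gem::Version.new(minimum)
end

server_at_least?("23.10.1", "23.8") # => true
"23.10.1" >= "23.8"                 # => false (lexical comparison is wrong here)
```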

#stream_execute(sql, settings: {}) ⇒ StreamingResult

Returns a streaming result for memory-efficient processing

Useful for queries that return large result sets. Results are parsed line-by-line as they arrive from the server, so memory usage stays roughly constant regardless of how many rows the query returns.

Examples:

result = client.stream_execute('SELECT * FROM huge_table')
result.each { |row| process(row) }

Lazy enumeration

client.stream_execute('SELECT * FROM huge_table')
  .lazy
  .select { |row| row['active'] == 1 }
  .take(100)
  .to_a

Parameters:

  • sql (String)

    the SQL query to execute

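  • settings (Hash) (defaults to: {})

    ClickHouse settings for this query. In the implementation shown below, settings is accepted but not passed on to StreamingResult, so it currently has no effect.

Returns:

  • (StreamingResult)

    a result that reads rows as they arrive over a dedicated (non-pooled) connection
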
# File 'lib/clickhouse_ruby/client.rb', line 291

def stream_execute(sql, settings: {})
  # Create dedicated connection (not from pool)
  connection = Connection.new(**@config.to_connection_options)

  StreamingResult.new(
    connection,
    sql,
    compression: @config.compression,
  )
end
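StreamingResult's parser is not shown on this page. Assuming rows arrive as newline-delimited JSON (as in JSONEachRow), the core of line-by-line streaming is an Enumerator that parses one line at a time, so a lazy chain like the one in the stream_execute examples stops reading once it has enough rows. A sketch over any IO-like object (json_rows is hypothetical):

```ruby
require "json"
require "stringio"

# Hypothetical line-by-line parser: yields one Hash per non-empty line.
# Only the current line is held in memory, not the whole response.
def json_rows(io)
  Enumerator.new do |yielder|
    io.each_line do |line|
      next if line.strip.empty?

      yielder << JSON.parse(line)
    end
  end
end

io = StringIO.new(%({"id":1,"active":1}\n{"id":2,"active":0}\n{"id":3,"active":1}\n))
# Returns only the row with id 1 and leaves the remaining lines unread.
json_rows(io).lazy.select { |row| row["active"] == 1 }.first(1)
```

Over a real HTTP response the body arrives in chunks that need not line up with row boundaries, so an actual implementation has to buffer partial lines between chunks; StringIO hides that detail here.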