Class: ClickhouseRuby::Client
- Inherits: Object
  - Object
    - ClickhouseRuby::Client
- Defined in: lib/clickhouse_ruby/client.rb
Overview
Main HTTP client for ClickHouse communication
The Client provides a high-level interface for executing queries and inserting data into ClickHouse. It handles:
- Connection pooling for performance
- Automatic format handling (JSONCompact for queries)
- Proper error handling with rich context
- Bulk inserts with JSONEachRow format
CRITICAL: This client ALWAYS checks HTTP status codes before parsing response bodies. This prevents the silent error bug found in clickhouse-activerecord (issue #230).
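The status-before-body rule can be illustrated with a minimal sketch. It does not use the gem: `FakeResponse`, `QueryFailed`, and `parse_checked` are hypothetical names invented for this example, and the real client's error classes and internals may differ.

```ruby
require "json"

# Hypothetical stand-in for an HTTP response (status code as a string, raw body).
FakeResponse = Struct.new(:code, :body)

# Hypothetical error type; the gem's actual error classes are not shown here.
class QueryFailed < StandardError
  attr_reader :status

  def initialize(status, body)
    @status = status
    super("ClickHouse returned HTTP #{status}: #{body.to_s.strip}")
  end
end

# Check the status code first; only a 2xx body is ever handed to the JSON parser.
def parse_checked(response)
  status = response.code.to_i
  raise QueryFailed.new(status, response.body) unless (200..299).cover?(status)

  JSON.parse(response.body)
end

ok = FakeResponse.new("200", '{"data":[[1]]}')
bad = FakeResponse.new("500", "Code: 62. DB::Exception: Syntax error")

parsed = parse_checked(ok) # a Hash with a "data" key

begin
  parse_checked(bad)
rescue QueryFailed => e
  warn e.message # the error body is surfaced instead of being parsed as a result
end
```

Parsing only after the status check means an error payload can never be mistaken for an empty or partial result set.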
Constant Summary
- DEFAULT_FORMAT = "JSONCompact"
  Default response format for queries
- INSERT_FORMAT = "JSONEachRow"
  Format for bulk inserts (5x faster than VALUES)
Instance Attribute Summary
- #config ⇒ Configuration (readonly)
  The client configuration.
- #pool ⇒ ConnectionPool (readonly)
  The connection pool.
- #retry_handler ⇒ RetryHandler (readonly)
  The retry handler.
Instance Method Summary
- #close ⇒ void (also: #disconnect)
  Closes all connections in the pool.
- #command(sql, settings: {}) ⇒ Boolean
  Executes a command (INSERT, CREATE, DROP, etc.) that doesn’t return data.
- #each_batch(sql, batch_size: 1000, settings: {}) {|Array<Hash>| ... } ⇒ Enumerator
  Convenience method for batch processing.
- #each_row(sql, settings: {}) {|Hash| ... } ⇒ Enumerator
  Convenience method for iterating over rows one at a time.
- #execute(sql, settings: {}, format: DEFAULT_FORMAT) ⇒ Result
  Executes a SQL query and returns results.
- #explain(sql, type: :plan, settings: {}) ⇒ Array<Hash>
  Returns the query execution plan for a SQL query.
- #health_check ⇒ Hash
  Returns detailed health status of the client and server.
- #initialize(config) ⇒ Client (constructor)
  Creates a new Client.
- #insert(table, rows, columns: nil, settings: {}, format: :json_each_row) ⇒ Boolean
  Inserts multiple rows using bulk insert (JSONEachRow format).
- #ping ⇒ Boolean
  Checks if the ClickHouse server is reachable.
- #pool_stats ⇒ Hash
  Returns pool statistics.
- #server_version ⇒ String
  Returns the ClickHouse server version.
- #stream_execute(sql, settings: {}) ⇒ StreamingResult
  Returns a streaming result for memory-efficient processing.
Constructor Details
#initialize(config) ⇒ Client
Creates a new Client
# File 'lib/clickhouse_ruby/client.rb', line 61
def initialize(config)
  @config = config
  @config.validate!
  @pool = ConnectionPool.new(config)
  @logger = config.logger
  @default_settings = config.default_settings || {}
  @retry_handler = RetryHandler.new(
    max_attempts: config.max_retries,
    initial_backoff: config.initial_backoff,
    max_backoff: config.max_backoff,
    multiplier: config.backoff_multiplier,
    jitter: config.retry_jitter,
  )
end
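The keyword names passed to RetryHandler (initial_backoff, max_backoff, multiplier) suggest capped exponential backoff. RetryHandler's implementation is not shown on this page, so the following is only a sketch under that assumption. `backoff_delay` is a hypothetical helper, the values are illustrative, and jitter is left out because the source does not show how `retry_jitter` is applied.

```ruby
# Hypothetical helper: delay in seconds before retry number `attempt` (1-based),
# assuming capped exponential backoff. Not the gem's actual RetryHandler logic.
def backoff_delay(attempt, initial_backoff:, max_backoff:, multiplier:)
  delay = initial_backoff * (multiplier**(attempt - 1))
  [delay, max_backoff].min
end

delays = (1..5).map do |attempt|
  backoff_delay(attempt, initial_backoff: 1.0, max_backoff: 10.0, multiplier: 2.0)
end

p delays # => [1.0, 2.0, 4.0, 8.0, 10.0]
```

Under these assumed values the delay doubles on each attempt until it reaches the max_backoff cap.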
Instance Attribute Details
#config ⇒ Configuration (readonly)
Returns the client configuration.
# File 'lib/clickhouse_ruby/client.rb', line 49
def config
  @config
end
#pool ⇒ ConnectionPool (readonly)
Returns the connection pool.
# File 'lib/clickhouse_ruby/client.rb', line 52
def pool
  @pool
end
#retry_handler ⇒ RetryHandler (readonly)
Returns the retry handler.
# File 'lib/clickhouse_ruby/client.rb', line 55
def retry_handler
  @retry_handler
end
Instance Method Details
#close ⇒ void Also known as: disconnect
This method returns an undefined value.
Closes all connections in the pool
Call this when shutting down to clean up resources.
# File 'lib/clickhouse_ruby/client.rb', line 260
def close
  @pool.shutdown
end
#command(sql, settings: {}) ⇒ Boolean
Executes a command (INSERT, CREATE, DROP, etc.) that doesn’t return data
# File 'lib/clickhouse_ruby/client.rb', line 110
def command(sql, settings: {})
  params = build_query_params(settings)
  execute_request(sql, params)
  true
end
#each_batch(sql, batch_size: 1000, settings: {}) {|Array<Hash>| ... } ⇒ Enumerator
Convenience method for batch processing
Equivalent to stream_execute(sql, settings: settings).each_batch(size: batch_size, &block)
# File 'lib/clickhouse_ruby/client.rb', line 333
def each_batch(sql, batch_size: 1000, settings: {}, &block)
  stream_execute(sql, settings: settings).each_batch(size: batch_size, &block)
end
#each_row(sql, settings: {}) {|Hash| ... } ⇒ Enumerator
Convenience method for iterating over rows one at a time
Equivalent to stream_execute(sql, settings: settings).each(&block)
# File 'lib/clickhouse_ruby/client.rb', line 315
def each_row(sql, settings: {}, &block)
  stream_execute(sql, settings: settings).each(&block)
end
#execute(sql, settings: {}, format: DEFAULT_FORMAT) ⇒ Result
Executes a SQL query and returns results
# File 'lib/clickhouse_ruby/client.rb', line 94
def execute(sql, settings: {}, format: DEFAULT_FORMAT)
  @retry_handler.with_retry(idempotent: true) do
    execute_internal(sql, settings: settings, format: format)
  end
end
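Queries default to the JSONCompact format, in which ClickHouse returns column metadata under "meta" and each row as a positional array under "data". The sketch below shows how such a body can be turned into one Hash per row. `rows_from_json_compact` and the sample body are illustrative assumptions, and the gem's Result class may do this differently.

```ruby
require "json"

# Sample JSONCompact body, hand-written for illustration.
body = <<~JSON
  {
    "meta": [{"name": "id", "type": "UInt32"}, {"name": "name", "type": "String"}],
    "data": [[1, "alpha"], [2, "beta"]],
    "rows": 2
  }
JSON

# Hypothetical helper: pair each positional value with its column name.
def rows_from_json_compact(body)
  payload = JSON.parse(body)
  names = payload.fetch("meta").map { |column| column.fetch("name") }
  payload.fetch("data").map { |values| names.zip(values).to_h }
end

rows = rows_from_json_compact(body)
p rows.first # => {"id"=>1, "name"=>"alpha"} (exact inspect formatting varies by Ruby version)
```

Because column names appear once in "meta" rather than in every row, the compact format keeps response bodies smaller than a format that repeats keys per row.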
#explain(sql, type: :plan, settings: {}) ⇒ Array<Hash>
Returns the query execution plan for a SQL query
Uses EXPLAIN to show how ClickHouse will execute the query.
# File 'lib/clickhouse_ruby/client.rb', line 188
def explain(sql, type: :plan, settings: {})
  explain_keyword = case type
                    when :plan then "EXPLAIN"
                    when :pipeline then "EXPLAIN PIPELINE"
                    when :estimate then "EXPLAIN ESTIMATE"
                    when :ast then "EXPLAIN AST"
                    when :syntax then "EXPLAIN SYNTAX"
                    else
                      raise ArgumentError, "Unknown explain type: #{type}. Valid types: :plan, :pipeline, :estimate, :ast, :syntax"
                    end

  explain_sql = "#{explain_keyword} #{sql.strip}"
  execute(explain_sql, settings: settings).to_a
end
#health_check ⇒ Hash
Returns detailed health status of the client and server
Provides comprehensive health information including server status, connection pool state, and server metrics.
# File 'lib/clickhouse_ruby/client.rb', line 219
def health_check
  started_at = Instrumentation.monotonic_time

  server_reachable = ping
  version = nil
  server_metrics = {}

  if server_reachable
    begin
      version = server_version
      # Get server uptime and metrics
      metrics_result = execute(<<~SQL, settings: { max_execution_time: 5 })
        SELECT
          uptime() AS uptime_seconds,
          currentDatabase() AS current_database
      SQL
      server_metrics = metrics_result.first if metrics_result.any?
    rescue StandardError
      # Ignore errors fetching extended info
    end
  end

  pool_health = @pool.health_check
  check_duration_ms = Instrumentation.duration_ms(started_at)

  {
    status: server_reachable ? :healthy : :unhealthy,
    server_reachable: server_reachable,
    server_version: version,
    current_database: server_metrics["current_database"],
    server_uptime_seconds: server_metrics["uptime_seconds"],
    pool: pool_health,
    check_duration_ms: check_duration_ms.round(2),
  }
end
#insert(table, rows, columns: nil, settings: {}, format: :json_each_row) ⇒ Boolean
Inserts multiple rows using bulk insert (JSONEachRow format)
This is significantly faster than INSERT … VALUES for large datasets. The data is sent in JSONEachRow format which ClickHouse can parse efficiently.
# File 'lib/clickhouse_ruby/client.rb', line 141
def insert(table, rows, columns: nil, settings: {}, format: :json_each_row)
  raise ArgumentError, "rows cannot be empty" if rows.nil? || rows.empty?

  @retry_handler.with_retry(idempotent: false) do |query_id|
    settings_with_id = settings.merge(query_id: query_id)
    insert_internal(table, rows, columns: columns, settings: settings_with_id, format: format)
  end
end
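JSONEachRow is newline-delimited JSON: one object per row, one row per line. The sketch below shows how an array of hashes could be serialized into such a payload. `json_each_row` and the `events` table are hypothetical, and `insert_internal` is not shown on this page, so the gem's actual serialization may differ.

```ruby
require "json"

# Hypothetical helper: serialize each row as one JSON object per line.
def json_each_row(rows)
  rows.map { |row| JSON.generate(row) }.join("\n")
end

rows = [{ id: 1, name: "alpha" }, { id: 2, name: "beta" }]
payload = json_each_row(rows)

# The payload would follow a statement such as this (table name is illustrative).
statement = "INSERT INTO events FORMAT JSONEachRow"

puts statement
puts payload
# prints:
# INSERT INTO events FORMAT JSONEachRow
# {"id":1,"name":"alpha"}
# {"id":2,"name":"beta"}
```

Each row travels as a self-describing object, so no SQL value escaping or column ordering is needed on the client side.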
#ping ⇒ Boolean
Checks if the ClickHouse server is reachable
# File 'lib/clickhouse_ruby/client.rb', line 153
def ping
  @pool.with_connection(&:ping)
rescue ClickhouseRuby::ConnectionError, ClickhouseRuby::ConnectionTimeout,
       ClickhouseRuby::PoolTimeout, SystemCallError, SocketError,
       Net::OpenTimeout, Net::ReadTimeout
  false
end
#pool_stats ⇒ Hash
Returns pool statistics
# File 'lib/clickhouse_ruby/client.rb', line 268
def pool_stats
  @pool.stats
end
#server_version ⇒ String
Returns the ClickHouse server version
# File 'lib/clickhouse_ruby/client.rb', line 165
def server_version
  result = execute("SELECT version() AS version")
  result.first["version"]
end
#stream_execute(sql, settings: {}) ⇒ StreamingResult
Returns a streaming result for memory-efficient processing
Useful for queries that return large result sets. Results are parsed line-by-line as they arrive from the server, keeping memory usage constant.
# File 'lib/clickhouse_ruby/client.rb', line 291
def stream_execute(sql, settings: {})
  # Create dedicated connection (not from pool)
  connection = Connection.new(**@config.)

  StreamingResult.new(
    connection,
    sql,
    compression: @config.compression,
  )
end
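Line-by-line parsing keeps memory bounded because only the current line, or the current batch, is held at once. The sketch below illustrates the idea with a StringIO standing in for the network stream. It assumes newline-delimited JSON input, which is not confirmed by this page, and `parsed_rows` is a hypothetical helper rather than part of StreamingResult.

```ruby
require "json"
require "stringio"

# Hypothetical helper: lazily parse one JSON object per line from any IO-like source.
def parsed_rows(io)
  Enumerator.new do |yielder|
    io.each_line do |line|
      line = line.strip
      next if line.empty?

      yielder << JSON.parse(line)
    end
  end
end

# StringIO stands in for a response body arriving from the server.
io = StringIO.new(%({"id":1}\n{"id":2}\n{"id":3}\n))

# Rows are parsed only as each batch is requested, two at a time here.
parsed_rows(io).each_slice(2) do |batch|
  puts "batch of #{batch.size}"
end
# prints:
# batch of 2
# batch of 1
```

The same shape covers both convenience methods: iterating the enumerator directly corresponds to row-at-a-time processing, and `each_slice` corresponds to batch processing.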