Class: OpenC3::QuestDBClient

Inherits:
Object
  • Object
Defined in:
lib/openc3/utilities/questdb_client.rb

Overview

Utility class for QuestDB data encoding and decoding. This provides a common interface for serializing/deserializing COSMOS data types when writing to and reading from QuestDB.

Defined Under Namespace

Classes: QuestDBError

Constant Summary

TIMESTAMP_ITEMS =

Special timestamp items that are calculated from PACKET_TIMESECONDS/RECEIVED_TIMESECONDS columns rather than stored as separate columns. PACKET_TIMESECONDS and RECEIVED_TIMESECONDS are stored as timestamp_ns columns and need conversion to float seconds on read. The TIMEFORMATTED items are derived from these timestamp columns.

{
  'PACKET_TIMEFORMATTED' => { source: 'PACKET_TIMESECONDS', format: :formatted },
  'RECEIVED_TIMEFORMATTED' => { source: 'RECEIVED_TIMESECONDS', format: :formatted }
}.freeze
STORED_TIMESTAMP_ITEMS =

Timestamp items stored as timestamp_ns columns, which need conversion to float seconds on read. Distinguished from the calculated items in TIMESTAMP_ITEMS.

Set.new(['PACKET_TIMESECONDS', 'RECEIVED_TIMESECONDS']).freeze
FLOAT64_POS_INF_SENTINEL =

64-bit double sentinels (for FLOAT 64-bit columns)

1.7976931348623155e308
FLOAT64_NEG_INF_SENTINEL =
-1.7976931348623155e308
FLOAT64_NAN_SENTINEL =
-1.7976931348623153e308
FLOAT32_POS_INF_STORED =

32-bit float sentinels (what we read back after 32-bit storage)

3.4028232635611926e38
FLOAT32_NEG_INF_STORED =
-3.4028232635611926e38
FLOAT32_NAN_STORED =
-3.4028230607370965e38
TIMESTAMP_SELECT =

SQL: nanosecond-precision packet timestamp for explicit SELECT lists. PG wire protocol truncates timestamp_ns to microseconds; CAST AS LONG preserves full precision.

'CAST(PACKET_TIMESECONDS AS LONG) as PACKET_TIMESECONDS'
TIMESTAMP_EXTRAS =

SQL: nanosecond-precision timestamps for SELECT * queries (different aliases avoid column name collision).

'CAST(PACKET_TIMESECONDS AS LONG) as "__pkt_time_ns", CAST(RECEIVED_TIMESECONDS AS LONG) as "__rx_time_ns"'

Class Method Details

.add_timestamp_entries!(entry, timestamp_ns, prefix) ⇒ Object

Add TIMESECONDS and TIMEFORMATTED entries to a hash from a nanosecond timestamp. Used when building packet entries from CAST(timestamp AS LONG) columns.

Parameters:

  • entry (Hash)

    Entry hash to populate

  • timestamp_ns (Integer)

    Nanoseconds since epoch

  • prefix (String)

    ‘PACKET’ or ‘RECEIVED’



# File 'lib/openc3/utilities/questdb_client.rb', line 512

def self.add_timestamp_entries!(entry, timestamp_ns, prefix)
  return unless timestamp_ns
  utc_time = nsec_to_utc_time(timestamp_ns)
  entry["#{prefix}_TIMESECONDS"] = format_timestamp(utc_time, :seconds)
  entry["#{prefix}_TIMEFORMATTED"] = format_timestamp(utc_time, :formatted)
end
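
As a rough illustration of what this method produces, the following standalone sketch mirrors `nsec_to_utc_time`, `format_timestamp`, and `add_timestamp_entries!` inside a hypothetical `TimestampSketch` module (the module name and the sample timestamp are assumptions, not part of the library).

```ruby
# Standalone sketch; TimestampSketch is a hypothetical module that mirrors
# the helpers listed on this page so the example runs without OpenC3.
module TimestampSketch
  def self.nsec_to_utc_time(nsec)
    return nil unless nsec
    Time.at(nsec / 1_000_000_000, nsec % 1_000_000_000, :nsec, in: '+00:00')
  end

  def self.format_timestamp(utc_time, format)
    return nil unless utc_time
    case format
    when :seconds then utc_time.to_f
    when :formatted then utc_time.strftime('%Y-%m-%dT%H:%M:%S.%6NZ')
    end
  end

  # Mutates the entry hash in place; a nil timestamp is a no-op.
  def self.add_timestamp_entries!(entry, timestamp_ns, prefix)
    return unless timestamp_ns
    utc_time = nsec_to_utc_time(timestamp_ns)
    entry["#{prefix}_TIMESECONDS"] = format_timestamp(utc_time, :seconds)
    entry["#{prefix}_TIMEFORMATTED"] = format_timestamp(utc_time, :formatted)
  end
end

entry = {}
TimestampSketch.add_timestamp_entries!(entry, 1_700_000_000_123_456_789, 'PACKET')
TimestampSketch.add_timestamp_entries!(entry, nil, 'RECEIVED') # skipped
puts entry['PACKET_TIMEFORMATTED'] # 2023-11-14T22:13:20.123456Z
```

Note that the formatted string keeps microseconds only (`%6N`), while `__time` elsewhere on this page carries the full nanosecond integer.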

.build_aggregation_selects(safe_item_name, value_type, item_name: nil) ⇒ Array<String>, Hash

Build aggregation SELECT columns (min/max/avg/stddev) for a single item. Returns the SELECT fragments and a column_mapping hash.

Parameters:

  • safe_item_name (String)

    Sanitized column name

  • value_type (Symbol)

    :RAW or :CONVERTED

  • item_name (String, nil) (defaults to: nil)

    Original (unsanitized) item name for mapping values. Defaults to safe_item_name if not provided.

Returns:

  • (Array<String>, Hash)

    Two-element array: [select_fragments, column_mapping]. column_mapping maps each result column alias to [item_name, reduced_type, value_type].



# File 'lib/openc3/utilities/questdb_client.rb', line 453

def self.build_aggregation_selects(safe_item_name, value_type, item_name: nil)
  item_name ||= safe_item_name
  selects = []
  mapping = {}
  case value_type
  when :RAW
    col = safe_item_name
    { 'N' => :MIN, 'X' => :MAX, 'A' => :AVG, 'S' => :STDDEV }.each do |suffix, reduced_type|
      alias_name = "#{safe_item_name}__#{suffix}"
      selects << "#{reduced_type.to_s.downcase}(\"#{col}\") as \"#{alias_name}\""
      mapping[alias_name] = [item_name, reduced_type, :RAW]
    end
  when :CONVERTED
    col = "#{safe_item_name}__C"
    { 'CN' => :MIN, 'CX' => :MAX, 'CA' => :AVG, 'CS' => :STDDEV }.each do |suffix, reduced_type|
      alias_name = "#{safe_item_name}__#{suffix}"
      selects << "#{reduced_type.to_s.downcase}(\"#{col}\") as \"#{alias_name}\""
      mapping[alias_name] = [item_name, reduced_type, :CONVERTED]
    end
  else
    # No aggregation for FORMATTED type since it is a string
    raise QuestDBError.new("Unsupported value type for aggregation: #{value_type}")
  end
  [selects, mapping]
end
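
To make the alias scheme concrete, here is a condensed sketch of the same logic in a hypothetical `AggregationSketch` module. It raises `ArgumentError` where the listing raises `QuestDBError`, purely so the example has no dependencies; the item name `TEMP1` is made up.

```ruby
# Standalone sketch; AggregationSketch is a hypothetical, condensed variant
# of build_aggregation_selects (ArgumentError stands in for QuestDBError).
module AggregationSketch
  SUFFIXES = { 'N' => :MIN, 'X' => :MAX, 'A' => :AVG, 'S' => :STDDEV }.freeze

  def self.build_aggregation_selects(safe_item_name, value_type, item_name: nil)
    item_name ||= safe_item_name
    prefix, col =
      case value_type
      when :RAW then ['', safe_item_name]
      when :CONVERTED then ['C', "#{safe_item_name}__C"]
      else raise ArgumentError, "Unsupported value type for aggregation: #{value_type}"
      end

    selects = []
    mapping = {}
    SUFFIXES.each do |suffix, reduced_type|
      alias_name = "#{safe_item_name}__#{prefix}#{suffix}"
      selects << "#{reduced_type.to_s.downcase}(\"#{col}\") as \"#{alias_name}\""
      mapping[alias_name] = [item_name, reduced_type, value_type]
    end
    [selects, mapping]
  end
end

selects, mapping = AggregationSketch.build_aggregation_selects('TEMP1', :CONVERTED)
puts selects.first          # min("TEMP1__C") as "TEMP1__CN"
p mapping['TEMP1__CA']      # ["TEMP1", :AVG, :CONVERTED]
```

The mapping lets a caller translate each aliased result column back to the original item name and reduced type when decoding rows.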

.build_item_columns_query(table_name, column_names, start_time, end_time, include_received_ts: false) ⇒ String

Build a SELECT query for specific item columns from a single table.

Parameters:

  • table_name (String)

    Sanitized QuestDB table name

  • column_names (Array<String>)

    Quoted column expressions (e.g., ‘“TEMP1__C”’)

  • start_time (Integer)

    Start timestamp in nanoseconds

  • end_time (Integer, nil)

    End timestamp in nanoseconds

  • include_received_ts (Boolean) (defaults to: false)

    Whether to include RECEIVED_TIMESECONDS

Returns:

  • (String)

    Complete SQL query (without LIMIT clause)



# File 'lib/openc3/utilities/questdb_client.rb', line 587

def self.build_item_columns_query(table_name, column_names, start_time, end_time, include_received_ts: false)
  names = column_names.dup
  names << TIMESTAMP_SELECT
  names << "RECEIVED_TIMESECONDS" if include_received_ts
  names << "COSMOS_EXTRA"
  query = "SELECT #{names.join(', ')} FROM #{table_name}"
  query += time_where_clause(start_time, end_time)
  query
end

.build_item_defs_map(packet_def) ⇒ Hash

Build a hash mapping sanitized column names to item definitions. Used for type-aware decoding of QuestDB SELECT * results.

Parameters:

  • packet_def (Hash, nil)

    Packet definition from TargetModel.packet

Returns:

  • (Hash)

    { sanitized_column_name => item_def_hash }



# File 'lib/openc3/utilities/questdb_client.rb', line 435

def self.build_item_defs_map(packet_def)
  map = {}
  return map unless packet_def
  packet_def['items']&.each do |item|
    map[sanitize_column_name(item['name'])] = item
  end
  map
end

.build_packet_query(table_name, start_time, end_time) ⇒ String

Build a SELECT * query for full packet data from a single table.

Parameters:

  • table_name (String)

    Sanitized QuestDB table name

  • start_time (Integer)

    Start timestamp in nanoseconds

  • end_time (Integer, nil)

    End timestamp in nanoseconds

Returns:

  • (String)

    Complete SQL query (without LIMIT clause)



# File 'lib/openc3/utilities/questdb_client.rb', line 603

def self.build_packet_query(table_name, start_time, end_time)
  query = "SELECT *, #{TIMESTAMP_EXTRAS} FROM \"#{table_name}\""
  query += time_where_clause(start_time, end_time)
  query
end

.build_packet_reduced_selects(packet_def, value_type) ⇒ Array<String>, Boolean

Build aggregation SELECT columns for all numeric items in a packet definition. Filters out STRING, BLOCK, and DERIVED items since they can’t be aggregated.

Parameters:

  • packet_def (Hash, nil)

    Packet definition from TargetModel.packet

  • value_type (Symbol)

    :RAW or :CONVERTED

Returns:

  • (Array<String>, Boolean)

    Two-element array: [select_fragments, has_numeric_items]. select_fragments includes TIMESTAMP_SELECT as the first element.



# File 'lib/openc3/utilities/questdb_client.rb', line 486

def self.build_packet_reduced_selects(packet_def, value_type)
  selects = [TIMESTAMP_SELECT]
  has_items = false
  return [selects, false] unless packet_def && packet_def['items']

  packet_def['items'].each do |item|
    data_type = item['data_type']
    next if data_type.nil?
    next if ['STRING', 'BLOCK', 'DERIVED'].include?(data_type)
    next unless value_type == :RAW || value_type == :CONVERTED

    safe_name = sanitize_column_name(item['name'])
    agg_selects, _mapping = build_aggregation_selects(safe_name, value_type)
    selects.concat(agg_selects)
    has_items = true
  end

  [selects, has_items]
end

.build_reduced_query(table_name, select_columns, start_time, end_time, sample_interval) ⇒ String

Build a SAMPLE BY aggregation query for reduced data.

Parameters:

  • table_name (String)

    Sanitized QuestDB table name

  • select_columns (Array<String>)

    SELECT column expressions including aggregations

  • start_time (Integer)

    Start timestamp in nanoseconds

  • end_time (Integer, nil)

    End timestamp in nanoseconds

  • sample_interval (String)

    QuestDB SAMPLE BY interval (‘1m’, ‘1h’, ‘1d’)

Returns:

  • (String)

    Complete SQL query (without LIMIT clause)



# File 'lib/openc3/utilities/questdb_client.rb', line 617

def self.build_reduced_query(table_name, select_columns, start_time, end_time, sample_interval)
  query = "SELECT #{select_columns.join(', ')} FROM \"#{table_name}\""
  query += time_where_clause(start_time, end_time)
  query += " SAMPLE BY #{sample_interval}"
  query += " ALIGN TO CALENDAR"
  query += " ORDER BY PACKET_TIMESECONDS"
  query
end
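
The sketch below shows how the pieces of a reduced query might fit together. Because the body of `time_where_clause` is not shown on this page, `where_stub` is a hypothetical stand-in and the real WHERE fragment may look different; the module name, table name, and column list are likewise illustrative assumptions.

```ruby
# Standalone sketch; ReducedQuerySketch is hypothetical.
module ReducedQuerySketch
  # Mirrors sample_interval_for from this page.
  def self.sample_interval_for(stream_mode)
    case stream_mode
    when :REDUCED_HOUR then '1h'
    when :REDUCED_DAY then '1d'
    else '1m' # :REDUCED_MINUTE and any unrecognized mode
    end
  end

  # HYPOTHETICAL stand-in for time_where_clause (its body is not shown here).
  def self.where_stub(start_time, end_time)
    clause = " WHERE PACKET_TIMESECONDS >= #{start_time}"
    clause += " AND PACKET_TIMESECONDS < #{end_time}" if end_time
    clause
  end

  # Same assembly order as build_reduced_query: SELECT, WHERE, SAMPLE BY,
  # ALIGN TO CALENDAR, ORDER BY.
  def self.build_reduced_query(table_name, select_columns, start_time, end_time, sample_interval)
    query = "SELECT #{select_columns.join(', ')} FROM \"#{table_name}\""
    query += where_stub(start_time, end_time)
    query += " SAMPLE BY #{sample_interval} ALIGN TO CALENDAR ORDER BY PACKET_TIMESECONDS"
    query
  end
end

columns = [
  'CAST(PACKET_TIMESECONDS AS LONG) as PACKET_TIMESECONDS',
  'avg("TEMP1__C") as "TEMP1__CA"'
]
puts ReducedQuerySketch.build_reduced_query(
  'DEFAULT__TLM__INST__HEALTH_STATUS', columns, 0, nil,
  ReducedQuerySketch.sample_interval_for(:REDUCED_HOUR)
)
```

The returned string deliberately has no LIMIT clause, so it can be handed to a paginating caller that appends one per page.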

.check_connection ⇒ Object

Health check - attempt to connect and immediately close. Returns true if successful, raises on failure.



# File 'lib/openc3/utilities/questdb_client.rb', line 64

def self.check_connection
  conn = PG::Connection.new(
    host: ENV['OPENC3_TSDB_HOSTNAME'],
    port: ENV['OPENC3_TSDB_QUERY_PORT'],
    user: ENV['OPENC3_TSDB_USERNAME'],
    password: ENV['OPENC3_TSDB_PASSWORD'],
    dbname: 'qdb'
  )
  conn.close
  true
end

.coerce_to_utc(value) ⇒ Time?

Coerce a value from QuestDB (which may be a Time, Float, Integer, String, or PG timestamp object) into a Ruby UTC Time.

Parameters:

  • value (Object)

    Timestamp value in any supported format

Returns:

  • (Time, nil)

    UTC Time object or nil



# File 'lib/openc3/utilities/questdb_client.rb', line 329

def self.coerce_to_utc(value)
  return nil unless value
  case value
  when Time
    # PG driver returns Time objects with UTC values but in local timezone,
    # so reconstruct as UTC from components rather than converting
    pg_timestamp_to_utc(value)
  when Float
    Time.at(value).utc
  when Integer
    nsec_to_utc_time(value).utc
  when String
    Time.parse(value).utc
  else
    raise QuestDBError.new("Unsupported timestamp value #{value} with type: #{value.class}")
  end
end

.column_suffix_for_value_type(value_type) ⇒ String

Return the QuestDB column suffix for a given value type.

Parameters:

  • value_type (String)

    One of ‘RAW’, ‘CONVERTED’, ‘FORMATTED’

Returns:

  • (String)

    Column suffix (e.g., ‘__C’, ‘__F’, or '')



# File 'lib/openc3/utilities/questdb_client.rb', line 380

def self.column_suffix_for_value_type(value_type)
  case value_type
  when 'FORMATTED', 'WITH_UNITS' # WITH_UNITS is deprecated
    '__F'
  when 'CONVERTED'
    '__C'
  else
    ''
  end
end
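
One plausible use of the suffix is building the quoted column expressions that the query builders expect. In the sketch below, `ColumnExprSketch` and its `quoted_column` helper are hypothetical and only the suffix logic mirrors the listing.

```ruby
# Standalone sketch; ColumnExprSketch and quoted_column are hypothetical.
module ColumnExprSketch
  # Mirrors column_suffix_for_value_type from this page.
  def self.column_suffix_for_value_type(value_type)
    case value_type
    when 'FORMATTED', 'WITH_UNITS' then '__F' # WITH_UNITS is deprecated
    when 'CONVERTED' then '__C'
    else ''
    end
  end

  # Hypothetical helper: double-quoted column expression for a SELECT list.
  def self.quoted_column(safe_item_name, value_type)
    "\"#{safe_item_name}#{column_suffix_for_value_type(value_type)}\""
  end
end

puts ColumnExprSketch.quoted_column('TEMP1', 'CONVERTED') # "TEMP1__C"
puts ColumnExprSketch.quoted_column('TEMP1', 'RAW')       # "TEMP1"
```

The method matches on String values, so a Symbol such as `:CONVERTED` falls through to the empty (RAW) suffix; callers holding symbols would need to convert with `to_s` first.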

.connection ⇒ Object

Get or create a thread-local PG connection with type mapping configured. Returns the thread-local connection - callers should not close it.



# File 'lib/openc3/utilities/questdb_client.rb', line 37

def self.connection
  conn = @thread_conn.value
  if conn.nil? || conn.finished?
    conn = PG::Connection.new(
      host: ENV['OPENC3_TSDB_HOSTNAME'],
      port: ENV['OPENC3_TSDB_QUERY_PORT'],
      user: ENV['OPENC3_TSDB_USERNAME'],
      password: ENV['OPENC3_TSDB_PASSWORD'],
      dbname: 'qdb'
    )
    conn.type_map_for_results = PG::BasicTypeMapForResults.new(conn)
    @thread_conn.value = conn
  end
  conn
end

.decode_float_special_values(value) ⇒ Float

Decode sentinel values back to float special values (inf, -inf, nan). Checks against both 32-bit and 64-bit sentinel values since we may not know the original column type at read time.

Parameters:

  • value (Float)

    The float value to potentially decode

Returns:

  • (Float)

    The value with sentinels replaced by special values



# File 'lib/openc3/utilities/questdb_client.rb', line 108

def self.decode_float_special_values(value)
  return value unless value.is_a?(Float)

  # Check 64-bit sentinels
  return Float::INFINITY if value == FLOAT64_POS_INF_SENTINEL
  return -Float::INFINITY if value == FLOAT64_NEG_INF_SENTINEL
  return Float::NAN if value == FLOAT64_NAN_SENTINEL

  # Check 32-bit sentinels (stored values after precision loss)
  return Float::INFINITY if value == FLOAT32_POS_INF_STORED
  return -Float::INFINITY if value == FLOAT32_NEG_INF_STORED
  return Float::NAN if value == FLOAT32_NAN_STORED

  value
end
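
The sentinel scheme can be sketched as a round trip. The write-side encoder is not shown on this page, so `encode` below is a hypothetical counterpart, and only the 64-bit sentinels are covered; `SentinelSketch` is an illustrative module name.

```ruby
# Standalone sketch; SentinelSketch is hypothetical and covers 64-bit only.
module SentinelSketch
  FLOAT64_POS_INF_SENTINEL = 1.7976931348623155e308
  FLOAT64_NEG_INF_SENTINEL = -1.7976931348623155e308
  FLOAT64_NAN_SENTINEL = -1.7976931348623153e308

  # HYPOTHETICAL write-side counterpart (the real encoder is not shown here):
  # swap inf/-inf/nan for finite sentinels before storage.
  def self.encode(value)
    return value unless value.is_a?(Float)
    return FLOAT64_NAN_SENTINEL if value.nan?
    return FLOAT64_POS_INF_SENTINEL if value == Float::INFINITY
    return FLOAT64_NEG_INF_SENTINEL if value == -Float::INFINITY
    value
  end

  # Mirrors the 64-bit half of decode_float_special_values.
  def self.decode(value)
    return value unless value.is_a?(Float)
    return Float::INFINITY if value == FLOAT64_POS_INF_SENTINEL
    return -Float::INFINITY if value == FLOAT64_NEG_INF_SENTINEL
    return Float::NAN if value == FLOAT64_NAN_SENTINEL
    value
  end
end

stored = SentinelSketch.encode(Float::NAN)
puts stored.finite?                       # the stored value is an ordinary finite double
puts SentinelSketch.decode(stored).nan?
```

The trade-off of this design is that the sentinel values themselves can no longer be stored as ordinary data, which is presumably acceptable because they sit at the very edge of the representable range.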

.decode_item_row(row, sql_to_local, meta) ⇒ Hash

Decode a single row from a per-table item columns query into an entry hash. Handles stored timestamps, calculated timestamps, and regular value decoding.

Parameters:

  • row (PG::Result row)

    Single row (iterable as [col_name, value] pairs)

  • sql_to_local (Array<Integer>)

    Mapping from SQL column index to meta position

  • meta (Hash)

    Per-table metadata with keys:

      • :item_keys [Array<String>] - ordered list of item key identifiers
      • :item_types [Array<Hash>] - type info per position ({ ‘data_type’ =>, ‘array_size’ => })
      • :stored_timestamp_item_keys [Hash] - { item_key => { column: col_name } }
      • :calculated_positions [Hash] - { local_idx => { source: col_name, format: :seconds/:formatted } }

Returns:

  • (Hash)

    Entry hash with __type, item_key => value, __time, COSMOS_EXTRA



# File 'lib/openc3/utilities/questdb_client.rb', line 637

def self.decode_item_row(row, sql_to_local, meta)
  num_sql_item_cols = sql_to_local.length

  entry = { "__type" => "ITEMS" }
  timestamp_values = {}
  time_ns = nil
  cosmos_extra = nil

  values = Array.new(meta[:item_keys].length)

  row.each_with_index do |tuple, sql_index|
    col_name = tuple[0]
    value = tuple[1]

    # Fixed columns come after item columns
    if sql_index >= num_sql_item_cols
      case col_name
      when 'PACKET_TIMESECONDS'
        time_ns = value.to_i
        timestamp_values['PACKET_TIMESECONDS'] = nsec_to_utc_time(time_ns)
      when 'RECEIVED_TIMESECONDS'
        timestamp_values['RECEIVED_TIMESECONDS'] = value if value
      when 'COSMOS_EXTRA'
        cosmos_extra = value
      # No else because we're only interested in these specific extra columns; others can be ignored
      end
      next
    end

    local_idx = sql_to_local[sql_index]

    # Track timestamp values from item columns
    if col_name == 'RECEIVED_TIMESECONDS'
      timestamp_values['RECEIVED_TIMESECONDS'] = value
    end

    next if value.nil?

    type_info = meta[:item_types][local_idx] || {}
    if meta[:stored_timestamp_item_keys].key?(meta[:item_keys][local_idx])
      ts_utc = coerce_to_utc(value)
      values[local_idx] = format_timestamp(ts_utc, :seconds) if ts_utc
    else
      values[local_idx] = decode_value(
        value,
        data_type: type_info['data_type'],
        array_size: type_info['array_size']
      )
    end
  end

  # Build ordered entry hash with calculated items in their natural position
  meta[:item_keys].each_with_index do |item_key, local_idx|
    if meta[:calculated_positions].key?(local_idx)
      calc_info = meta[:calculated_positions][local_idx]
      ts_value = timestamp_values[calc_info[:source]]
      next unless ts_value
      ts_utc = coerce_to_utc(ts_value)
      calculated_value = format_timestamp(ts_utc, calc_info[:format])
      entry[item_key] = calculated_value if calculated_value
    elsif !values[local_idx].nil?
      entry[item_key] = values[local_idx]
    end
  end

  entry['__time'] = time_ns if time_ns
  entry['COSMOS_EXTRA'] = cosmos_extra if cosmos_extra
  entry
end

.decode_packet_row(row, value_type, packet_def) ⇒ Hash

Decode a single row from a SELECT * packet query into an entry hash. Handles nanosecond timestamp CAST columns, value-type column preference, and type-aware decoding.

Parameters:

  • row (PG::Result row)

    Single row as iterable [col_name, value] pairs

  • value_type (Symbol)

    :RAW, :CONVERTED, :FORMATTED

  • packet_def (Hash, nil)

    Packet definition for type-aware decoding

Returns:

  • (Hash)

    Entry hash with item => value, __time, COSMOS_EXTRA, timestamp entries



# File 'lib/openc3/utilities/questdb_client.rb', line 715

def self.decode_packet_row(row, value_type, packet_def)
  entry = {}
  item_defs = build_item_defs_map(packet_def)

  # First pass: build a hash of all columns for value-type preference lookups
  columns = {}
  row.each do |tuple|
    columns[tuple[0]] = tuple[1]
  end

  cosmos_timestamp_ns = nil
  received_timestamp_ns = nil

  # Second pass: process columns based on value_type
  row.each do |tuple|
    column_name = tuple[0]
    raw_value = tuple[1]

    if column_name == '__pkt_time_ns'
      cosmos_timestamp_ns = raw_value.to_i
      entry['__time'] = cosmos_timestamp_ns
      next
    end

    if column_name == '__rx_time_ns'
      received_timestamp_ns = raw_value.to_i
      next
    end

    # Skip PG timestamp versions - handled via CAST AS LONG columns above
    next if column_name == 'PACKET_TIMESECONDS'
    next if column_name == 'RECEIVED_TIMESECONDS'
    next if column_name == 'COSMOS_DATA_TAG'

    if column_name == 'COSMOS_EXTRA'
      entry['COSMOS_EXTRA'] = raw_value
      next
    end

    base_name = column_name.sub(/(__C|__F|__U)$/, '')
    item_def = item_defs[base_name]

    col_value_type = value_type_for_column_suffix(column_name)
    type_info = resolve_item_type(item_def, col_value_type)
    value = decode_value(raw_value, data_type: type_info['data_type'], array_size: type_info['array_size'])

    case value_type
    when :RAW
      next if column_name.end_with?('__C', '__F', '__U')
      entry[column_name] = value
    when :CONVERTED
      if column_name.end_with?('__C')
        entry[column_name.sub(/__C$/, '')] = value
      elsif !column_name.end_with?('__F', '__U') && !columns.key?("#{column_name}__C")
        entry[column_name] = value
      end
    when :FORMATTED
      if column_name.end_with?('__F')
        entry[column_name.sub(/__F$/, '')] = value
      elsif column_name.end_with?('__C') && !columns.key?("#{column_name.sub(/__C$/, '')}__F")
        entry[column_name.sub(/__C$/, '')] = value
      elsif !column_name.end_with?('__C', '__F', '__U') && !columns.key?("#{column_name}__F") && !columns.key?("#{column_name}__C")
        entry[column_name] = value
      end
    else
      raise QuestDBError.new("Unsupported value type for packet decoding: #{value_type}")
    end
  end

  add_timestamp_entries!(entry, cosmos_timestamp_ns, 'PACKET')
  add_timestamp_entries!(entry, received_timestamp_ns, 'RECEIVED')
  entry
end
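
The value-type column preference in the second pass can be summarized as a fallback chain. The helper below is a hypothetical condensation (neither `PreferenceSketch` nor `preferred_column` exists in the class) that returns which column would supply the value for a given base item name.

```ruby
# Standalone sketch; PreferenceSketch.preferred_column is a hypothetical
# condensation of the value-type preference rules in decode_packet_row.
module PreferenceSketch
  def self.preferred_column(columns, base_name, value_type)
    candidates =
      case value_type
      when :RAW then [base_name]
      when :CONVERTED then ["#{base_name}__C", base_name]
      when :FORMATTED then ["#{base_name}__F", "#{base_name}__C", base_name]
      else raise ArgumentError, "Unsupported value type: #{value_type}"
      end
    # As in the listing, the test is column presence, not a non-nil value.
    candidates.find { |name| columns.key?(name) }
  end
end

columns = { 'TEMP1' => 2048, 'TEMP1__C' => 23.5, 'MODE' => 1 }
puts PreferenceSketch.preferred_column(columns, 'TEMP1', :FORMATTED) # TEMP1__C
puts PreferenceSketch.preferred_column(columns, 'MODE', :CONVERTED)  # MODE
```

Because the check is on column presence, a column that exists but holds a null still wins over its fallback.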

.decode_reduced_row(row) ⇒ Hash

Decode a single row from a SAMPLE BY aggregation query. All non-timestamp columns are decoded as DOUBLE (aggregation results are always numeric).

Parameters:

  • row (PG::Result row)

    Single row as iterable [col_name, value] pairs

Returns:

  • (Hash)

    { col_name => decoded_value, ‘__time’ => ns_integer }



# File 'lib/openc3/utilities/questdb_client.rb', line 794

def self.decode_reduced_row(row)
  entry = {}
  row.each do |tuple|
    col_name = tuple[0]
    value = tuple[1]
    if col_name == 'PACKET_TIMESECONDS'
      entry['__time'] = value.to_i
    else
      entry[col_name] = decode_value(value, data_type: 'DOUBLE', array_size: nil)
    end
  end
  entry
end

.decode_value(value, data_type: nil, array_size: nil) ⇒ Object

Decode a value retrieved from QuestDB back to its original Ruby type.

QuestDB stores certain COSMOS types as encoded strings:

  • Arrays are JSON-encoded: “[1, 2, 3]” or ‘[“a”, “b”]’

  • Objects/Hashes are JSON-encoded: ‘{“key”: “value”}’

  • Binary data (BLOCK) is base64-encoded

  • Large integers (64-bit) are stored as DECIMAL

Parameters:

  • value (Object)

    The value to decode

  • data_type (String) (defaults to: nil)

    COSMOS data type (INT, UINT, FLOAT, STRING, BLOCK, DERIVED, etc.)

  • array_size (Integer, nil) (defaults to: nil)

    If not nil, indicates this is an array item

Returns:

  • (Object)

    The decoded value



# File 'lib/openc3/utilities/questdb_client.rb', line 136

def self.decode_value(value, data_type: nil, array_size: nil)
  # Handle BigDecimal values from QuestDB DECIMAL columns (used for 64-bit integers)
  if value.is_a?(BigDecimal)
    return value.to_i if data_type == 'INT' || data_type == 'UINT'
    return value
  end

  # Decode float sentinel values back to inf/nan
  return decode_float_special_values(value) if value.is_a?(Float)

  # Non-strings don't need decoding (already handled by PG type mapping)
  return value unless value.is_a?(String)

  # Empty strings stay as empty strings
  return value if value.empty?

  # Handle based on data type if provided
  if data_type == 'BLOCK'
    begin
      return Base64.strict_decode64(value)
    rescue ArgumentError
      return value
    end
  end

  # Arrays are JSON-encoded
  if array_size
    begin
      return JSON.parse(value, allow_nan: true, create_additions: true)
    rescue JSON::ParserError
      return value
    end
  end

  # Integer values stored as strings (fallback path, normally DECIMAL)
  if data_type == 'INT' || data_type == 'UINT'
    begin
      return Integer(value)
    rescue ArgumentError
      return value
    end
  end

  # DERIVED items with declared converted_type are stored as typed columns
  # (float, int, etc.) and will be returned as non-strings, handled above.
  # DERIVED items without declared type or with complex types (ARRAY, OBJECT, ANY)
  # are stored as VARCHAR and JSON-encoded.
  if data_type == 'DERIVED'
    begin
      return JSON.parse(value, allow_nan: true, create_additions: true)
    rescue JSON::ParserError
      # Could be a plain string from DERIVED with converted_type=STRING
      return value
    end
  end

  # No data_type provided - fall back to heuristic decoding
  if data_type.nil?
    first_char = value[0]
    # Try JSON for arrays/objects
    if first_char == '[' || first_char == '{'
      begin
        return JSON.parse(value, allow_nan: true, create_additions: true)
      rescue JSON::ParserError
        # Not valid JSON
      end
    # Try integer conversion for numeric strings
    elsif value =~ /\A-?\d+\z/
      begin
        return Integer(value)
      rescue ArgumentError
        # Not a valid integer
      end
    end
  end

  # Return as-is (STRING type or unknown)
  value
end
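
The string-decoding branches can be condensed into the sketch below. It is a simplified, hypothetical variant (`DecodeSketch`): it omits the BigDecimal and Float handling and the `create_additions` option, and it uses `String#unpack1('m0')` for strict base64 decoding to avoid an extra require.

```ruby
require 'json'

# Standalone sketch; DecodeSketch is a hypothetical, simplified variant of
# the String branches of decode_value.
module DecodeSketch
  def self.try_json(value)
    JSON.parse(value, allow_nan: true)
  rescue JSON::ParserError
    value
  end

  def self.try_integer(value)
    Integer(value)
  rescue ArgumentError
    value
  end

  def self.try_base64(value)
    value.unpack1('m0') # strict base64; raises ArgumentError on bad input
  rescue ArgumentError
    value
  end

  def self.decode(value, data_type: nil, array_size: nil)
    return value unless value.is_a?(String) && !value.empty?
    return try_base64(value) if data_type == 'BLOCK'
    return try_json(value) if array_size || data_type == 'DERIVED'
    return try_integer(value) if %w[INT UINT].include?(data_type)
    if data_type.nil? # no type information: fall back to heuristics
      return try_json(value) if value.start_with?('[', '{')
      return try_integer(value) if value.match?(/\A-?\d+\z/)
    end
    value
  end
end

p DecodeSketch.decode('[1, 2, 3]', data_type: 'INT', array_size: 3) # [1, 2, 3]
p DecodeSketch.decode('42')                                         # 42
```

Every branch falls back to returning the original string when parsing fails, so a malformed value never raises out of the decoder.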

.disconnect ⇒ Object

Reset the connection for the current thread. Used after errors.



# File 'lib/openc3/utilities/questdb_client.rb', line 54

def self.disconnect
  conn = @thread_conn.value
  if conn && !conn.finished?
    conn.finish
  end
  @thread_conn.value = nil
end

.fetch_packet_def(target_name, packet_name, type: :TLM, scope: "DEFAULT") ⇒ Hash?

Fetch a packet definition from TargetModel, returning nil if not found.

Parameters:

  • target_name (String)

    Target name

  • packet_name (String)

    Packet name

  • type (Symbol) (defaults to: :TLM)

    :CMD or :TLM (default :TLM)

  • scope (String) (defaults to: "DEFAULT")

    Scope name

Returns:

  • (Hash, nil)

    Packet definition or nil



# File 'lib/openc3/utilities/questdb_client.rb', line 424

def self.fetch_packet_def(target_name, packet_name, type: :TLM, scope: "DEFAULT")
  TargetModel.packet(target_name, packet_name, type: type, scope: scope)
rescue RuntimeError
  nil
end

.find_item_def(packet_def, item_name) ⇒ Hash?

Find an item definition within a packet definition by name.

Parameters:

  • packet_def (Hash, nil)

    Packet definition from TargetModel.packet

  • item_name (String)

    Item name to find

Returns:

  • (Hash, nil)

    Item definition hash or nil if not found



# File 'lib/openc3/utilities/questdb_client.rb', line 243

def self.find_item_def(packet_def, item_name)
  return nil unless packet_def
  packet_def['items']&.each do |item|
    return item if item['name'] == item_name
  end
  nil
end

.format_timestamp(utc_time, format) ⇒ Float, ...

Format a UTC timestamp according to the specified format.

Parameters:

  • utc_time (Time)

    UTC timestamp

  • format (Symbol)

    :seconds for Unix seconds (float), :formatted for ISO 8601

Returns:

  • (Float, String, nil)

    Formatted timestamp or nil if utc_time is nil



# File 'lib/openc3/utilities/questdb_client.rb', line 364

def self.format_timestamp(utc_time, format)
  return nil unless utc_time
  case format
  when :seconds
    utc_time.to_f
  when :formatted
    utc_time.strftime('%Y-%m-%dT%H:%M:%S.%6NZ')
  else
    nil
  end
end

.nsec_to_utc_time(nsec) ⇒ Time?

Convert a nanosecond integer timestamp to a UTC Time object.

Parameters:

  • nsec (Integer, nil)

    Nanoseconds since epoch

Returns:

  • (Time, nil)

    UTC Time object, or nil if nsec is nil



# File 'lib/openc3/utilities/questdb_client.rb', line 319

def self.nsec_to_utc_time(nsec)
  return nil unless nsec
  Time.at(nsec / 1_000_000_000, nsec % 1_000_000_000, :nsec, in: '+00:00')
end

.paginate_query(query, page_size, label:) {|PG::Result| ... } ⇒ Object

Execute a paginated TSDB query, yielding each non-empty PG::Result page. Handles LIMIT pagination and retry on error.

Parameters:

  • query (String)

    Base SQL query (without LIMIT clause)

  • page_size (Integer)

    Number of rows per page

  • label (String)

    Label for log messages

Yields:

  • (PG::Result)

    Each page of results



# File 'lib/openc3/utilities/questdb_client.rb', line 562

def self.paginate_query(query, page_size, label:)
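  # QuestDB's LIMIT takes lower and upper row bounds (not offset and count),
  # so both bounds advance by page_size on every iteration
  min = 0
  max = page_size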
  loop do
    query_offset = "#{query} LIMIT #{min}, #{max}"
    Logger.debug("QuestDB #{label}: #{query_offset}")
    result = query_with_retry(query_offset, label: label)
    min += page_size
    max += page_size
    return if result.nil? || result.ntuples == 0
    yield result
  end
end
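
The paging loop can be exercised without a database. In this sketch, `PaginateSketch` is hypothetical and the `fetch` lambda stands in for `query_with_retry`: it receives the lower and upper row bounds and slices an in-memory array, which is an assumed emulation of how a `LIMIT lo, hi` clause selects rows.

```ruby
# Standalone sketch; PaginateSketch and the fetch lambda are hypothetical
# stand-ins so the loop can run without a QuestDB connection.
module PaginateSketch
  def self.paginate(page_size, fetch)
    min = 0
    max = page_size
    loop do
      page = fetch.call(min, max)
      min += page_size
      max += page_size
      # Stop on the first empty (or missing) page, as paginate_query does.
      return if page.nil? || page.empty?
      yield page
    end
  end
end

rows = (1..7).to_a
fetch = ->(lo, hi) { rows[lo...hi] } # rows after the first lo, up to row hi
PaginateSketch.paginate(3, fetch) { |page| p page }
```

One consequence of this design is that a result set whose size is an exact multiple of the page size costs one extra, empty query before the loop ends.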

.pg_timestamp_to_utc(pg_time) ⇒ Time?

Convert a PG timestamp to UTC. QuestDB stores timestamps in UTC, but the PG driver returns them as Time objects tagged with the local timezone. The date and time components are therefore reinterpreted as UTC rather than converted.

Parameters:

  • pg_time (Time, nil)

    Timestamp from PG query result

Returns:

  • (Time, nil)

    UTC timestamp, or nil if pg_time is nil



# File 'lib/openc3/utilities/questdb_client.rb', line 353

def self.pg_timestamp_to_utc(pg_time)
  return nil unless pg_time
  Time.utc(pg_time.year, pg_time.month, pg_time.day,
           pg_time.hour, pg_time.min, pg_time.sec, pg_time.usec)
end
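
The difference between reinterpreting components and converting can be seen with a Time that carries a non-UTC offset. The sketch below copies the method into a hypothetical `PgTimeSketch` module; the `-07:00` offset is an arbitrary example standing in for whatever local zone the driver applies.

```ruby
# Standalone sketch; PgTimeSketch is a hypothetical wrapper around the same
# component-wise reconstruction shown in the listing.
module PgTimeSketch
  def self.pg_timestamp_to_utc(pg_time)
    return nil unless pg_time
    Time.utc(pg_time.year, pg_time.month, pg_time.day,
             pg_time.hour, pg_time.min, pg_time.sec, pg_time.usec)
  end
end

# Wall-clock digits 03:04:05 tagged with an arbitrary -07:00 offset.
tagged = Time.new(2024, 1, 2, 3, 4, 5, '-07:00')

reinterpreted = PgTimeSketch.pg_timestamp_to_utc(tagged)
converted = tagged.getutc

puts reinterpreted.hour # 3  (digits kept, zone replaced)
puts converted.hour     # 10 (offset applied)
```

Since the reconstruction passes `usec`, any sub-microsecond digits on the input are dropped, which matches the microsecond truncation the page describes for the PG wire protocol.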

.query_with_retry(query, params: [], max_retries: 5, label: nil) ⇒ PG::Result?

Execute a SQL query with automatic retry on connection errors. Handles PG connection management and makes up to max_retries attempts in total before raising.

Parameters:

  • query (String)

    SQL query to execute

  • params (Array) (defaults to: [])

    Query parameters for parameterized queries (uses exec_params)

  • max_retries (Integer) (defaults to: 5)

    Maximum number of attempts, including the first (default 5)

  • label (String, nil) (defaults to: nil)

    Optional label for log messages

Returns:

  • (PG::Result, nil)

    Query result

Raises:

  • (QuestDBError)

    After exhausting retries



# File 'lib/openc3/utilities/questdb_client.rb', line 293

def self.query_with_retry(query, params: [], max_retries: 5, label: nil)
  retry_count = 0
  begin
    conn = connection
    if params.empty?
      conn.exec(query)
    else
      conn.exec_params(query, params)
    end
  rescue IOError, PG::Error => e
    retry_count += 1
    if retry_count > (max_retries - 1)
      raise QuestDBError.new("Error querying TSDB#{label ? " (#{label})" : ""}: #{e.message}")
    end
    Logger.warn("TSDB#{label ? " #{label}" : ""}: Retrying due to error: #{e.message}")
    Logger.warn("TSDB#{label ? " #{label}" : ""}: Last query: #{query}")
    disconnect
    sleep 0.1
    retry
  end
end
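
The attempt counting is easy to misread, so here is a minimal sketch of the same begin/rescue/retry shape with the database removed. `RetrySketch`, `SketchError`, and the `delay` parameter are hypothetical; the listing additionally rescues `PG::Error`, disconnects between attempts, and sleeps 0.1 seconds.

```ruby
# Standalone sketch; RetrySketch and SketchError are hypothetical stand-ins
# for the retry loop in query_with_retry and for QuestDBError.
module RetrySketch
  class SketchError < StandardError; end

  def self.with_retry(max_retries: 5, delay: 0)
    retry_count = 0
    begin
      yield
    rescue IOError => e
      retry_count += 1
      # Same comparison as the listing: max_retries bounds total attempts.
      raise SketchError, "gave up: #{e.message}" if retry_count > (max_retries - 1)
      sleep delay
      retry
    end
  end
end

attempts = 0
result = RetrySketch.with_retry(max_retries: 3) do
  attempts += 1
  raise IOError, 'transient failure' if attempts < 3
  :ok
end
puts "#{result} after #{attempts} attempts" # ok after 3 attempts
```

With `max_retries: 1` the block runs once and any failure is raised immediately, which is how `table_has_data?` uses it.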

.resolve_item_type(item_def, value_type) ⇒ Hash

Resolve the data_type and array_size for a QuestDB column based on the item definition and requested value type. This encapsulates the common logic for determining how to decode a value read from QuestDB.

Parameters:

  • item_def (Hash, nil)

    Item definition from packet definition

  • value_type (String)

    One of ‘RAW’, ‘CONVERTED’, ‘FORMATTED’

Returns:

  • (Hash)

    { ‘data_type’ => String|nil, ‘array_size’ => Integer|nil }



# File 'lib/openc3/utilities/questdb_client.rb', line 258

def self.resolve_item_type(item_def, value_type)
  case value_type
  when 'FORMATTED', 'WITH_UNITS' # WITH_UNITS is deprecated
    { 'data_type' => 'STRING', 'array_size' => nil }
  when 'CONVERTED'
    if item_def
      rc = item_def['read_conversion']
      if rc && rc['converted_type']
        { 'data_type' => rc['converted_type'], 'array_size' => item_def['array_size'] }
      elsif item_def['states']
        { 'data_type' => 'STRING', 'array_size' => nil }
      else
        { 'data_type' => item_def['data_type'], 'array_size' => item_def['array_size'] }
      end
    else
      { 'data_type' => nil, 'array_size' => nil }
    end
  else # RAW or default
    if item_def
      { 'data_type' => item_def['data_type'], 'array_size' => item_def['array_size'] }
    else
      { 'data_type' => nil, 'array_size' => nil }
    end
  end
end
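
A condensed sketch of the resolution rules follows, using made-up item definitions. `ResolveSketch` is hypothetical and restructures the branches with early returns, but it is intended to produce the same results as the listing for Hash or nil item definitions.

```ruby
# Standalone sketch; ResolveSketch is a hypothetical, condensed variant of
# resolve_item_type. The item definitions below are invented for illustration.
module ResolveSketch
  def self.resolve_item_type(item_def, value_type)
    if %w[FORMATTED WITH_UNITS].include?(value_type)
      return { 'data_type' => 'STRING', 'array_size' => nil }
    end
    return { 'data_type' => nil, 'array_size' => nil } unless item_def

    data_type = item_def['data_type']
    array_size = item_def['array_size']
    if value_type == 'CONVERTED'
      converted_type = item_def.dig('read_conversion', 'converted_type')
      if converted_type
        data_type = converted_type # a declared conversion type wins
      elsif item_def['states']
        data_type = 'STRING'       # state names are stored as strings
        array_size = nil
      end
    end
    { 'data_type' => data_type, 'array_size' => array_size }
  end
end

mode = { 'name' => 'MODE', 'data_type' => 'UINT', 'states' => { 'ON' => 1 } }
p ResolveSketch.resolve_item_type(mode, 'RAW')       # UINT
p ResolveSketch.resolve_item_type(mode, 'CONVERTED') # STRING
```

When no item definition is available, both fields come back nil and `decode_value` falls back to its heuristic decoding.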

.sample_interval_for(stream_mode) ⇒ String

Returns the SAMPLE BY interval string for a given stream_mode symbol.

Parameters:

  • stream_mode (Symbol)

    :REDUCED_MINUTE, :REDUCED_HOUR, or :REDUCED_DAY

Returns:

  • (String)

    QuestDB SAMPLE BY interval string



# File 'lib/openc3/utilities/questdb_client.rb', line 530

def self.sample_interval_for(stream_mode)
  case stream_mode
  when :REDUCED_MINUTE then '1m'
  when :REDUCED_HOUR then '1h'
  when :REDUCED_DAY then '1d'
  else '1m'
  end
end

.sanitize_column_name(item_name) ⇒ String

Sanitize a column name for QuestDB by replacing ILP protocol special characters with underscores. See questdb.com/docs/reference/api/ilp/advanced-settings/#name-restrictions

Parameters:

  • item_name (String)

    Item name

Returns:

  • (String)

    Sanitized column name



# File 'lib/openc3/utilities/questdb_client.rb', line 234

def self.sanitize_column_name(item_name)
  item_name.to_s.gsub(/[?\.,'"\\\/:\)\(\+=\-\*\%~;!@#\$\^&]/, '_')
end
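
The effect of the character class is easiest to see on a few inputs. This sketch copies the regular expression from the listing into a local constant (COLUMN_NAME_SPECIALS and sanitize_column_name_sketch are illustrative names, and the item names are made up).

```ruby
# Character class copied verbatim from the listing above.
COLUMN_NAME_SPECIALS = /[?\.,'"\\\/:\)\(\+=\-\*\%~;!@#\$\^&]/

# Local stand-in for the documented method.
def sanitize_column_name_sketch(item_name)
  item_name.to_s.gsub(COLUMN_NAME_SPECIALS, '_')
end

puts sanitize_column_name_sketch('TEMP.1')   # TEMP_1
puts sanitize_column_name_sketch('VOLT(V)')  # VOLT_V_
puts sanitize_column_name_sketch('ITEM[0]')  # ITEM[0] (brackets are not in the class)
puts sanitize_column_name_sketch(:TEMP1)     # TEMP1 (to_s accepts symbols)

# Distinct item names can map to the same column name.
puts sanitize_column_name_sketch('A.B') == sanitize_column_name_sketch('A-B') # true
```

The last line shows that sanitization is not reversible: two items that differ only in a replaced character end up sharing a column name.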

.sanitize_table_name(target_name, packet_name, cmd_or_tlm = "TLM", scope: "DEFAULT") ⇒ String

Parameters:

  • target_name (String)

    Target name

  • packet_name (String)

    Packet name

  • cmd_or_tlm (String, Symbol) (defaults to: "TLM")

    “CMD” or “TLM” prefix (default “TLM”)

  • scope (String) (defaults to: "DEFAULT")

    Scope name (default “DEFAULT”)

Returns:

  • (String)

    Sanitized table name



# File 'lib/openc3/utilities/questdb_client.rb', line 224

def self.sanitize_table_name(target_name, packet_name, cmd_or_tlm = "TLM", scope: "DEFAULT")
  "#{scope}__#{cmd_or_tlm}__#{target_name}__#{packet_name}".gsub(/[?,'"\\\/:\)\(\+\*\%~]/, '_')
end
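
A short sketch of the resulting table names, using a local copy of the listing (sanitize_table_name_sketch and TABLE_NAME_SPECIALS are illustrative names; the target and packet names are examples only). Note that the table-name character class is narrower than the column-name one: characters such as '.' and '-' are left untouched here.

```ruby
# Character class copied verbatim from the listing above.
TABLE_NAME_SPECIALS = /[?,'"\\\/:\)\(\+\*\%~]/

# Local stand-in for the documented method.
def sanitize_table_name_sketch(target_name, packet_name, cmd_or_tlm = "TLM", scope: "DEFAULT")
  "#{scope}__#{cmd_or_tlm}__#{target_name}__#{packet_name}".gsub(TABLE_NAME_SPECIALS, '_')
end

puts sanitize_table_name_sketch('INST', 'HEALTH_STATUS')
# DEFAULT__TLM__INST__HEALTH_STATUS
puts sanitize_table_name_sketch('INST', 'COLLECT', :CMD, scope: 'OPS')
# OPS__CMD__INST__COLLECT
puts sanitize_table_name_sketch('INST', 'PKT(1)')
# DEFAULT__TLM__INST__PKT_1_
puts sanitize_table_name_sketch('MY-TGT', 'P.1')
# DEFAULT__TLM__MY-TGT__P.1
```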

.table_has_data?(table_name, start_time, end_time) ⇒ Boolean

Returns true if the given TSDB table exists and has at least one row in the time range.

Parameters:

  • table_name (String)

    Sanitized table name

  • start_time (Integer)

    Nanosecond start time

  • end_time (Integer, nil)

    Nanosecond end time

Returns:

  • (Boolean)


# File 'lib/openc3/utilities/questdb_client.rb', line 545

def self.table_has_data?(table_name, start_time, end_time)
  query = "SELECT 1 FROM #{table_name}"
  query += time_where_clause(start_time, end_time)
  query += " LIMIT 1"
  result = query_with_retry(query, max_retries: 1, label: "table_has_data")
  result && result.ntuples > 0
rescue RuntimeError
  false
end

.time_where_clause(start_time, end_time, prefix: '') ⇒ String

Build a SQL WHERE clause for PACKET_TIMESECONDS range filtering.

Parameters:

  • start_time (Integer, String)

    Start timestamp (nanoseconds)

  • end_time (Integer, String, nil)

    End timestamp (nanoseconds), or nil for open-ended

  • prefix (String) (defaults to: '')

    Table alias prefix (e.g., 'T0.'); defaults to ''

Returns:

  • (String)

    SQL WHERE clause fragment (includes leading space)



# File 'lib/openc3/utilities/questdb_client.rb', line 411

def self.time_where_clause(start_time, end_time, prefix: '')
  where = " WHERE #{prefix}PACKET_TIMESECONDS >= #{start_time}"
  where += " AND #{prefix}PACKET_TIMESECONDS < #{end_time}" if end_time
  where
end
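
The fragments this method produces, and the way table_has_data? composes one into an existence query, can be sketched as follows. The method is a local copy of the listing (time_where_clause_sketch is an illustrative name), and the timestamps and table name are made-up values.

```ruby
# Local stand-in for the documented method.
def time_where_clause_sketch(start_time, end_time, prefix: '')
  where = " WHERE #{prefix}PACKET_TIMESECONDS >= #{start_time}"
  where += " AND #{prefix}PACKET_TIMESECONDS < #{end_time}" if end_time
  where
end

# Example nanosecond timestamps: a start time and one minute later.
START_NS = 1_700_000_000_000_000_000
END_NS   = START_NS + 60 * 1_000_000_000

# Open-ended range: only the lower bound is emitted.
puts time_where_clause_sketch(START_NS, nil)
# " WHERE PACKET_TIMESECONDS >= 1700000000000000000"

# Closed-open range with a table alias, as used in joined queries.
puts time_where_clause_sketch(START_NS, END_NS, prefix: 'T0.')

# Same composition as table_has_data?: SELECT, WHERE fragment, LIMIT.
query = "SELECT 1 FROM DEFAULT__TLM__INST__HEALTH_STATUS"
query += time_where_clause_sketch(START_NS, END_NS)
query += " LIMIT 1"
puts query
```

The range is half-open (start inclusive, end exclusive), and the values are interpolated directly into the SQL text rather than bound as parameters, so callers are expected to pass trusted numeric timestamps.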

.tsdb_lookup(items, start_time:, end_time: nil, scope: "DEFAULT") ⇒ Array, Hash

Query historical telemetry data from QuestDB for a list of items. Builds the SQL query, executes it, and decodes all results.

Parameters:

  • items (Array)

    Array of [target_name, packet_name, item_name, value_type, limits] entries. item_name may be nil to indicate a placeholder (non-existent item).

  • start_time (String, Numeric)

    Start timestamp for the query

  • end_time (String, Numeric, nil) (defaults to: nil)

    End timestamp, or nil to return only the latest single row before start_time

  • scope (String) (defaults to: "DEFAULT")

    Scope name

Returns:

  • (Array, Hash)

    Array of [value, limits_state] pairs per row, or {} if no results. Single-row results return a flat array of pairs; multi-row results return an array of such arrays, one per row.



# File 'lib/openc3/utilities/questdb_client.rb', line 818

def self.tsdb_lookup(items, start_time:, end_time: nil, scope: "DEFAULT")
  tables = {}
  names = []
  nil_count = 0
  packet_cache = {}
  item_types = {}
  calculated_items = {}
  needed_timestamps = {}
  current_position = 0

  items.each do |item|
    target_name, packet_name, orig_item_name, value_type, limits = item
    if orig_item_name.nil?
      names << "PACKET_TIMESECONDS as __nil#{nil_count}"
      nil_count += 1
      current_position += 1
      next
    end
    table_name = sanitize_table_name(target_name, packet_name, scope: scope)
    tables[table_name] = 1
    index = tables.find_index {|k,_v| k == table_name }

    if STORED_TIMESTAMP_ITEMS.include?(orig_item_name)
      names << "\"T#{index}.#{orig_item_name}\""
      current_position += 1
      next
    end

    if TIMESTAMP_ITEMS.key?(orig_item_name)
      ts_info = TIMESTAMP_ITEMS[orig_item_name]
      calculated_items[current_position] = {
        source: ts_info[:source],
        format: ts_info[:format],
        table_index: index
      }
      needed_timestamps[index] ||= Set.new
      needed_timestamps[index] << ts_info[:source]
      current_position += 1
      next
    end

    safe_item_name = sanitize_column_name(orig_item_name)

    cache_key = [target_name, packet_name]
    unless packet_cache.key?(cache_key)
      packet_cache[cache_key] = fetch_packet_def(target_name, packet_name, scope: scope)
    end

    packet_def = packet_cache[cache_key]
    item_def = find_item_def(packet_def, orig_item_name)

    suffix = column_suffix_for_value_type(value_type)
    col_name = "T#{index}.#{safe_item_name}#{suffix}"
    names << "\"#{col_name}\""
    item_types[col_name] = resolve_item_type(item_def, value_type)
    current_position += 1
    if limits
      names << "\"T#{index}.#{safe_item_name}__L\""
    end
  end

  # Add needed timestamp columns to the SELECT for calculated items
  needed_timestamps.each do |table_index, ts_columns|
    ts_columns.each do |ts_col|
      names << "T#{table_index}.#{ts_col} as T#{table_index}___ts_#{ts_col}"
    end
  end

  # Build the SQL query
  query = "SELECT #{names.join(", ")} FROM "
  tables.each_with_index do |(table_name, _), index|
    if index == 0
      query += "#{table_name} as T#{index} "
    else
      query += "ASOF JOIN #{table_name} as T#{index} "
    end
  end
  query_params = []
  if start_time && !end_time
    query += "WHERE T0.PACKET_TIMESECONDS < $1 LIMIT -1"
    query_params << start_time
  elsif start_time && end_time
    query += "WHERE T0.PACKET_TIMESECONDS >= $1 AND T0.PACKET_TIMESECONDS < $2"
    query_params << start_time
    query_params << end_time
  end

  result = query_with_retry(query, params: query_params, label: "tsdb_lookup")
  if result.nil? or result.ntuples == 0
    return {}
  end

  data = []
  result.each_with_index do |tuples, row_num|
    data[row_num] ||= []
    row_index = 0
    row_timestamps = {}
    tuples.each do |tuple|
      col_name = tuple[0]
      col_value = tuple[1]
      if col_name.include?("__L")
        data[row_num][row_index - 1][1] = col_value
      elsif col_name =~ /^__nil/
        data[row_num][row_index] = [nil, nil]
        row_index += 1
      elsif col_name =~ /^T(\d+)___ts_(.+)$/
        table_idx = $1.to_i
        ts_source = $2
        row_timestamps["T#{table_idx}.#{ts_source}"] = col_value
      elsif col_name.end_with?('.PACKET_TIMESECONDS', '.RECEIVED_TIMESECONDS') || col_name == 'PACKET_TIMESECONDS' || col_name == 'RECEIVED_TIMESECONDS'
        ts_utc = coerce_to_utc(col_value)
        seconds_value = format_timestamp(ts_utc, :seconds)
        data[row_num][row_index] = [seconds_value, nil]
        row_index += 1
        if col_name.include?('.')
          row_timestamps[col_name] = col_value
        else
          row_timestamps["T0.#{col_name}"] = col_value
        end
      else
        type_info = item_types[col_name]
        unless type_info
          tables.length.times do |i|
            prefixed_name = "T#{i}.#{col_name}"
            type_info = item_types[prefixed_name]
            break if type_info
          end
          type_info ||= {}
        end
        decoded_value = decode_value(
          col_value,
          data_type: type_info['data_type'],
          array_size: type_info['array_size']
        )
        data[row_num][row_index] = [decoded_value, nil]
        row_index += 1
      end
    end

    calculated_items.keys.sort.each do |position|
      calc_info = calculated_items[position]
      ts_key = "T#{calc_info[:table_index]}.#{calc_info[:source]}"
      ts_value = row_timestamps[ts_key]
      ts_utc = coerce_to_utc(ts_value)
      calculated_value = format_timestamp(ts_utc, calc_info[:format])
      data[row_num].insert(position, [calculated_value, nil])
    end
  end
  if result.ntuples == 1
    data = data[0]
  end
  data
end
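
The query shape built in the middle of this method can be isolated into a small sketch. The two helpers below (from_clause_sketch and time_filter_sketch) are hypothetical names that restate the FROM and WHERE logic of the listing without touching the database; the table names are examples.

```ruby
# First table is the driving table (T0); every further table is ASOF JOINed.
def from_clause_sketch(table_names)
  table_names.each_with_index.map do |table_name, index|
    index.zero? ? "#{table_name} as T#{index}" : "ASOF JOIN #{table_name} as T#{index}"
  end.join(' ')
end

# Returns the WHERE fragment plus the positional parameters bound to it.
def time_filter_sketch(start_time, end_time)
  if start_time && !end_time
    # LIMIT -1 keeps only the last row, i.e. the latest one before start_time.
    ['WHERE T0.PACKET_TIMESECONDS < $1 LIMIT -1', [start_time]]
  elsif start_time && end_time
    ['WHERE T0.PACKET_TIMESECONDS >= $1 AND T0.PACKET_TIMESECONDS < $2', [start_time, end_time]]
  else
    ['', []]
  end
end

tables = ['DEFAULT__TLM__INST__HEALTH_STATUS', 'DEFAULT__TLM__INST__ADCS']
where, params = time_filter_sketch(1_700_000_000_000_000_000, nil)
puts "SELECT ... FROM #{from_clause_sketch(tables)} #{where}"
p params
```

Unlike time_where_clause, the time bounds here are passed as bind parameters ($1, $2), and the filter applies only to the first table's PACKET_TIMESECONDS; rows from the other tables are aligned to it by the ASOF JOIN.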

.value_type_for_column_suffix(column_name) ⇒ String

Determine the value type from a QuestDB column name’s suffix.

Parameters:

  • column_name (String)

    Column name possibly ending in __C, __F, __L

Returns:

  • (String)

    One of ‘FORMATTED’, ‘CONVERTED’, ‘RAW’



# File 'lib/openc3/utilities/questdb_client.rb', line 395

def self.value_type_for_column_suffix(column_name)
  if column_name.end_with?('__F')
    'FORMATTED'
  elsif column_name.end_with?('__C')
    'CONVERTED'
  else
    'RAW'
  end
end
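
A brief sketch of the suffix mapping, using a local copy of the listing (value_type_for_column_suffix_sketch is an illustrative name and the column names are examples). It also shows that a limits column ending in __L is not treated specially and is reported as RAW.

```ruby
# Local stand-in for the documented method.
def value_type_for_column_suffix_sketch(column_name)
  if column_name.end_with?('__F')
    'FORMATTED'
  elsif column_name.end_with?('__C')
    'CONVERTED'
  else
    'RAW'
  end
end

%w[TEMP1 TEMP1__C TEMP1__F TEMP1__L].each do |column|
  puts "#{column} -> #{value_type_for_column_suffix_sketch(column)}"
end
# TEMP1 -> RAW
# TEMP1__C -> CONVERTED
# TEMP1__F -> FORMATTED
# TEMP1__L -> RAW
```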