Class: ActiveRecord::ConnectionAdapters::PinotAdapter

Inherits:
AbstractAdapter
  • Object
show all
Defined in:
lib/active_record/connection_adapters/pinot_adapter.rb,
lib/active_record/connection_adapters/pinot_adapter/table_structure.rb

Defined Under Namespace

Modules: TableStructure

Constant Summary collapse

TYPES =
{
  "INT" => Type::Integer.new,
  "TIMESTAMP" => Type::DateTime.new,
  "FLOAT" => Type::Decimal.new,
  "LONG" => Type::Decimal.new,
  "STRING" => Type::String.new,
  "JSON" => ActiveRecord::Type::Json.new
}
INTEGER_REGEX =
/integer/i

Instance Method Summary collapse

Constructor Details

#initialize(config = {}) ⇒ PinotAdapter

Returns a new instance of PinotAdapter.



24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/active_record/connection_adapters/pinot_adapter.rb', line 24

def initialize(config = {})
  @pinot_host = config.fetch(:host)
  @pinot_port = config.fetch(:port)
  @pinot_controller_port = config.fetch(:controller_port)
  @pinot_controller_host = config.fetch(:controller_host) || @pinot_host
  @pinot_socks5_uri = config.fetch(:socks5_uri, nil)
  @pinot_bearer_token = config.fetch(:bearer_token, nil)
  @pinot_protocol = config.fetch(:protocol, "http")
  @pinot_query_options = config.fetch(:query_options, {})
  # TODO: does it need connection pooling?

  @pinot_client = ::Pinot::Client.new(
    host: @pinot_host,
    port: @pinot_port,
    controller_host: @pinot_controller_host,
    controller_port: @pinot_controller_port,
    protocol: @pinot_protocol,
    socks5_uri: @pinot_socks5_uri,
    bearer_token: @pinot_bearer_token,
    query_options: @pinot_query_options
  )

  super(config)
end

Instance Method Details

#data_sourcesObject



63
64
65
# File 'lib/active_record/connection_adapters/pinot_adapter.rb', line 63

def data_sources
  @pinot_data_sources ||= @pinot_client.tables["tables"]
end

#default_prepared_statementsObject



49
50
51
# File 'lib/active_record/connection_adapters/pinot_adapter.rb', line 49

def default_prepared_statements
  false
end

#exec_query(sql, name = "SQL", binds = [], prepare: false) ⇒ Object



143
144
145
# File 'lib/active_record/connection_adapters/pinot_adapter.rb', line 143

def exec_query(sql, name = "SQL", binds = [], prepare: false)
  internal_exec_query(sql, name, binds, prepare: prepare, async: false)
end

#extract_default_function(default_value, default) ⇒ Object



108
109
110
# File 'lib/active_record/connection_adapters/pinot_adapter.rb', line 108

def extract_default_function(default_value, default)
  default if has_default_function?(default_value, default)
end

#extract_value_from_default(default) ⇒ Object



85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/active_record/connection_adapters/pinot_adapter.rb', line 85

def extract_value_from_default(default)
  case default
  when /^null$/i
    nil
  # Quoted types
  when /^'([^|]*)'$/m
    $1.gsub("''", "'")
  # Quoted types
  when /^"([^|]*)"$/m
    $1.gsub('""', '"')
  # Numeric types
  when /\A-?\d+(\.\d*)?\z/
    $&
  # Binary columns
  when /x'(.*)'/
    [$1].pack("H*")
  else
    # Anything else is blank or some function
    # and we can't know the value of that, so return nil.
    nil
  end
end

#has_default_function?(default_value, default) ⇒ Boolean

Returns:

  • (Boolean)


112
113
114
# File 'lib/active_record/connection_adapters/pinot_adapter.rb', line 112

def has_default_function?(default_value, default)
  !default_value && %r{\w+\(.*\)|CURRENT_TIME|CURRENT_DATE|CURRENT_TIMESTAMP|\|\|}.match?(default)
end

#internal_exec_query(sql, name = "SQL", binds = [], prepare: false, async: false) ⇒ Object

:nodoc:



126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
# File 'lib/active_record/connection_adapters/pinot_adapter.rb', line 126

def internal_exec_query(sql, name = "SQL", binds = [], prepare: false, async: false) # :nodoc:
  # rows = [
  #   [Time.now, 1, 2.0],
  #   [Time.now, 2, 2.0],
  #   [Time.now, 2, 2.0]
  # ]
  response = @pinot_client.execute(sql)
  rows = response.rows
  columns = response.columns
  columns.transform_values! { |value| TYPES.fetch(value, value) }
  ActiveRecord::Result.new(
    response.columns.keys,
    rows.to_a,
    columns
  )
end

#is_column_the_rowid?(field, column_definitions) ⇒ Boolean

Returns:

  • (Boolean)


117
118
119
120
121
122
123
124
# File 'lib/active_record/connection_adapters/pinot_adapter.rb', line 117

def is_column_the_rowid?(field, column_definitions)
  return false unless INTEGER_REGEX.match?(field["type"]) && field["pk"] == 1
  # is the primary key a single column?
  column_definitions.one? { |c|
    col_pk = c["pk"] || 0
    col_pk > 0
  }
end

#new_column_from_field(table_name, field, definitions = nil) ⇒ Object



67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/active_record/connection_adapters/pinot_adapter.rb', line 67

def new_column_from_field(table_name, field, definitions = nil)
  default = nil

   = (field["type"])
  default_value = extract_value_from_default(default)
  default_function = extract_default_function(default_value, default)

  Column.new(
    field["name"],
    default_value,
    ,
    field["notnull"].to_i == 0,
    default_function,
    collation: field["collation"]
  )
end

#prepared_statementsObject



53
54
55
# File 'lib/active_record/connection_adapters/pinot_adapter.rb', line 53

def prepared_statements
  false
end

#primary_keys(table_name) ⇒ Object



147
148
149
150
# File 'lib/active_record/connection_adapters/pinot_adapter.rb', line 147

def primary_keys(table_name)
  # TODO: implement
  [:id]
end

#table_structure(table_name) ⇒ Object Also known as: column_definitions



57
58
59
60
61
# File 'lib/active_record/connection_adapters/pinot_adapter.rb', line 57

def table_structure(table_name)
  schema = @pinot_client.schema(table_name)
  @table_structure = TableStructure.from_schema(schema)
  @table_structure.sort_by! { |x| x[:name] }
end