Class: ActiveRecord::ConnectionAdapters::HiveAdapter
- Inherits:
-
AbstractAdapter
- Object
- AbstractAdapter
- ActiveRecord::ConnectionAdapters::HiveAdapter
show all
- Defined in:
- lib/active_record/connection_adapters/hive_adapter.rb
Defined Under Namespace
Modules: TableDefinitionExtensions
Classes: BindSubstitution
Constant Summary
collapse
- NATIVE_DATABASE_TYPES =
{
:string => { :name => "string" },
:text => { :name => "string" },
:integer => { :name => "int" },
:float => { :name => "float" },
:double => { :name => "double" },
:datetime => { :name => "string" },
:timestamp => { :name => "string" },
:time => { :name => "string" },
:date => { :name => "string" },
:binary => { :name => "string" },
:boolean => { :name => "tinyint" }
}
Instance Method Summary
collapse
-
#active? ⇒ Boolean
-
#adapter_name ⇒ Object
-
#add_column(table_name, column_name, type, options = { }) ⇒ Object
-
#add_column_options!(sql, options) ⇒ Object
-
#add_index(table_name, column_name, options = { }) ⇒ Object
-
#columns(table, name = nil) ⇒ Object
-
#connect ⇒ Object
-
#create_table(table_name, options = { }) {|table_definition| ... } ⇒ Object
-
#database_name ⇒ Object
-
#disconnect ⇒ Object
-
#execute(sql, name = nil) ⇒ Object
-
#initialize(connection, logger, connection_params, config) ⇒ HiveAdapter
constructor
A new instance of HiveAdapter.
-
#native_database_types ⇒ Object
-
#partitioned_by(partitions) ⇒ Object
-
#primary_key(table_name) ⇒ Object
-
#query(sql, name = nil) ⇒ Object
-
#reconnect! ⇒ Object
-
#select(sql, name = nil, binds = []) ⇒ Object
-
#select_rows(sql, name = nil) ⇒ Object
-
#supports_migrations? ⇒ Boolean
-
#supports_primary_key? ⇒ Boolean
-
#tables(name = nil) ⇒ Object
-
#with_auto_reconnect ⇒ Object
Constructor Details
#initialize(connection, logger, connection_params, config) ⇒ HiveAdapter
Returns a new instance of HiveAdapter.
84
85
86
87
88
89
|
# File 'lib/active_record/connection_adapters/hive_adapter.rb', line 84
def initialize(connection, logger, connection_params, config)
super(connection, logger)
@connection_params = connection_params
connect
@visitor = BindSubstitution.new(self)
end
|
Instance Method Details
#active? ⇒ Boolean
104
105
106
107
108
109
110
111
|
# File 'lib/active_record/connection_adapters/hive_adapter.rb', line 104
def active?
begin
@connection.execute("SET check=1")
true
rescue
false
end
end
|
#adapter_name ⇒ Object
119
120
121
|
# File 'lib/active_record/connection_adapters/hive_adapter.rb', line 119
def adapter_name 'Hive'
end
|
#add_column(table_name, column_name, type, options = { }) ⇒ Object
222
223
224
225
226
227
228
229
230
|
# File 'lib/active_record/connection_adapters/hive_adapter.rb', line 222
def add_column(table_name, column_name, type, options = { })
sql = "ALTER TABLE #{quote_table_name(table_name)} ADD COLUMNS (#{quote_column_name(column_name)} #{type_to_sql(type)}"
o = { }
o[:default] = options[:default] if options[:default]
o[:column] = HiveColumn.new(column_name, nil, type, false)
add_column_options!(sql, o)
sql << ")"
execute(sql)
end
|
#add_column_options!(sql, options) ⇒ Object
211
212
213
214
215
216
|
# File 'lib/active_record/connection_adapters/hive_adapter.rb', line 211
def add_column_options!(sql, options) meta = ""
meta << "ar_type=#{options[:column].type}"
meta << ",ar_default=#{options[:default]}" if options[:default]
sql << " COMMENT '#{meta}'"
end
|
#add_index(table_name, column_name, options = { }) ⇒ Object
218
219
220
|
# File 'lib/active_record/connection_adapters/hive_adapter.rb', line 218
def add_index(table_name, column_name, options = { })
raise NotImplementedError
end
|
#columns(table, name = nil) ⇒ Object
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
|
# File 'lib/active_record/connection_adapters/hive_adapter.rb', line 171
def columns(table, name=nil)
res = query("DESCRIBE FORMATTED #{quote_table_name(table)}", name)
table_info_index = res.find_index { |ln| ln.start_with?("# Detailed Table Information") }
begin_partition = false
columns = []
res.slice(0, table_info_index - 1).each do |ln|
if ln.start_with?("# Partition Information")
begin_partition = true
next
end
next if ln.strip.empty?
next if ln.start_with?("# col_name")
col_name, sql_type, = ln.split(/\s+/)
meta = Hash[.to_s.split(',').map { |meta| property, value = meta.split('=') }]
type = meta['ar_type'] || sql_type
columns << HiveColumn.new(col_name, meta['ar_default'], type, begin_partition)
end
columns
end
|
#create_table(table_name, options = { }) {|table_definition| ... } ⇒ Object
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
|
# File 'lib/active_record/connection_adapters/hive_adapter.rb', line 192
def create_table(table_name, options={ })
table_definition = TableDefinition.new(self)
table_definition.extend(TableDefinitionExtensions)
yield table_definition if block_given?
if options[:force] && table_exists?(table_name)
drop_table(table_name, options)
end
create_sql = "CREATE#{' EXTERNAL' if table_definition.external} TABLE "
create_sql << "#{quote_table_name(table_name)} ("
create_sql << table_definition.to_sql
create_sql << ") "
create_sql << "#{partitioned_by(table_definition.partitions)} "
create_sql << table_definition.row_format
execute create_sql
end
|
#database_name ⇒ Object
159
160
161
|
# File 'lib/active_record/connection_adapters/hive_adapter.rb', line 159
def database_name
@connection.database
end
|
#disconnect ⇒ Object
95
96
97
|
# File 'lib/active_record/connection_adapters/hive_adapter.rb', line 95
def disconnect
@connection.close
end
|
#execute(sql, name = nil) ⇒ Object
113
114
115
116
117
|
# File 'lib/active_record/connection_adapters/hive_adapter.rb', line 113
def execute(sql, name=nil)
with_auto_reconnect do
log(sql, name) { @connection.execute(sql) }
end
end
|
#native_database_types ⇒ Object
131
132
133
|
# File 'lib/active_record/connection_adapters/hive_adapter.rb', line 131
def native_database_types
NATIVE_DATABASE_TYPES
end
|
#partitioned_by(partitions) ⇒ Object
232
233
234
235
236
237
238
239
240
|
# File 'lib/active_record/connection_adapters/hive_adapter.rb', line 232
def partitioned_by(partitions)
unless partitions.to_a.empty?
spec = "PARTITIONED BY ("
spec << partitions.map do |p|
options = { :default => p.default, :column => p }
add_column_options!("#{p.name} #{p.sql_type}", options)
end.join(", ") << ")"
end
end
|
#primary_key(table_name) ⇒ Object
167
168
169
|
# File 'lib/active_record/connection_adapters/hive_adapter.rb', line 167
def primary_key(table_name)
nil
end
|
#query(sql, name = nil) ⇒ Object
135
136
137
138
139
140
141
142
|
# File 'lib/active_record/connection_adapters/hive_adapter.rb', line 135
def query(sql, name=nil)
with_auto_reconnect do
log(sql, name) do
@connection.execute(sql)
@connection.fetch_all
end
end
end
|
#reconnect! ⇒ Object
99
100
101
102
|
# File 'lib/active_record/connection_adapters/hive_adapter.rb', line 99
def reconnect!
disconnect
connect
end
|
#select(sql, name = nil, binds = []) ⇒ Object
144
145
146
147
148
149
150
151
152
153
|
# File 'lib/active_record/connection_adapters/hive_adapter.rb', line 144
def select(sql, name=nil, binds=[])
with_auto_reconnect do
log(sql, name) do
@connection.execute(sql)
fields = @connection.get_schema.fieldSchemas.map { |f| f.name }
res = @connection.fetch_all
res.map { |row| Hash[*fields.zip(row.split("\t")).flatten] }
end
end
end
|
#select_rows(sql, name = nil) ⇒ Object
155
156
157
|
# File 'lib/active_record/connection_adapters/hive_adapter.rb', line 155
def select_rows(sql, name=nil)
query(sql, name)
end
|
#supports_migrations? ⇒ Boolean
123
124
125
|
# File 'lib/active_record/connection_adapters/hive_adapter.rb', line 123
def supports_migrations? true
end
|
#supports_primary_key? ⇒ Boolean
127
128
129
|
# File 'lib/active_record/connection_adapters/hive_adapter.rb', line 127
def supports_primary_key? false
end
|
#tables(name = nil) ⇒ Object
163
164
165
|
# File 'lib/active_record/connection_adapters/hive_adapter.rb', line 163
def tables(name=nil)
query("SHOW TABLES", name)
end
|
#with_auto_reconnect ⇒ Object
242
243
244
245
246
247
248
|
# File 'lib/active_record/connection_adapters/hive_adapter.rb', line 242
def with_auto_reconnect
yield
rescue Thrift::TransportException => e
raise unless e.message == "end of file reached"
reconnect!
yield
end
|