Class: Blazer::Adapters::DruidAdapter
Constant Summary
collapse
- TIMESTAMP_REGEX =
/\A\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z\z/
Instance Attribute Summary
Attributes inherited from BaseAdapter
#data_source
Instance Method Summary
collapse
Methods inherited from BaseAdapter
#cachable?, #cancel, #cohort_analysis_statement, #cost, #explain, #initialize, #reconnect, #supports_cohort_analysis?
Instance Method Details
#parameter_binding ⇒ Object
97
98
99
|
# File 'lib/blazer/adapters/druid_adapter.rb', line 97
# Reports which bind-parameter style this adapter uses.
#
# @return [Symbol] always +:positional+
def parameter_binding
  :positional
end
|
#preview_statement ⇒ Object
86
87
88
|
# File 'lib/blazer/adapters/druid_adapter.rb', line 86
# SQL template used to preview a table.
#
# The literal +{table}+ placeholder is returned as-is; this method does no
# substitution itself.
#
# @return [String] the preview query template
def preview_statement
  "SELECT * FROM {table} LIMIT 10"
end
|
#quoting ⇒ Object
92
93
94
|
# File 'lib/blazer/adapters/druid_adapter.rb', line 92
# Names the quoting strategy this adapter uses.
#
# @return [Symbol] always +:single_quote_escape+
def quoting
  :single_quote_escape
end
|
#run_statement(statement, comment, bind_params) ⇒ Object
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
|
# File 'lib/blazer/adapters/druid_adapter.rb', line 6
# Posts a SQL statement to the Druid SQL endpoint (+/druid/v2/sql/+ under
# +settings["url"]+) and converts the JSON reply into Blazer's result triple.
#
# @param statement [String] SQL text, sent as the +query+ field of the request
# @param comment [Object] part of the adapter signature; not referenced in this method
# @param bind_params [Array] positional values; each is sent together with a
#   SQL type name chosen from its Ruby class
# @return [Array] +[columns, rows, error]+. On success +error+ is +nil+. On
#   failure +columns+ and +rows+ are empty and +error+ is a message string.
#   Exceptions raised while posting or parsing are rescued and returned as +error+.
def run_statement(statement, comment, bind_params)
  require "json"
  require "net/http"
  require "time" # Time.parse is defined by the stdlib "time" extension
  require "uri"

  columns = []
  rows = []
  error = nil

  params =
    bind_params.map do |value|
      type =
        case value
        when Integer
          "BIGINT"
        when Float
          "DOUBLE"
        when ActiveSupport::TimeWithZone
          # Sent as an integer: the float value scaled by 1000 and rounded
          # (presumably epoch milliseconds - confirm against Druid's TIMESTAMP parameter format).
          value = (value.to_f * 1000).round
          "TIMESTAMP"
        else
          "VARCHAR"
        end
      {type: type, value: value}
    end

  header = {"Content-Type" => "application/json", "Accept" => "application/json"}
  timeout = data_source.timeout ? data_source.timeout.to_i : 300
  data = {
    query: statement,
    parameters: params,
    context: {
      # The same timeout is used for the HTTP read below, scaled by 1000 here.
      timeout: timeout * 1000
    }
  }

  uri = URI.parse("#{settings["url"]}/druid/v2/sql/")
  http = Net::HTTP.new(uri.host, uri.port)
  # Net::HTTP does not infer TLS from the port; without this an https:// URL
  # would be spoken to in plain HTTP.
  http.use_ssl = uri.scheme == "https"
  http.read_timeout = timeout

  begin
    response = JSON.parse(http.post(uri.request_uri, data.to_json, header).body)
    if response.is_a?(Hash)
      # A Hash reply is treated as an error payload; a successful reply is an Array of row Hashes.
      error = response["errorMessage"] || "Unknown error: #{response.inspect}"
      if error.include?("timed out")
        error = Blazer::TIMEOUT_MESSAGE
      elsif error.include?("Encountered \"?\" at")
        error = Blazer::VARIABLE_MESSAGE
      end
    else
      columns = (response.first || {}).keys
      rows = response.map(&:values)
      rows.each do |row|
        row.each_with_index do |cell, index|
          # Strings matching TIMESTAMP_REGEX are turned into Time objects.
          row[index] = Time.parse(cell) if cell.is_a?(String) && TIMESTAMP_REGEX.match(cell)
        end
      end
    end
  rescue => e
    error = e.message
  end

  [columns, rows, error]
end
|
#schema ⇒ Object
81
82
83
84
|
# File 'lib/blazer/adapters/druid_adapter.rb', line 81
# Describes every table outside INFORMATION_SCHEMA along with its columns.
#
# Runs one query against INFORMATION_SCHEMA.COLUMNS through +data_source+,
# groups the rows by (schema, table) and lists each table's columns sorted by
# column name.
#
# @return [Array<Hash>] one hash per table with +:schema+, +:table+ and
#   +:columns+; each column is a hash with +:name+ and +:data_type+
def schema
  sql = "SELECT TABLE_SCHEMA, TABLE_NAME, COLUMN_NAME, DATA_TYPE, ORDINAL_POSITION FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA NOT IN ('INFORMATION_SCHEMA') ORDER BY 1, 2"
  result = data_source.run_statement(sql)

  # Row layout follows the SELECT list: 0 = schema, 1 = table, 2 = column name, 3 = data type.
  by_table = result.rows.group_by { |row| row.values_at(0, 1) }

  by_table.map do |(schema_name, table_name), column_rows|
    columns =
      column_rows
        .sort_by { |row| row[2] }
        .map { |row| {name: row[2], data_type: row[3]} }

    {schema: schema_name, table: table_name, columns: columns}
  end
end
|
#tables ⇒ Object
76
77
78
79
|
# File 'lib/blazer/adapters/druid_adapter.rb', line 76
# Lists the names of all tables outside INFORMATION_SCHEMA.
#
# Runs one query against INFORMATION_SCHEMA.TABLES through +data_source+ and
# keeps the first value of each returned row.
#
# @return [Array] table names, in the order the query returned them
def tables
  sql = "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA NOT IN ('INFORMATION_SCHEMA') ORDER BY TABLE_NAME"
  table_rows = data_source.run_statement(sql).rows
  table_rows.map { |row| row.first }
end
|