Class: RBHive::TCLIConnection
- Inherits:
-
Object
- Object
- RBHive::TCLIConnection
show all
- Defined in:
- lib/rbhive/t_c_l_i_connection.rb
Instance Attribute Summary collapse
Instance Method Summary
collapse
-
#add_columns(schema) ⇒ Object
-
#async_cancel(handles) ⇒ Object
-
#async_close_session(handles) ⇒ Object
-
#async_execute(query) ⇒ Object
-
#async_fetch(handles, max_rows = 100) ⇒ Object
Async fetch results from an async execute.
-
#async_fetch_in_batch(handles, batch_size = 1000, &block) ⇒ Object
Fetches the results of a completed async query in batches of batch_size rows and yields each batch to the given block as an array of rows.
-
#async_is_cancelled?(handles) ⇒ Boolean
-
#async_is_complete?(handles) ⇒ Boolean
-
#async_is_failed?(handles) ⇒ Boolean
-
#async_is_running?(handles) ⇒ Boolean
Is the query actually running?
-
#async_state(handles) ⇒ Object
-
#close ⇒ Object
-
#close_session ⇒ Object
-
#create_table(schema) ⇒ Object
-
#drop_table(name) ⇒ Object
-
#execute(query) ⇒ Object
-
#explain(query) ⇒ Object
Runs an EXPLAIN of the supplied query on the server and returns the result as an ExplainResult.
-
#fetch(query, max_rows = 100) ⇒ Object
Performs a query on the server, fetches up to max_rows rows and returns them as an array.
-
#fetch_in_batch(query, batch_size = 1000, &block) ⇒ Object
Performs a query on the server, fetches the results in batches of batch_size rows and yields the result batches to a given block as arrays of rows.
-
#fetch_rows(op_handle, orientation = :first, max_rows = 1000) ⇒ Object
Pull rows from the query result.
-
#initialize(server, port = 10_000, options = {}, logger = StdOutLogger.new) ⇒ TCLIConnection
constructor
A new instance of TCLIConnection.
-
#method_missing(meth, *args) ⇒ Object
-
#open ⇒ Object
-
#open_session ⇒ Object
-
#parse_sasl_params(sasl_params) ⇒ Object
Processes SASL connection params and returns a hash with symbol keys, or nil.
-
#priority=(priority) ⇒ Object
-
#queue=(queue) ⇒ Object
-
#replace_columns(schema) ⇒ Object
-
#session ⇒ Object
-
#set(name, value) ⇒ Object
-
#thrift_hive_protocol(version) ⇒ Object
-
#thrift_socket(server, port, timeout) ⇒ Object
-
#thrift_transport(server, port) ⇒ Object
Constructor Details
#initialize(server, port = 10_000, options = {}, logger = StdOutLogger.new) ⇒ TCLIConnection
Returns a new instance of TCLIConnection.
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
|
# File 'lib/rbhive/t_c_l_i_connection.rb', line 84
# Builds the objects needed to talk to a HiveServer2 endpoint: the Thrift
# transport, the binary protocol wrapping it, and the TCLIService client.
# No session is created here (@session starts out nil).
#
# @param server [String] host name, handed to #thrift_transport
# @param port [Integer] port, handed to #thrift_transport
# @param options [Hash, nil] recognised keys: :transport (defaults to
#   :buffered), :sasl_params (required when :transport is :sasl),
#   :hive_version (defaults to 10) and :timeout (defaults to 1800).
#   Missing defaults are written into the hash that was passed in.
# @param logger [#info] receives progress messages
# @raise [RuntimeError] if options is not a Hash, if :sasl is requested
#   without :sasl_params, or if #thrift_hive_protocol / #thrift_transport
#   reject the configured version / transport
def initialize(server, port = 10_000, options = {}, logger = StdOutLogger.new)
  options ||= {}
  raise "'options' parameter must be a hash" unless options.is_a?(Hash)

  if options[:transport] == :sasl && options[:sasl_params].nil?
    raise ":transport is set to :sasl, but no :sasl_params option was supplied"
  end

  options[:transport] ||= :buffered
  options[:hive_version] ||= 10
  options[:timeout] ||= 1800

  @options = options
  @thrift_protocol_version = thrift_hive_protocol(options[:hive_version])
  # @options and @logger must be assigned before thrift_transport runs,
  # because thrift_transport reads both.
  @logger = logger
  @transport = thrift_transport(server, port)
  @protocol = Thrift::BinaryProtocol.new(@transport)
  @client = Hive2::Thrift::TCLIService::Client.new(@protocol)
  @session = nil
  @logger.info("Connecting to HiveServer2 #{server} on port #{port}")
end
|
Dynamic Method Handling
This class handles dynamic methods through the method_missing method
#method_missing(meth, *args) ⇒ Object
371
372
373
|
# File 'lib/rbhive/t_c_l_i_connection.rb', line 371
# Forwards any method this object does not define to the underlying client,
# passing along positional arguments and the caller's block (if any).
#
# @param meth [Symbol] name of the missing method
# @param args [Array] positional arguments to forward
# @return [Object] whatever the client method returns
def method_missing(meth, *args, &block)
  client.send(meth, *args, &block)
end

# Keeps respond_to? consistent with the delegation done by method_missing:
# a name is reported as supported when the client responds to it.
#
# @param meth [Symbol] method name being queried
# @param include_private [Boolean] forwarded to the client's respond_to?
# @return [Boolean]
def respond_to_missing?(meth, include_private = false)
  client.respond_to?(meth, include_private) || super
end
|
Instance Attribute Details
#client ⇒ Object
Returns the value of attribute client.
82
83
84
|
# File 'lib/rbhive/t_c_l_i_connection.rb', line 82
# Reader for the @client instance variable.
#
# @return [Object] the value currently stored in @client
def client
  @client
end
|
Instance Method Details
#add_columns(schema) ⇒ Object
367
368
369
|
# File 'lib/rbhive/t_c_l_i_connection.rb', line 367
# Executes the statement returned by schema.add_columns_statement.
#
# @param schema [#add_columns_statement] object supplying the statement text
# @return [Object] the result of #execute
def add_columns(schema)
  statement = schema.add_columns_statement
  execute(statement)
end
|
#async_cancel(handles) ⇒ Object
232
233
234
|
# File 'lib/rbhive/t_c_l_i_connection.rb', line 232
# Sends a CancelOperation call to the client, using the request that
# prepare_cancel_request builds from the given handles.
#
# @param handles [Object] handles passed through to prepare_cancel_request
# @return [Object] the client's CancelOperation response
def async_cancel(handles)
  cancel_request = prepare_cancel_request(handles)
  @client.CancelOperation(cancel_request)
end
|
#async_close_session(handles) ⇒ Object
294
295
296
297
|
# File 'lib/rbhive/t_c_l_i_connection.rb', line 294
# Validates the handles, then asks the client to close the session stored
# under handles[:session].
#
# @param handles [Hash] must satisfy validate_handles!; :session is read
# @return [Object] the client's CloseSession response
def async_close_session(handles)
  validate_handles!(handles)
  close_request = Hive2::Thrift::TCloseSessionReq.new(sessionHandle: handles[:session])
  @client.CloseSession(close_request)
end
|
#async_execute(query) ⇒ Object
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
|
# File 'lib/rbhive/t_c_l_i_connection.rb', line 193
# Submits a statement with runAsync set to true and returns the identifiers
# a caller needs to refer to the operation later.
#
# @param query [String] statement text
# @return [Hash] with keys :session, :guid and :secret
# @raise whatever raise_error_if_failed! raises for the execute response
def async_execute(query)
  @logger.info("Executing query asynchronously: #{query}")

  request = Hive2::Thrift::TExecuteStatementReq.new(
    sessionHandle: @session.sessionHandle,
    statement: query,
    runAsync: true
  )
  exec_result = @client.ExecuteStatement(request)
  raise_error_if_failed!(exec_result)

  operation_id = exec_result.operationHandle.operationId
  {
    session: @session.sessionHandle,
    guid: operation_id.guid,
    secret: operation_id.secret
  }
end
|
#async_fetch(handles, max_rows = 100) ⇒ Object
Async fetch results from an async execute
267
268
269
270
271
272
273
274
275
|
# File 'lib/rbhive/t_c_l_i_connection.rb', line 267
# Fetches up to max_rows rows for an async operation, starting from the
# first row. Refuses to fetch unless async_is_complete? is true.
#
# @param handles [Object] handles identifying the operation
# @param max_rows [Integer] upper bound passed to #fetch_rows
# @return [Object] the result of #fetch_rows
# @raise [RuntimeError] when the operation is not complete
def async_fetch(handles, max_rows = 100)
  raise "Can't perform fetch on a query in state: #{async_state(handles)}" unless async_is_complete?(handles)

  operation_handle = prepare_operation_handle(handles)
  fetch_rows(operation_handle, :first, max_rows)
end
|
#async_fetch_in_batch(handles, batch_size = 1000, &block) ⇒ Object
Fetches the results of a completed async query in batches of batch_size rows and yields each batch to the given block as an array of rows.
279
280
281
282
283
284
285
286
287
288
289
290
291
292
|
# File 'lib/rbhive/t_c_l_i_connection.rb', line 279
# Yields the rows of a completed async operation batch by batch, stopping
# at the first empty batch.
#
# @param handles [Object] handles identifying the operation
# @param batch_size [Integer] row count requested per #fetch_rows call
# @yield [rows] each non-empty batch returned by #fetch_rows
# @raise [RuntimeError] when no block is given or the operation is not complete
def async_fetch_in_batch(handles, batch_size = 1000, &block)
  raise "No block given for the batch fetch request!" unless block_given?
  raise "Can't perform fetch on a query in state: #{async_state(handles)}" unless async_is_complete?(handles)

  loop do
    batch = fetch_rows(prepare_operation_handle(handles), :next, batch_size)
    break if batch.empty?

    block.call(batch)
  end
end
|
#async_is_cancelled?(handles) ⇒ Boolean
228
229
230
|
# File 'lib/rbhive/t_c_l_i_connection.rb', line 228
# Reports whether async_state returns :cancelled for the given handles.
#
# @param handles [Object] handles passed to async_state
# @return [Boolean]
def async_is_cancelled?(handles)
  state = async_state(handles)
  state == :cancelled
end
|
#async_is_complete?(handles) ⇒ Boolean
214
215
216
|
# File 'lib/rbhive/t_c_l_i_connection.rb', line 214
# Reports whether async_state returns :finished for the given handles.
#
# @param handles [Object] handles passed to async_state
# @return [Boolean]
def async_is_complete?(handles)
  state = async_state(handles)
  state == :finished
end
|
#async_is_failed?(handles) ⇒ Boolean
224
225
226
|
# File 'lib/rbhive/t_c_l_i_connection.rb', line 224
# Reports whether async_state returns :error for the given handles.
#
# @param handles [Object] handles passed to async_state
# @return [Boolean]
def async_is_failed?(handles)
  state = async_state(handles)
  state == :error
end
|
#async_is_running?(handles) ⇒ Boolean
Is the query actually running?
219
220
221
|
# File 'lib/rbhive/t_c_l_i_connection.rb', line 219
# Reports whether async_state returns :running for the given handles.
#
# @param handles [Object] handles passed to async_state
# @return [Boolean]
def async_is_running?(handles)
  state = async_state(handles)
  state == :running
end
|
#async_state(handles) ⇒ Object
#close ⇒ Object
150
151
152
|
# File 'lib/rbhive/t_c_l_i_connection.rb', line 150
# Closes the transport held in @transport.
#
# @return [Object] whatever the transport's close returns
def close
  @transport.close
end
|
#close_session ⇒ Object
158
159
160
161
|
# File 'lib/rbhive/t_c_l_i_connection.rb', line 158
# Sends a CloseSession call built by prepare_close_session, then forgets
# the session by resetting @session to nil.
#
# @return [nil]
def close_session
  close_request = prepare_close_session
  @client.CloseSession(close_request)
  @session = nil
end
|
#create_table(schema) ⇒ Object
354
355
356
|
# File 'lib/rbhive/t_c_l_i_connection.rb', line 354
# Executes the statement returned by schema.create_table_statement.
#
# @param schema [#create_table_statement] object supplying the statement text
# @return [Object] the result of #execute
def create_table(schema)
  statement = schema.create_table_statement
  execute(statement)
end
|
#drop_table(name) ⇒ Object
358
359
360
361
|
# File 'lib/rbhive/t_c_l_i_connection.rb', line 358
# Issues a DROP TABLE for the given table. A TableSchema argument is
# replaced by its #name; anything else is interpolated as given.
#
# @param name [TableSchema, #to_s] table to drop
# @return [Object] the result of #execute
def drop_table(name)
  table_name = name.is_a?(TableSchema) ? name.name : name
  execute("DROP TABLE `#{table_name}`")
end
|
#execute(query) ⇒ Object
171
172
173
174
175
176
177
|
# File 'lib/rbhive/t_c_l_i_connection.rb', line 171
# Logs the query, sends it through the client's ExecuteStatement call and
# returns the response once raise_error_if_failed! has accepted it.
#
# @param query [String] statement text
# @return [Object] the ExecuteStatement response
# @raise whatever raise_error_if_failed! raises for the response
def execute(query)
  @logger.info("Executing Hive Query: #{query}")
  request = prepare_execute_statement(query)
  client.ExecuteStatement(request).tap do |response|
    raise_error_if_failed!(response)
  end
end
|
#explain(query) ⇒ Object
310
311
312
313
314
315
316
|
# File 'lib/rbhive/t_c_l_i_connection.rb', line 310
# Runs "EXPLAIN <query>" through #fetch_in_batch, collects the :Explain
# value of every returned row and wraps the flattened list in an ExplainResult.
#
# @param query [String] statement to explain
# @return [ExplainResult]
def explain(query)
  explain_lines = []
  fetch_in_batch("EXPLAIN " + query) do |batch|
    explain_lines.concat(batch.map { |row| row[:Explain] })
  end
  ExplainResult.new(explain_lines.flatten)
end
|
#fetch(query, max_rows = 100) ⇒ Object
Performs a query on the server, fetches up to max_rows rows and returns them as an array.
319
320
321
322
323
324
325
326
327
328
329
|
# File 'lib/rbhive/t_c_l_i_connection.rb', line 319
# Executes the query and returns up to max_rows rows, read from the start
# of the result.
#
# @param query [String] statement text
# @param max_rows [Integer] upper bound passed to #fetch_rows
# @return [Object] the result of #fetch_rows
# @raise whatever raise_error_if_failed! raises for the execute response
def fetch(query, max_rows = 100)
  exec_result = execute(query)
  raise_error_if_failed!(exec_result)
  fetch_rows(exec_result.operationHandle, :first, max_rows)
end
|
#fetch_in_batch(query, batch_size = 1000, &block) ⇒ Object
Performs a query on the server, fetches the results in batches of batch_size rows and yields the result batches to a given block as arrays of rows.
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
|
# File 'lib/rbhive/t_c_l_i_connection.rb', line 333
# Executes the query and yields its rows batch by batch, stopping at the
# first empty batch.
#
# @param query [String] statement text
# @param batch_size [Integer] row count requested per #fetch_rows call
# @yield [rows] each non-empty batch returned by #fetch_rows
# @return [nil]
# @raise [RuntimeError] when no block is given
# @raise whatever raise_error_if_failed! raises for the execute response
def fetch_in_batch(query, batch_size = 1000, &block)
  raise "No block given for the batch fetch request!" unless block_given?

  exec_result = execute(query)
  raise_error_if_failed!(exec_result)
  op_handle = exec_result.operationHandle

  # fetch_rows builds its own fetch request on every call, so no request
  # object needs to be prepared up front here.
  loop do
    rows = fetch_rows(op_handle, :next, batch_size)
    break if rows.empty?

    yield rows
  end
end
|
#fetch_rows(op_handle, orientation = :first, max_rows = 1000) ⇒ Object
Pull rows from the query result
300
301
302
303
304
305
306
|
# File 'lib/rbhive/t_c_l_i_connection.rb', line 300
# Requests rows for an operation and wraps them, together with a schema
# definition, in a TCLIResultSet.
#
# @param op_handle [Object] operation handle to read from
# @param orientation [Symbol] passed to prepare_fetch_results
# @param max_rows [Integer] passed to prepare_fetch_results
# @return [TCLIResultSet]
# @raise whatever raise_error_if_failed! raises for the fetch response
def fetch_rows(op_handle, orientation = :first, max_rows = 1000)
  response = @client.FetchResults(prepare_fetch_results(op_handle, orientation, max_rows))
  raise_error_if_failed!(response)

  result_rows = response.results.rows
  schema = TCLISchemaDefinition.new(get_schema_for(op_handle), result_rows.first)
  TCLIResultSet.new(result_rows, schema)
end
|
#open ⇒ Object
146
147
148
|
# File 'lib/rbhive/t_c_l_i_connection.rb', line 146
# Opens the transport held in @transport.
#
# @return [Object] whatever the transport's open returns
def open
  @transport.open
end
|
#open_session ⇒ Object
154
155
156
|
# File 'lib/rbhive/t_c_l_i_connection.rb', line 154
# Opens a session through the client and remembers the response in @session.
# The request is built by prepare_open_session from @thrift_protocol_version.
#
# @return [Object] the OpenSession response
def open_session
  open_request = prepare_open_session(@thrift_protocol_version)
  @session = @client.OpenSession(open_request)
end
|
#parse_sasl_params(sasl_params) ⇒ Object
Processes SASL connection params and returns a hash with symbol keys, or nil
135
136
137
138
139
140
141
142
143
144
|
# File 'lib/rbhive/t_c_l_i_connection.rb', line 135
# Normalises SASL parameters so that every key is a Symbol.
#
# @param sasl_params [Hash, Object] parameters as supplied by the caller
# @return [Hash, nil] a new hash with symbolised keys, or nil when the
#   argument is not a Hash
def parse_sasl_params(sasl_params)
  return nil unless sasl_params.kind_of?(Hash)

  sasl_params.each_with_object({}) do |(key, value), symbolized|
    symbolized[key.to_sym] = value
  end
end
|
#priority=(priority) ⇒ Object
179
180
181
|
# File 'lib/rbhive/t_c_l_i_connection.rb', line 179
# Sets the "mapred.job.priority" setting to the given value via #set.
#
# @param priority [Object] value interpolated into the SET statement
def priority=(priority)
  setting = "mapred.job.priority"
  set(setting, priority)
end
|
#queue=(queue) ⇒ Object
183
184
185
|
# File 'lib/rbhive/t_c_l_i_connection.rb', line 183
# Sets the "mapred.job.queue.name" setting to the given value via #set.
#
# @param queue [Object] value interpolated into the SET statement
def queue=(queue)
  setting = "mapred.job.queue.name"
  set(setting, queue)
end
|
#replace_columns(schema) ⇒ Object
363
364
365
|
# File 'lib/rbhive/t_c_l_i_connection.rb', line 363
# Executes the statement returned by schema.replace_columns_statement.
#
# @param schema [#replace_columns_statement] object supplying the statement text
# @return [Object] the result of #execute
def replace_columns(schema)
  statement = schema.replace_columns_statement
  execute(statement)
end
|
#session ⇒ Object
163
164
165
|
# File 'lib/rbhive/t_c_l_i_connection.rb', line 163
# Returns the sessionHandle of the stored session, or the falsy value held
# in @session when no session has been stored.
#
# @return [Object, nil]
def session
  return @session unless @session

  @session.sessionHandle
end
|
#set(name, value) ⇒ Object
187
188
189
190
|
# File 'lib/rbhive/t_c_l_i_connection.rb', line 187
# Logs and then executes a "SET name=value" statement.
#
# @param name [Object] setting name, interpolated into the statement
# @param value [Object] setting value, interpolated into the statement
# @return [Object] the result of #execute
def set(name, value)
  assignment = "#{name}=#{value}"
  @logger.info("Setting #{assignment}")
  execute("SET #{assignment}")
end
|
#thrift_hive_protocol(version) ⇒ Object
109
110
111
|
# File 'lib/rbhive/t_c_l_i_connection.rb', line 109
# Looks the given Hive version up in HIVE_THRIFT_MAPPING.
#
# @param version [Object] key into HIVE_THRIFT_MAPPING
# @return [Object] the mapped value
# @raise [RuntimeError] "Invalid Hive version" when the lookup is nil or false
def thrift_hive_protocol(version)
  protocol = HIVE_THRIFT_MAPPING[version]
  raise "Invalid Hive version" unless protocol

  protocol
end
|
#thrift_socket(server, port, timeout) ⇒ Object
128
129
130
131
132
|
# File 'lib/rbhive/t_c_l_i_connection.rb', line 128
# Creates a Thrift::Socket for the given endpoint and assigns its timeout.
#
# @param server [String] host passed to Thrift::Socket.new
# @param port [Integer] port passed to Thrift::Socket.new
# @param timeout [Object] value assigned to the socket's timeout attribute
# @return [Thrift::Socket]
def thrift_socket(server, port, timeout)
  Thrift::Socket.new(server, port).tap { |sock| sock.timeout = timeout }
end
|
#thrift_transport(server, port) ⇒ Object
113
114
115
116
117
118
119
120
121
122
123
124
125
126
|
# File 'lib/rbhive/t_c_l_i_connection.rb', line 113
# Builds the Thrift transport selected by @options[:transport].
#
# :buffered and :sasl wrap a socket from #thrift_socket (which applies
# @options[:timeout]); :http builds an HTTP client transport pointed at
# "/cliservice" on the given server and port.
#
# @param server [String] host name
# @param port [Integer] port
# @return [Object] the constructed transport
# @raise [RuntimeError] when @options[:transport] is not one of
#   :buffered, :sasl or :http
def thrift_transport(server, port)
  transport = @options[:transport]
  @logger.info("Initializing transport #{transport}")

  case transport
  when :buffered
    Thrift::BufferedTransport.new(thrift_socket(server, port, @options[:timeout]))
  when :sasl
    Thrift::SaslClientTransport.new(thrift_socket(server, port, @options[:timeout]),
                                    parse_sasl_params(@options[:sasl_params]))
  when :http
    Thrift::HTTPClientTransport.new("http://#{server}:#{port}/cliservice")
  else
    # Interpolate the configured value; the bare name used previously was
    # not defined in this method, so this message could never be produced.
    raise "Unrecognised transport type '#{transport}'"
  end
end
|