Class: RBHive::TCLIConnection

Inherits:
Object
  • Object
show all
Defined in:
lib/rbhive/t_c_l_i_connection.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(server, port = 10_000, options = {}, logger = StdOutLogger.new) ⇒ TCLIConnection

Returns a new instance of TCLIConnection.



73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/rbhive/t_c_l_i_connection.rb', line 73

# Creates a connection wrapper around a HiveServer2 Thrift client.
#
# Builds the transport, binary protocol and TCLIService client objects and
# logs the target server; the transport itself is opened separately via #open.
#
# @param server host passed on to the transport factory
# @param port port passed on to the transport factory (default 10_000)
# @param options [Hash, nil] connection options; nil is treated as an empty
#   hash. Keys read here: :transport (default :buffered), :hive_version
#   (default 10), :timeout (default 1800) and :sasl_params (required when
#   :transport is :sasl). The hash passed in is not modified.
# @param logger object responding to #info
# @raise [RuntimeError] if options is not a Hash, if :sasl is requested
#   without :sasl_params, or if a helper rejects the Hive version / transport
def initialize(server, port=10_000, options={}, logger=StdOutLogger.new)
  options ||= {} # backwards compatibility
  raise "'options' parameter must be a hash" unless options.is_a?(Hash)

  if options[:transport] == :sasl && options[:sasl_params].nil?
    raise ":transport is set to :sasl, but no :sasl_params option was supplied"
  end

  # Work on a shallow copy so the defaults below never leak into the caller's
  # hash (and do not blow up when the caller passes a frozen hash).
  options = options.dup

  # Defaults to buffered transport, Hive 0.10, 1800 second timeout
  options[:transport]     ||= :buffered
  options[:hive_version]  ||= 10
  options[:timeout]       ||= 1800
  @options = options

  # Look up the appropriate Thrift protocol version for the supplied Hive version
  @thrift_protocol_version = thrift_hive_protocol(options[:hive_version])

  # The logger must be set before thrift_transport, which logs through it.
  @logger = logger
  @transport = thrift_transport(server, port)
  @protocol = Thrift::BinaryProtocol.new(@transport)
  @client = Hive2::Thrift::TCLIService::Client.new(@protocol)
  @session = nil
  @logger.info("Connecting to HiveServer2 #{server} on port #{port}")
  @mutex = Mutex.new
end

Dynamic Method Handling

This class handles dynamic methods through the method_missing method

#method_missing(meth, *args) ⇒ Object



262
263
264
# File 'lib/rbhive/t_c_l_i_connection.rb', line 262

# Forwards any message this object does not define itself to the underlying
# Thrift client, passing along the positional arguments and the caller's block.
#
# @param meth [Symbol] name of the method that was not found on this object
# @param args [Array] arguments to hand to the client
# @return [Object] whatever the client returns for that call
def method_missing(meth, *args, &block)
  client.send(meth, *args, &block)
end

# Keeps respond_to? consistent with method_missing: a name is reported as
# supported when the client responds to it.
#
# @param meth [Symbol] method name being queried
# @param include_private [Boolean] whether private methods should be considered
# @return [Boolean]
def respond_to_missing?(meth, include_private = false)
  client.respond_to?(meth, include_private) || super
end

Instance Attribute Details

#client ⇒ Object (readonly)

Returns the value of attribute client.



71
72
73
# File 'lib/rbhive/t_c_l_i_connection.rb', line 71

# Read-only accessor for the client object stored in @client.
#
# @return [Object] the value of @client
def client
  @client
end

Instance Method Details

#add_columns(schema) ⇒ Object



258
259
260
# File 'lib/rbhive/t_c_l_i_connection.rb', line 258

# Executes the statement produced by schema.add_columns_statement.
#
# @param schema object responding to #add_columns_statement
# @return [Object] the result of #execute
def add_columns(schema)
  statement = schema.add_columns_statement
  execute(statement)
end

#close ⇒ Object



140
141
142
# File 'lib/rbhive/t_c_l_i_connection.rb', line 140

# Closes the transport held in @transport.
#
# @return [Object] whatever the transport's #close returns
def close
  @transport.close
end

#close_session ⇒ Object



148
149
150
151
# File 'lib/rbhive/t_c_l_i_connection.rb', line 148

# Sends a CloseSession request (built by prepare_close_session) through the
# client and then forgets the stored session.
#
# @return [nil]
def close_session
  request = prepare_close_session
  @client.CloseSession(request)
  @session = nil
end

#create_table(schema) ⇒ Object



245
246
247
# File 'lib/rbhive/t_c_l_i_connection.rb', line 245

# Executes the statement produced by schema.create_table_statement.
#
# @param schema object responding to #create_table_statement
# @return [Object] the result of #execute
def create_table(schema)
  statement = schema.create_table_statement
  execute(statement)
end

#drop_table(name) ⇒ Object



249
250
251
252
# File 'lib/rbhive/t_c_l_i_connection.rb', line 249

# Drops a table, identified either by its name or by a TableSchema.
#
# @param name a table name, or a TableSchema whose #name is used
# @return [Object] the result of #execute
def drop_table(name)
  table = name.is_a?(TableSchema) ? name.name : name
  execute("DROP TABLE `#{table}`")
end

#execute(query) ⇒ Object



161
162
163
# File 'lib/rbhive/t_c_l_i_connection.rb', line 161

# Runs a query by delegating to execute_safe.
#
# @param query the statement handed to execute_safe
# @return [Object] whatever execute_safe returns
def execute(query)
  execute_safe(query)
end

#explain(query) ⇒ Object

Performs an EXPLAIN of the supplied query on the server and returns the result as an ExplainResult. (Only works on 0.12 if you have this patch - issues.apache.org/jira/browse/HIVE-5492)



180
181
182
183
184
185
186
# File 'lib/rbhive/t_c_l_i_connection.rb', line 180

# Runs "EXPLAIN <query>" through fetch_in_batch, collects the :Explain value
# of every returned row and wraps the flattened list in an ExplainResult.
#
# @param query [String] the query to explain
# @return [ExplainResult]
def explain(query)
  statement = "EXPLAIN " + query
  collected = []
  fetch_in_batch(statement) do |batch|
    plan_lines = batch.map { |row| row[:Explain] }
    collected.push(plan_lines)
  end
  ExplainResult.new(collected.flatten)
end

#fetch(query, max_rows = 100) ⇒ Object

Performs a query on the server, fetches up to max_rows rows and returns them as an array.



189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
# File 'lib/rbhive/t_c_l_i_connection.rb', line 189

# Executes a query and returns up to max_rows rows as one result set.
#
# The whole exchange runs inside `safe`. Both the execute response and the
# fetch response are passed to raise_error_if_failed! before being used.
#
# @param query the statement to run
# @param max_rows [Integer] maximum number of rows requested (default 100)
# @return [TCLIResultSet] the fetched rows together with their schema definition
def fetch(query, max_rows = 100)
  safe do
    # Run the statement and bail out on failure
    execution = execute_unsafe(query)
    raise_error_if_failed!(execution)
    handle = execution.operationHandle

    # Ask for the first max_rows rows of that operation
    response = client.FetchResults(prepare_fetch_results(handle, :first, max_rows))
    raise_error_if_failed!(response)

    # Pair the raw rows with a schema definition (first row serves as the example row)
    result_rows = response.results.rows
    schema_definition = TCLISchemaDefinition.new(get_schema_for(handle), result_rows.first)
    TCLIResultSet.new(result_rows, schema_definition)
  end
end

#fetch_in_batch(query, batch_size = 1000, &block) ⇒ Object

Performs a query on the server, fetches the results in batches of batch_size rows and yields the result batches to a given block as arrays of rows.



212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
# File 'lib/rbhive/t_c_l_i_connection.rb', line 212

# Executes a query and yields its results to the given block in batches of
# up to batch_size rows.
#
# The whole exchange runs inside `safe`. The execute response and every fetch
# response are passed to raise_error_if_failed!. Iteration stops at the first
# empty batch. The schema for the operation is requested once, on the first
# non-empty batch, and reused for all following batches.
#
# @param query the statement to run
# @param batch_size [Integer] number of rows requested per fetch (default 1000)
# @yieldparam batch [TCLIResultSet] one batch of rows with its schema definition
# @raise [RuntimeError] if no block is given
def fetch_in_batch(query, batch_size = 1000, &block)
  raise "No block given for the batch fetch request!" unless block_given?
  safe do
    # Execute the query and check the result
    exec_result = execute_unsafe(query)
    raise_error_if_failed!(exec_result)

    # Get search operation handle to fetch the results
    op_handle = exec_result.operationHandle

    # Prepare fetch results request (the same request object is re-sent for every batch)
    fetch_req = prepare_fetch_results(op_handle, :next, batch_size)

    # Must live outside the loop block: a local first assigned inside the
    # block is re-created on every iteration, so `||=` there never memoizes.
    schema_for_req = nil

    # Now let's iterate over the results
    loop do
      # Fetch next batch and raise an exception if it failed
      fetch_results = client.FetchResults(fetch_req)
      raise_error_if_failed!(fetch_results)

      # Get data rows from the result
      rows = fetch_results.results.rows
      break if rows.empty?

      # Request the schema only once per operation; the definition is still
      # built per batch, with that batch's first row as the example row.
      schema_for_req ||= get_schema_for(op_handle)
      the_schema = TCLISchemaDefinition.new(schema_for_req, rows.first)

      # Format the results and yield them to the given block
      yield TCLIResultSet.new(rows, the_schema)
    end
  end
end

#open ⇒ Object



136
137
138
# File 'lib/rbhive/t_c_l_i_connection.rb', line 136

# Opens the transport held in @transport.
#
# @return [Object] whatever the transport's #open returns
def open
  @transport.open
end

#open_session ⇒ Object



144
145
146
# File 'lib/rbhive/t_c_l_i_connection.rb', line 144

# Sends an OpenSession request for the configured Thrift protocol version and
# stores the response in @session.
#
# @return [Object] the OpenSession response
def open_session
  request = prepare_open_session(@thrift_protocol_version)
  @session = @client.OpenSession(request)
end

#parse_sasl_params(sasl_params) ⇒ Object

Processes SASL connection params and returns a hash with symbol keys, or nil if the params are not a hash.



125
126
127
128
129
130
131
132
133
134
# File 'lib/rbhive/t_c_l_i_connection.rb', line 125

# Normalizes SASL connection params into a new hash with symbol keys.
#
# @param sasl_params [Hash, Object] params as supplied by the caller
# @return [Hash, nil] a copy with every key converted via #to_sym, or nil
#   when sasl_params is not a Hash
def parse_sasl_params(sasl_params)
  return nil unless sasl_params.is_a?(Hash)

  # Symbolize keys in the hash
  sasl_params.each_with_object({}) do |(key, value), symbolized|
    symbolized[key.to_sym] = value
  end
end

#priority=(priority) ⇒ Object



165
166
167
# File 'lib/rbhive/t_c_l_i_connection.rb', line 165

# Sets the "mapred.job.priority" property through #set.
#
# @param priority the value to assign to the property
def priority=(priority)
  property = "mapred.job.priority"
  set(property, priority)
end

#queue=(queue) ⇒ Object



169
170
171
# File 'lib/rbhive/t_c_l_i_connection.rb', line 169

# Sets the "mapred.job.queue.name" property through #set.
#
# @param queue the value to assign to the property
def queue=(queue)
  property = "mapred.job.queue.name"
  set(property, queue)
end

#replace_columns(schema) ⇒ Object



254
255
256
# File 'lib/rbhive/t_c_l_i_connection.rb', line 254

# Executes the statement produced by schema.replace_columns_statement.
#
# @param schema object responding to #replace_columns_statement
# @return [Object] the result of #execute
def replace_columns(schema)
  statement = schema.replace_columns_statement
  execute(statement)
end

#session ⇒ Object



153
154
155
# File 'lib/rbhive/t_c_l_i_connection.rb', line 153

# Returns the handle of the stored session, if there is one.
#
# @return [Object, nil] @session.sessionHandle, or the falsy @session itself
#   when no session is stored
def session
  return @session unless @session

  @session.sessionHandle
end

#set(name, value) ⇒ Object



173
174
175
176
# File 'lib/rbhive/t_c_l_i_connection.rb', line 173

# Logs the assignment and then issues a "SET name=value" statement.
#
# @param name the property name
# @param value the property value
# @return [Object] the result of #execute
def set(name, value)
  log_line = "Setting #{name}=#{value}"
  @logger.info(log_line)
  statement = "SET #{name}=#{value}"
  execute(statement)
end

#thrift_hive_protocol(version) ⇒ Object



99
100
101
# File 'lib/rbhive/t_c_l_i_connection.rb', line 99

# Looks up the entry for a Hive version in HIVE_THRIFT_MAPPING.
#
# @param version the Hive version key to look up
# @return [Object] the mapped value
# @raise [RuntimeError] "Invalid Hive version" when the lookup yields nil or false
def thrift_hive_protocol(version)
  protocol = HIVE_THRIFT_MAPPING[version]
  raise("Invalid Hive version") unless protocol

  protocol
end

#thrift_socket(server, port, timeout) ⇒ Object



118
119
120
121
122
# File 'lib/rbhive/t_c_l_i_connection.rb', line 118

# Creates a Thrift::Socket for the given endpoint and assigns its timeout.
#
# @param server host passed to Thrift::Socket.new
# @param port port passed to Thrift::Socket.new
# @param timeout value assigned to the socket's timeout attribute
# @return [Thrift::Socket] the configured socket
def thrift_socket(server, port, timeout)
  Thrift::Socket.new(server, port).tap do |sock|
    sock.timeout = timeout
  end
end

#thrift_transport(server, port) ⇒ Object



103
104
105
106
107
108
109
110
111
112
113
114
115
116
# File 'lib/rbhive/t_c_l_i_connection.rb', line 103

# Builds the Thrift transport selected by @options[:transport].
#
# @param server host used for the socket or the HTTP URL
# @param port port used for the socket or the HTTP URL
# @return [Object] a Thrift::BufferedTransport (:buffered), a
#   Thrift::SaslClientTransport (:sasl) or a Thrift::HTTPClientTransport (:http)
# @raise [RuntimeError] if the configured transport type is not recognised
def thrift_transport(server, port)
  transport_type = @options[:transport]
  @logger.info("Initializing transport #{transport_type}")
  case transport_type
  when :buffered
    Thrift::BufferedTransport.new(thrift_socket(server, port, @options[:timeout]))
  when :sasl
    Thrift::SaslClientTransport.new(thrift_socket(server, port, @options[:timeout]),
                                    parse_sasl_params(@options[:sasl_params]))
  when :http
    # The HTTP transport is built from a URL only; @options[:timeout] is not applied here.
    Thrift::HTTPClientTransport.new("http://#{server}:#{port}/cliservice")
  else
    # Report the configured value itself; there is no local named `transport` in this method.
    raise "Unrecognised transport type '#{transport_type}'"
  end
end