Class: Impala::Connection

Inherits:
Object
  • Object
show all
Defined in:
lib/impala/connection.rb

Overview

This object represents a connection to an Impala server. It can be used to perform queries on the database.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(host, port, options = {}) ⇒ Connection

Don't instantiate Connections directly; instead, use Impala.connect.



8
9
10
11
12
13
14
15
# File 'lib/impala/connection.rb', line 8

# Records the connection settings and immediately opens the connection.
#
# The options hash is shallow-copied so the caller's hash is not modified
# when the :transport default is filled in.
#
# @param host the server host, later handed to #thrift_transport by #open
# @param port the server port, later handed to #thrift_transport by #open
# @param options [Hash] connection options; :transport falls back to
#   :buffered when it is missing or falsy
def initialize(host, port, options = {})
  @host, @port = host, port
  @connected = false

  @options = options.dup
  @options[:transport] = :buffered unless @options[:transport]

  open
end

Instance Attribute Details

#host ⇒ Object

Returns the value of attribute host.



5
6
7
# File 'lib/impala/connection.rb', line 5

# Reader for the @host instance variable.
#
# @return [Object] the current value of @host
def host
  @host
end

#port ⇒ Object

Returns the value of attribute port.



5
6
7
# File 'lib/impala/connection.rb', line 5

# Reader for the @port instance variable.
#
# @return [Object] the current value of @port
def port
  @port
end

Instance Method Details

#close ⇒ Object

Close this connection. It can still be reopened with #open.



67
68
69
70
71
72
# File 'lib/impala/connection.rb', line 67

# Closes the underlying transport and marks the connection as closed.
# Does nothing (and returns nil) when the connection is not currently open.
#
# @return [false, nil] false after closing, nil when there was nothing to close
def close
  if @connected
    @transport.close
    @connected = false
  end
end

#close_handle(handle) ⇒ Object



114
115
116
# File 'lib/impala/connection.rb', line 114

# Closes a handle by delegating to the service client's #close.
#
# NOTE(review): unlike #execute and #refresh, this does not check #open?
# before using @service — confirm that is intentional.
#
# @param handle the handle to close, passed unchanged to @service.close
#   (presumably a query handle like the one #execute obtains — verify against callers)
# @return [Object] whatever @service.close returns
def close_handle(handle)
  @service.close(handle)
end

#execute(raw_query, query_options = {}) ⇒ Cursor

Perform a query and return a cursor for iterating over the results.

Parameters:

  • raw_query (String)

    the query you want to run

  • query_options (Hash) (defaults to: {})

    options for the query: :user sets the user who runs it; all other keys are query configuration options (see TImpalaQueryOptions in ImpalaService.thrift)

Options Hash (query_options):

  • :user (String)

    the user who runs the query

Returns:

  • (Cursor)

    a cursor for the result rows

Raises:

  • (ConnectionError)

    if the connection is closed



103
104
105
106
107
108
109
110
111
112
# File 'lib/impala/connection.rb', line 103

# Runs a query and returns a cursor over its results.
#
# The raw query is passed through sanitize_query, sent with send_query, and
# the resulting handle is wrapped in a Cursor on which wait! is called
# before it is returned.
#
# @param raw_query the query text, handed to sanitize_query
# @param query_options [Hash] options forwarded unchanged to send_query
# @return [Cursor] a cursor built from the query handle, @service and @options
# @raise [ConnectionError] if the connection is not open
def execute(raw_query, query_options = {})
  raise ConnectionError, "Connection closed" unless open?

  handle = send_query(sanitize_query(raw_query), query_options)
  Cursor.new(handle, @service, @options).tap(&:wait!)
end

#inspect ⇒ Object



17
18
19
# File 'lib/impala/connection.rb', line 17

def inspect
  "#<#{self.class} #{@host}:#{@port}#{open? ? '' : ' (DISCONNECTED)'}>"
end

#open ⇒ Object

Open the connection if it's currently closed.



22
23
24
25
26
27
28
29
30
31
# File 'lib/impala/connection.rb', line 22

# Opens the connection unless it is already open.
#
# Builds a transport for host/port, opens it, wraps it in a binary
# protocol, creates the Impala service client on top of that, and finally
# marks the connection as open.
#
# @return [true, nil] true after opening, nil when already open
def open
  return if @connected

  # Assign before opening so @transport holds the transport even if open raises.
  @transport = thrift_transport(host, port)
  @transport.open

  @service = Protocol::ImpalaService::Client.new(
    Thrift::BinaryProtocol.new(@transport)
  )
  @connected = true
end

#open? ⇒ Boolean

Returns true if the connection is currently open.

Returns:

  • (Boolean)


75
76
77
# File 'lib/impala/connection.rb', line 75

# Reports whether the connection is currently marked as open.
#
# @return [Boolean] the value of the @connected flag
def open?
  @connected
end

#parse_sasl_params(sasl_params) ⇒ Object

Processes SASL connection params and returns a new hash with symbol keys, or nil if the params are not a Hash



55
56
57
58
59
60
61
62
63
64
# File 'lib/impala/connection.rb', line 55

# Normalizes SASL connection params into a new hash keyed by symbols.
#
# @param sasl_params [Hash, Object] the raw params
# @return [Hash, nil] a new hash with every key converted via to_sym and
#   values untouched, or nil when sasl_params is not a Hash
def parse_sasl_params(sasl_params)
  return nil unless sasl_params.kind_of?(Hash)

  sasl_params.each_with_object({}) do |(key, value), symbolized|
    symbolized[key.to_sym] = value
  end
end

#query(raw_query, query_options = {}) ⇒ Array<Hash>

Perform a query and return all the results. This will load the entire result set into memory, so if you're dealing with lots of rows, #execute may work better.

Parameters:

  • raw_query (String)

    the query you want to run

  • query_options (Hash) (defaults to: {})

    options for the query: :user sets the user who runs it; all other keys are query configuration options (see TImpalaQueryOptions in ImpalaService.thrift)

Options Hash (query_options):

  • :user (String)

    the user runs the query

Returns:

  • (Array<Hash>)

    an array of hashes, one for each row.



93
94
95
# File 'lib/impala/connection.rb', line 93

# Runs a query via #execute and returns everything the resulting cursor's
# fetch_all yields.
#
# @param raw_query the query text, forwarded to #execute
# @param query_options [Hash] options forwarded unchanged to #execute
# @return [Object] the result of calling fetch_all on the cursor
def query(raw_query, query_options = {})
  cursor = execute(raw_query, query_options)
  cursor.fetch_all
end

#refresh ⇒ Object

Refresh the metadata store.

Raises:

  • (ConnectionError)

    if the connection is closed



80
81
82
83
# File 'lib/impala/connection.rb', line 80

# Asks the service to reset its catalog by calling ResetCatalog on the
# service client.
#
# @return [Object] whatever the service's ResetCatalog call returns
# @raise [ConnectionError] if the connection is not open
def refresh
  raise ConnectionError, "Connection closed" unless open?

  @service.ResetCatalog()
end

#thrift_socket(server, port, timeout) ⇒ Object



48
49
50
51
52
# File 'lib/impala/connection.rb', line 48

# Creates a Thrift socket for the given endpoint and sets its timeout.
#
# @param server the host passed to Thrift::Socket.new
# @param port the port passed to Thrift::Socket.new
# @param timeout the value assigned to the socket's timeout attribute
# @return [Thrift::Socket] the configured socket
def thrift_socket(server, port, timeout)
  Thrift::Socket.new(server, port).tap do |sock|
    sock.timeout = timeout
  end
end

#thrift_transport(server, port) ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/impala/connection.rb', line 33

# Builds the transport selected by @options[:transport] on top of a socket
# from #thrift_socket. The transport is only constructed here; this method
# does not call open on it.
#
# @param server the host passed to #thrift_socket
# @param port the port passed to #thrift_socket
# @return [Thrift::BufferedTransport, SASLTransport] the new transport
# @raise [ArgumentError] if the :sasl transport is selected but
#   @options[:sasl_params] is not a Hash
# @raise [RuntimeError] if @options[:transport] is neither :buffered nor :sasl
def thrift_transport(server, port)
  socket = thrift_socket(server, port, @options[:timeout])

  case @options[:transport]
  when :buffered
    Thrift::BufferedTransport.new(socket)
  when :sasl
    opts = parse_sasl_params(@options[:sasl_params])
    # parse_sasl_params returns nil for anything that is not a Hash; fail with
    # a clear message here instead of a NoMethodError on nil further down.
    raise ArgumentError, "SASL transport requires :sasl_params to be a Hash" if opts.nil?

    # opts is a fresh hash, so removing :mechanism does not touch @options.
    mechanism = opts.delete(:mechanism)
    SASLTransport.new(socket, mechanism, opts)
  else
    raise "Unrecognised transport type '#{@options[:transport]}'"
  end
end