Class: Vertica::Connection

Inherits:
Object
  • Object
show all
Defined in:
lib/vertica/connection.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Connection

Opens a connection to a Vertica server.

Parameters:

  • options (Hash) (defaults to: {})

    The connection options to use.



15
16
17
18
19
20
21
22
23
24
25
26
27
# File 'lib/vertica/connection.rb', line 15

# Builds the connection state and, unless told otherwise, connects.
#
# Every option key is normalised to a Symbol (via +to_s.to_sym+) and stored
# in +@options+; +:port+ defaults to 5433 and the row style defaults to
# +:hash+.
#
# @param options [Hash] connection options; keys may be Strings or Symbols.
#   Keys read here: +:port+, +:row_style+ and +:skip_startup+.
# @option options [Boolean] :skip_startup when truthy, neither
#   +startup_connection+ nor +initialize_connection+ is called.
def initialize(options = {})
  reset_values

  @options = {}
  options.each { |key, value| @options[key.to_s.to_sym] = value }
  @options[:port] ||= 5433

  @row_style = @options[:row_style] || :hash

  # Read the flag from the normalised hash so that a String key
  # ('skip_startup') is honoured exactly like a Symbol key.
  return if @options[:skip_startup]

  startup_connection
  initialize_connection
end

Instance Attribute Details

#backend_key ⇒ Object (readonly)

Returns the value of attribute backend_key.



5
6
7
# File 'lib/vertica/connection.rb', line 5

# Reader for +@backend_key+.
#
# In this file the value is assigned from the +key+ of a BackendKeyData
# message in #process_message and is passed, together with #backend_pid,
# to the CancelRequest built by #cancel.
#
# @return [Object] the stored backend key, or nil when none has been set
def backend_key
  @backend_key
end

#backend_pid ⇒ Object (readonly)

Returns the value of attribute backend_pid.



5
6
7
# File 'lib/vertica/connection.rb', line 5

# Reader for +@backend_pid+.
#
# In this file the value is assigned from the +pid+ of a BackendKeyData
# message in #process_message, is one of the values #opened? checks, and is
# passed to the CancelRequest built by #cancel.
#
# @return [Object] the stored backend pid, or nil when none has been set
def backend_pid
  @backend_pid
end

#debug ⇒ Object

Returns the value of attribute debug.



7
8
9
# File 'lib/vertica/connection.rb', line 7

# Reader for +@debug+.
#
# When the value is truthy, #read_message and #write print the +inspect+
# output of each message with +puts+.
#
# @return [Object] the current debug flag
def debug
  @debug
end

#notice_handler ⇒ Object (readonly)

Returns the value of attribute notice_handler.



5
6
7
# File 'lib/vertica/connection.rb', line 5

# Reader for +@notice_handler+.
#
# The handler is the block stored by #on_notice; #process_message calls it
# with each NoticeResponse message when it is set.
#
# @return [Proc, nil] the registered handler, or nil when none is registered
def notice_handler
  @notice_handler
end

#notices ⇒ Object (readonly)

Returns the value of attribute notices.



5
6
7
# File 'lib/vertica/connection.rb', line 5

# Reader for +@notices+.
#
# NOTE(review): none of the methods shown on this page assign +@notices+;
# confirm where (or whether) it is populated.
#
# @return [Object] the stored value of +@notices+
def notices
  @notices
end

#options ⇒ Object (readonly)

Returns the value of attribute options.



5
6
7
# File 'lib/vertica/connection.rb', line 5

# Reader for +@options+.
#
# #initialize builds this hash from the options it was given, converting
# every key to a Symbol and defaulting +:port+ to 5433. #cancel and
# #interrupt merge extra keys into a copy of it to build a second connection.
#
# @return [Hash] the normalised connection options
def options
  @options
end

#parameters ⇒ Object (readonly)

Returns the value of attribute parameters.



5
6
7
# File 'lib/vertica/connection.rb', line 5

# Reader for +@parameters+.
#
# #process_message stores the +value+ of each ParameterStatus message here
# under the message's +name+.
#
# @return [Object] the stored parameters collection (indexed by name)
def parameters
  @parameters
end

#row_style ⇒ Object

Returns the value of attribute row_style.



7
8
9
# File 'lib/vertica/connection.rb', line 7

# Reader for +@row_style+.
#
# #initialize sets it from the +:row_style+ option, falling back to +:hash+;
# #query and #copy pass it to Vertica::Query as the +:row_style+ option.
#
# @return [Object] the current row style
def row_style
  @row_style
end

#session_id ⇒ Object (readonly)

Returns the value of attribute session_id.



5
6
7
# File 'lib/vertica/connection.rb', line 5

# Reader for +@session_id+.
#
# #interruptable? reports whether it is non-nil, and #interrupt quotes it
# into a +CLOSE_SESSION+ query. Where it is assigned is not shown on this
# page.
#
# @return [Object, nil] the stored session id
def session_id
  @session_id
end

#transaction_status ⇒ Object (readonly)

Returns the value of attribute transaction_status.



5
6
7
# File 'lib/vertica/connection.rb', line 5

# Reader for +@transaction_status+.
#
# #process_message copies it from each ReadyForQuery message, and it is one
# of the values #opened? checks.
#
# @return [Object, nil] the most recently stored transaction status
def transaction_status
  @transaction_status
end

Class Method Details

.cancel(existing_conn) ⇒ Object



9
10
11
# File 'lib/vertica/connection.rb', line 9

# Class-level convenience that delegates to the +cancel+ method of the
# given connection.
#
# @param existing_conn [#cancel] the connection to cancel
# @return [Object] whatever +existing_conn.cancel+ returns
def self.cancel(existing_conn)
  existing_conn.cancel
end

Instance Method Details

#busy? ⇒ Boolean

Returns:

  • (Boolean)


64
65
66
# File 'lib/vertica/connection.rb', line 64

# Reports whether the connection is not ready for a new query.
#
# #with_lock raises a SynchronizeError while this returns true.
#
# @return [Boolean] the negation of #ready_for_query?
def busy?
  !ready_for_query?
end

#cancel ⇒ Object



94
95
96
97
98
99
# File 'lib/vertica/connection.rb', line 94

# Sends a cancel request for this connection's backend over a second
# connection object.
#
# The second object is built from the same options plus
# +:skip_startup => true+, so its constructor skips the startup calls. A
# CancelRequest carrying #backend_pid and #backend_key is written, followed
# by a Flush, and then that object's socket is closed.
#
# @return [Object] the result of closing the second connection's socket
def cancel
  side_channel = self.class.new(options.merge(:skip_startup => true))

  cancel_request = Vertica::Messages::CancelRequest.new(backend_pid, backend_key)
  side_channel.write(cancel_request)
  side_channel.write(Vertica::Messages::Flush.new)

  side_channel.socket.close
end

#close ⇒ Object



78
79
80
81
82
83
84
85
# File 'lib/vertica/connection.rb', line 78

# Terminates the session and closes the socket.
#
# A Terminate message is written, the socket is closed and +@socket+ is
# cleared. An +Errno::ENOTCONN+ raised along the way is swallowed; any other
# error propagates. +reset_values+ runs in every case.
#
# @return [nil]
def close
  begin
    write(Vertica::Messages::Terminate.new)
    socket.close
    @socket = nil
  rescue Errno::ENOTCONN
    # the backend closed the socket already
  end
ensure
  reset_values
end

#closed? ⇒ Boolean

Returns:

  • (Boolean)


60
61
62
# File 'lib/vertica/connection.rb', line 60

# Reports whether the connection is not open.
#
# @return [Boolean] the negation of #opened?
def closed?
  !opened?
end

#copy(sql, source = nil, &block) ⇒ Object



155
156
157
158
159
160
161
162
163
164
165
166
167
# File 'lib/vertica/connection.rb', line 155

# Runs +sql+ as a Vertica::Query with a copy handler attached.
#
# The handler is chosen in this order:
# 1. the given block, when one is passed;
# 2. +file_copy_handler+, when +source.to_s+ names an existing file;
# 3. +io_copy_handler+, when +source+ responds to +read+ and +eof?+;
# 4. otherwise no handler is set.
#
# The work happens inside #with_lock, so a SynchronizeError is raised when
# the connection is busy.
#
# @param sql [String] the statement to run
# @param source [#to_s, #read, nil] a file path or an IO-like object
# @yield [data] used as the copy handler when given
# @return [Object] the result of running the query job
def copy(sql, source = nil, &block)
  with_lock do
    job = Vertica::Query.new(self, sql, :row_style => @row_style)
    if block_given?
      job.copy_handler = block
    elsif source && File.exist?(source.to_s) # File.exists? was removed in Ruby 3.2
      job.copy_handler = lambda { |data| file_copy_handler(source, data) }
    elsif source.respond_to?(:read) && source.respond_to?(:eof?)
      job.copy_handler = lambda { |data| io_copy_handler(source, data) }
    end
    job.run
  end
end

#inspect ⇒ Object



169
170
171
172
# File 'lib/vertica/connection.rb', line 169

def inspect
  safe_options = @options.reject{ |name, _| name == :password }
  "#<Vertica::Connection:#{object_id} @parameters=#{@parameters.inspect} @backend_pid=#{@backend_pid}, @backend_key=#{@backend_key}, @transaction_status=#{@transaction_status}, @socket=#{@socket}, @options=#{safe_options.inspect}, @row_style=#{@row_style}>"
end

#interrupt ⇒ Object



101
102
103
104
105
106
107
# File 'lib/vertica/connection.rb', line 101

# Closes this connection's server session from a second connection.
#
# A new connection is opened with the same options, except that
# +:interruptable+ is false and +:role+ / +:search_path+ are nil. It runs
# +SELECT CLOSE_SESSION(<quoted session id>)+ and is closed afterwards,
# including when the query raises.
#
# @raise [Vertica::Error::ConnectionError] when #session_id is nil
# @return [Object] +the_value+ of the CLOSE_SESSION query result
def interrupt
  raise Vertica::Error::ConnectionError, "Session cannot be interrupted because the session ID is not known!" if session_id.nil?

  conn = self.class.new(options.merge(:interruptable => false, :role => nil, :search_path => nil))
  begin
    conn.query("SELECT CLOSE_SESSION(#{Vertica.quote(session_id)})").the_value
  ensure
    # Always release the helper connection, even if the query failed.
    conn.close
  end
end

#interruptable? ⇒ Boolean

Returns:

  • (Boolean)


109
110
111
# File 'lib/vertica/connection.rb', line 109

# Reports whether a session id is known. #interrupt raises a
# ConnectionError when it is not.
#
# @return [Boolean] true when #session_id is not nil
def interruptable?
  !session_id.nil?
end

#on_notice(&block) ⇒ Object



29
30
31
# File 'lib/vertica/connection.rb', line 29

# Registers the given block as the notice handler, replacing any previous
# one. #process_message calls the handler with each NoticeResponse message.
# Calling this without a block stores nil, which clears the handler.
#
# @yield [message] called for each notice
# @return [Proc, nil] the block that was stored
def on_notice(&block)
  @notice_handler = block
end

#opened? ⇒ Boolean

Returns:

  • (Boolean)


56
57
58
# File 'lib/vertica/connection.rb', line 56

# Reports whether the connection looks open: a socket exists and both the
# backend pid and the transaction status have been set.
#
# The result is coerced to a strict boolean so that it matches the
# documented return type and the other predicates (#closed?, #busy?,
# #ready_for_query?) instead of leaking the last operand.
#
# @return [Boolean]
def opened?
  @socket && @backend_pid && @transaction_status ? true : false
end

#process_message(message) ⇒ Object



122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# File 'lib/vertica/connection.rb', line 122

# Applies a backend message to the connection state.
#
# * ErrorResponse   - raises a ConnectionError with the message's error text
# * NoticeResponse  - forwards the message to the notice handler, if any
# * BackendKeyData  - stores the backend pid and key
# * ParameterStatus - stores the value under the parameter name
# * ReadyForQuery   - stores the transaction status and marks the
#                     connection as ready for a query
#
# @param message [Object] a decoded backend message
# @raise [Vertica::Error::ConnectionError] for an ErrorResponse
# @raise [Vertica::Error::MessageError] for any other message type
# @return [Object] the value of the last expression of the matching branch
def process_message(message)
  if message.is_a?(Vertica::Messages::ErrorResponse)
    raise Vertica::Error::ConnectionError, message.error_message
  elsif message.is_a?(Vertica::Messages::NoticeResponse)
    @notice_handler.call(message) if @notice_handler
  elsif message.is_a?(Vertica::Messages::BackendKeyData)
    @backend_pid = message.pid
    @backend_key = message.key
  elsif message.is_a?(Vertica::Messages::ParameterStatus)
    @parameters[message.name] = message.value
  elsif message.is_a?(Vertica::Messages::ReadyForQuery)
    @transaction_status = message.transaction_status
    @ready_for_query = true
  else
    raise Vertica::Error::MessageError, "Unhandled message: #{message.inspect}"
  end
end

#query(sql, options = {}, &block) ⇒ Object



147
148
149
150
151
152
153
# File 'lib/vertica/connection.rb', line 147

# Runs +sql+ as a Vertica::Query inside #with_lock.
#
# The query options start from the connection's row style and are
# overridden by anything passed in +options+. A given block becomes the
# job's row handler.
#
# @param sql [String] the statement to run
# @param options [Hash] extra options for Vertica::Query
# @yield used as the row handler when given
# @return [Object] the result of running the query job
def query(sql, options = {}, &block)
  with_lock do
    query_options = { :row_style => @row_style }.merge(options)
    job = Vertica::Query.new(self, sql, query_options)
    job.row_handler = block unless block.nil?
    job.run
  end
end

#read_message ⇒ Object



113
114
115
116
117
118
119
120
# File 'lib/vertica/connection.rb', line 113

# Reads one backend message.
#
# The frame is a 1-byte type, then a 4-byte size unpacked with 'N' (32-bit
# unsigned, big-endian), then +size - 4+ bytes of payload. The decoded
# message is printed when +@debug+ is truthy.
#
# @raise [Vertica::Error::MessageError] when the size is smaller than 4
# @return [Object] the message built by BackendMessage.factory
def read_message
  type = read_bytes(1)
  size, = read_bytes(4).unpack('N')
  unless size >= 4
    raise Vertica::Error::MessageError.new("Bad message size: #{size}.")
  end

  payload = read_bytes(size - 4)
  message = Vertica::Messages::BackendMessage.factory(type, payload)
  puts "<= #{message.inspect}" if @debug
  message
end

#ready_for_query? ⇒ Boolean

Returns:

  • (Boolean)


68
69
70
# File 'lib/vertica/connection.rb', line 68

# Reports whether the connection can accept a new query.
#
# +@ready_for_query+ is set to true by #process_message when a
# ReadyForQuery message arrives and to false by #with_lock before it yields.
#
# @return [Boolean] true only when +@ready_for_query+ equals true
def ready_for_query?
  @ready_for_query == true
end

#reset_connection ⇒ Object



87
88
89
90
91
92
# File 'lib/vertica/connection.rb', line 87

# Closes the connection and then repeats the two startup calls that
# #initialize makes (+startup_connection+ followed by
# +initialize_connection+).
#
# @return [Object] whatever +initialize_connection+ returns
def reset_connection
  close

  startup_connection
  initialize_connection
end

#socket ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/vertica/connection.rb', line 33

# Returns the socket used to talk to the server, opening it on first use.
#
# A TCP connection is made to the +:host+ / +:port+ options and memoised in
# +@socket+. When the +:ssl+ option is truthy, an SslRequest is written and
# the connection is upgraded to an SSL socket if the server answers 'S'.
#
# If SSL negotiation fails (the server declines or the handshake raises),
# the TCP socket is closed before the error propagates, and nothing is
# memoised.
#
# @raise [Vertica::Error::ConnectionError] when SSL was requested but the
#   server did not answer 'S'
# @return [TCPSocket, OpenSSL::SSL::SSLSocket]
def socket
  @socket ||= begin
    tcp_socket = TCPSocket.new(@options[:host], @options[:port].to_i)
    if @options[:ssl]
      begin
        require 'openssl' # loaded lazily: only needed for SSL connections
        tcp_socket.write Vertica::Messages::SslRequest.new.to_bytes
        unless tcp_socket.read(1) == 'S'
          raise Vertica::Error::ConnectionError.new("SSL requested but server doesn't support it.")
        end

        ssl_socket = OpenSSL::SSL::SSLSocket.new(tcp_socket, OpenSSL::SSL::SSLContext.new)
        ssl_socket.sync = true
        # Closing the SSL socket must also close the TCP socket underneath,
        # otherwise the descriptor stays open until garbage collection.
        ssl_socket.sync_close = true
        ssl_socket.connect
        ssl_socket
      rescue StandardError
        # Do not leak the TCP connection when the upgrade fails.
        tcp_socket.close
        raise
      end
    else
      tcp_socket
    end
  end
end

#ssl? ⇒ Boolean

Returns:

  • (Boolean)


52
53
54
# File 'lib/vertica/connection.rb', line 52

# Reports whether the connection's socket is an SSL socket.
#
# Returns false without touching the socket when the OpenSSL constant is
# not defined. Otherwise it calls #socket, which opens the connection if it
# is not open yet.
#
# @return [Boolean]
def ssl?
  return false unless Object.const_defined?('OpenSSL')

  socket.is_a?(OpenSSL::SSL::SSLSocket)
end

#with_lock(&block) ⇒ Object



141
142
143
144
145
# File 'lib/vertica/connection.rb', line 141

# Runs the block while marking the connection as in use.
#
# +@ready_for_query+ is set to false before yielding; it is not restored
# here (#process_message sets it back to true on a ReadyForQuery message).
#
# @raise [Vertica::Error::SynchronizeError] when the connection is busy
# @yield the work to perform on the connection
# @return [Object] the block's result
def with_lock(&block)
  if busy?
    raise Vertica::Error::SynchronizeError, "The connection is in use!"
  end

  @ready_for_query = false
  yield
end

#write(message) ⇒ Object

Raises:

  • (ArgumentError)


72
73
74
75
76
# File 'lib/vertica/connection.rb', line 72

# Serialises a message with +to_bytes+ and writes it to the socket.
#
# The message is printed first when +@debug+ is truthy.
#
# @param message [#to_bytes] the message to send
# @raise [ArgumentError] when the message does not respond to +to_bytes+
# @return [Object] the result of the socket's +write+
def write(message)
  unless message.respond_to?(:to_bytes)
    raise ArgumentError, "invalid message: (#{message.inspect})"
  end

  puts "=> #{message.inspect}" if @debug
  socket.write(message.to_bytes)
end