Class: Fluent::RedshiftOutput::RedshiftConnection

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/plugin/out_redshift-out.rb

Constant Summary collapse

REDSHIFT_CONNECT_TIMEOUT =

10sec

10.0

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(db_conf) ⇒ RedshiftConnection

Returns a new instance of RedshiftConnection.



290
291
292
293
294
295
296
# File 'lib/fluent/plugin/out_redshift-out.rb', line 290

def initialize(db_conf)
  @db_conf = db_conf
  @connection = nil
  ObjectSpace.define_finalizer(self) {
    close()
  } 
end

Instance Attribute Details

#db_confObject (readonly)

Returns the value of attribute db_conf.



298
299
300
# File 'lib/fluent/plugin/out_redshift-out.rb', line 298

def db_conf
  @db_conf
end

Instance Method Details

#closeObject



326
327
328
329
# File 'lib/fluent/plugin/out_redshift-out.rb', line 326

def close
  @connection.close rescue nil if @connection
  @connection = nil
end

#connect_startObject



322
323
324
# File 'lib/fluent/plugin/out_redshift-out.rb', line 322

def connect_start
  @connection = create_redshift_connection
end

#exec(sql, &block) ⇒ Object



308
309
310
311
312
313
314
315
316
317
318
319
320
# File 'lib/fluent/plugin/out_redshift-out.rb', line 308

def exec(sql, &block)
  conn = @connection
  conn = create_redshift_connection if conn.nil?
  if block
    conn.exec(sql) {|result| block.call(result)}
  else
    conn.exec(sql)
  end
rescue PG::Error => e
  raise RedshiftError.new(e)
ensure
  conn.close if conn && @connection.nil?
end

#fetch_table_columns(table_name, schema_name) ⇒ Object



300
301
302
303
304
305
306
# File 'lib/fluent/plugin/out_redshift-out.rb', line 300

def fetch_table_columns(table_name, schema_name)
  columns = nil
  exec(fetch_columns_sql(table_name, schema_name)) do |result|
    columns = result.collect{|row| row['column_name']}
  end
  columns
end