Class: Embulk::Output::Vertica
- Inherits:
-
OutputPlugin
- Object
- OutputPlugin
- Embulk::Output::Vertica
- Defined in:
- lib/embulk/output/vertica.rb,
lib/embulk/output/vertica/output_thread.rb,
lib/embulk/output/vertica/value_converter_factory.rb
Defined Under Namespace
Classes: Error, NotSupportedType, OutputThread, OutputThreadPool, ValueConverterFactory
Class Method Summary collapse
Instance Method Summary collapse
- #abort ⇒ Object
-
#add(page) ⇒ Object
called for each page in each thread.
-
#close ⇒ Object
called when the output of each thread is closed, not for each page.
-
#commit ⇒ Object
called after processing all pages in each thread. The actual commit is performed in .transaction for all pools, not here.
- #finish ⇒ Object
-
#initialize(task, schema, index) ⇒ Vertica
constructor
instance is created on each thread.
Constructor Details
#initialize(task, schema, index) ⇒ Vertica
instance is created on each thread
122 123 124 |
# File 'lib/embulk/output/vertica.rb', line 122
# Constructor. One plugin instance is created on each thread.
# The three arguments are passed on to the superclass constructor unchanged.
def initialize(task, schema, index)
  super(task, schema, index)
end
Class Method Details
.thread_pool ⇒ Object
13 14 15 |
# File 'lib/embulk/output/vertica.rb', line 13
# Returns the class-level thread pool.
#
# The pool is built on first use by calling the factory stored in
# @thread_pool_proc and is cached in @thread_pool for later calls.
def self.thread_pool
  return @thread_pool if @thread_pool

  @thread_pool = @thread_pool_proc.call
end
.transaction(config, schema, processor_count, &control) ⇒ Object
17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 |
# File 'lib/embulk/output/vertica.rb', line 17
# Runs one complete load: reads the plugin configuration, prepares the target
# table and a temporary table, yields so that pages are loaded into the
# temporary table, and finally copies the temporary table into the target.
# The temporary table is dropped afterwards, whether or not the load succeeded.
#
# @param config [#param] plugin configuration; every option is read through
#   config.param
# @param schema input schema; used to build the CREATE TABLE column list and
#   passed to OutputThreadPool
# @param processor_count [Integer] default value of the `pool` option
# @param control the caller's block; it is invoked through `yield`, the
#   explicit parameter is kept so the signature stays unchanged
# @yield [task] the resolved task hash, after the temporary table has been
#   created and the thread pool has been started
# @return [Hash] an empty hash (no settings are handed to the next run)
# @raise [ConfigError] when neither `user` nor `username` is set, or when
#   `mode`, `copy_mode` or `compress` holds an unsupported value
def self.transaction(config, schema, processor_count, &control)
  task = {
    'host'             => config.param('host',             :string,  :default => 'localhost'),
    'port'             => config.param('port',             :integer, :default => 5433),
    'user'             => config.param('user',             :string,  :default => nil),
    'username'         => config.param('username',         :string,  :default => nil), # alias to :user for backward compatibility
    'password'         => config.param('password',         :string,  :default => ''),
    'database'         => config.param('database',         :string,  :default => 'vdb'),
    'schema'           => config.param('schema',           :string,  :default => 'public'),
    'table'            => config.param('table',            :string),
    'mode'             => config.param('mode',             :string,  :default => 'insert'),
    'copy_mode'        => config.param('copy_mode',        :string,  :default => 'AUTO'),
    'abort_on_error'   => config.param('abort_on_error',   :bool,    :default => false),
    'compress'         => config.param('compress',         :string,  :default => 'UNCOMPRESSED'),
    'default_timezone' => config.param('default_timezone', :string,  :default => 'UTC'),
    'column_options'   => config.param('column_options',   :hash,    :default => {}),
    'json_payload'     => config.param('json_payload',     :bool,    :default => false),
    'resource_pool'    => config.param('resource_pool',    :string,  :default => nil),
    'reject_on_materialized_type_error' => config.param('reject_on_materialized_type_error', :bool, :default => false),
    'pool'             => config.param('pool',             :integer, :default => processor_count),
  }

  # thread_pool caches its result in @thread_pool. Clear the cache so that the
  # pool used by this transaction is built from this task, instead of reusing
  # a pool that an earlier transaction in the same process created for a
  # different task (and a different temporary table).
  @thread_pool = nil
  @thread_pool_proc = Proc.new do
    OutputThreadPool.new(task, schema, task['pool'])
  end

  task['user'] ||= task['username']
  unless task['user']
    raise ConfigError.new('required field "user" is not set')
  end

  task['mode'] = task['mode'].upcase
  unless %w[INSERT REPLACE].include?(task['mode'])
    raise ConfigError.new("`mode` must be one of INSERT, REPLACE")
  end

  task['copy_mode'] = task['copy_mode'].upcase
  unless %w[AUTO DIRECT TRICKLE].include?(task['copy_mode'])
    raise ConfigError.new("`copy_mode` must be one of AUTO, DIRECT, TRICKLE")
  end

  # ToDo: Support BZIP, LZO
  task['compress'] = task['compress'].upcase
  unless %w[GZIP UNCOMPRESSED].include?(task['compress'])
    raise ConfigError.new("`compress` must be one of GZIP, UNCOMPRESSED")
  end

  # The temporary table name is made unique with the current time
  # (seconds and nanoseconds, both in hex).
  now = Time.now
  unique_name = "%08x%08x" % [now.tv_sec, now.tv_nsec]
  task['temp_table'] = "#{task['table']}_LOAD_TEMP_#{unique_name}"

  quoted_schema     = ::Jvertica.quote_identifier(task['schema'])
  quoted_table      = ::Jvertica.quote_identifier(task['table'])
  quoted_temp_table = ::Jvertica.quote_identifier(task['temp_table'])

  connect(task) do |jv|
    unless task['json_payload'] # ToDo: auto table creation is not supported to json_payload mode yet
      sql_schema_table = self.sql_schema_from_embulk_schema(schema, task['column_options'])

      # create the target table
      query(jv, %[DROP TABLE IF EXISTS #{quoted_schema}.#{quoted_table}]) if task['mode'] == 'REPLACE'
      query(jv, %[CREATE TABLE IF NOT EXISTS #{quoted_schema}.#{quoted_table} (#{sql_schema_table})])
    end

    sql_schema_temp_table = self.sql_schema_from_table(jv, task)

    # create a temp table
    query(jv, %[DROP TABLE IF EXISTS #{quoted_schema}.#{quoted_temp_table}])
    query(jv, %[CREATE TABLE #{quoted_schema}.#{quoted_temp_table} (#{sql_schema_temp_table})])

    # Create internal vertica projection beforehand, otherwise, parallel copies lock table
    # to create a projection and we get S Lock error sometimes.
    # This is a trick to create internal vertica projection
    query(jv, %[INSERT INTO #{quoted_schema}.#{quoted_temp_table} SELECT * FROM #{quoted_schema}.#{quoted_table} LIMIT 0])
    Embulk.logger.trace {
      result = query(jv, %[SELECT EXPORT_OBJECTS('', '#{task['schema']}.#{task['temp_table']}')])
      # You can see `CREATE PROJECTION` if the table has a projection
      "embulk-output-vertica: #{result.to_a.flatten}"
    }
  end

  begin
    # insert data into the temp table
    thread_pool.start
    yield(task)
    task_reports = thread_pool.commit
    Embulk.logger.info { "embulk-output-vertica: task_reports: #{task_reports.to_json}" }

    # insert select from the temp table
    connect(task) do |jv|
      # I did not prepare a specific option, does anyone want?
      hint = task['copy_mode'] == 'DIRECT' ? '/*+ direct */ ' : ''
      query(jv, %[INSERT #{hint}INTO #{quoted_schema}.#{quoted_table} SELECT * FROM #{quoted_schema}.#{quoted_temp_table}])
      jv.commit
    end
  ensure
    connect(task) do |jv|
      # clean up the temp table
      query(jv, %[DROP TABLE IF EXISTS #{quoted_schema}.#{quoted_temp_table}])
      # Best-effort debug output: a failure while sampling the target table is ignored.
      Embulk.logger.trace { "embulk-output-vertica: select result\n#{query(jv, %[SELECT * FROM #{quoted_schema}.#{quoted_table} LIMIT 10]).map {|row| row.to_h }.join("\n") rescue nil}" }
    end
  end

  # this is for -o next_config option, add some parameters for next time execution if wants
  next_config_diff = {}
  return next_config_diff
end
Instance Method Details
#abort ⇒ Object
138 139 |
# File 'lib/embulk/output/vertica.rb', line 138
# Abort hook. Performs no work and returns nil.
def abort
  nil
end
#add(page) ⇒ Object
called for each page in each thread
131 132 133 |
# File 'lib/embulk/output/vertica.rb', line 131
# Called for each page in each thread.
# Hands the page to the class-level thread pool and returns whatever
# the pool's #enqueue returns.
def add(page)
  pool = self.class.thread_pool
  pool.enqueue(page)
end
#close ⇒ Object
called for each page in each thread
127 128 |
# File 'lib/embulk/output/vertica.rb', line 127
# Close hook. Performs no work and returns nil.
def close
  nil
end
#commit ⇒ Object
called after processing all pages in each thread. The actual commit is performed in .transaction for all pools, not here.
143 144 145 |
# File 'lib/embulk/output/vertica.rb', line 143
# Called after processing all pages in each thread.
# Returns a new empty hash on every call; the commit itself is performed in
# .transaction for all pools, not here.
def commit
  empty_report = {}
  empty_report
end
#finish ⇒ Object
135 136 |
# File 'lib/embulk/output/vertica.rb', line 135
# Finish hook. Performs no work and returns nil.
def finish
  nil
end