Class: Embulk::Output::Bigquery::FileWriter
- Inherits: Object
- Hierarchy: Object → Embulk::Output::Bigquery::FileWriter
- Defined in:
- lib/embulk/output/bigquery/file_writer.rb
Instance Attribute Summary collapse
-
#num_rows ⇒ Object
readonly
Returns the value of attribute num_rows.
Instance Method Summary collapse
- #add(page) ⇒ Object
- #close ⇒ Object
-
#initialize(task, schema, index, converters = nil) ⇒ FileWriter
constructor
A new instance of FileWriter.
- #io ⇒ Object
- #num_format(number) ⇒ Object
- #open(path, mode = 'w') ⇒ Object
- #reopen ⇒ Object
- #to_csv(record) ⇒ Object
- #to_jsonl(record) ⇒ Object
- #to_payload(record) ⇒ Object
Constructor Details
#initialize(task, schema, index, converters = nil) ⇒ FileWriter
Returns a new instance of FileWriter.
12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
# File 'lib/embulk/output/bigquery/file_writer.rb', line 12

# Sets up per-writer state and selects the record formatter once, up
# front, so #add does not re-dispatch per record.
#
# task       - Hash of plugin configuration (string keys such as
#              'source_format', 'payload_column_index').
# schema     - column schema; only consulted when converters must be built.
# index      - numeric index of this writer within the task.
# converters - optional pre-built value converters; when nil they are
#              derived from task and schema.
def initialize(task, schema, index, converters = nil)
  @task = task
  @schema = schema
  @index = index
  @converters = converters || ValueConverterFactory.create_converters(task, schema)
  @num_rows = 0

  interval = @task['progress_log_interval']
  if interval
    @progress_log_interval = interval
    @progress_log_timer = Time.now
    @previous_num_rows = 0
  end

  if @task['payload_column_index']
    # A payload column bypasses per-column conversion entirely.
    @payload_column_index = @task['payload_column_index']
    @formatter_proc = method(:to_payload)
  elsif @task['source_format'].downcase == 'csv'
    @formatter_proc = method(:to_csv)
  else
    # Any non-CSV source format is emitted as newline-delimited JSON.
    @formatter_proc = method(:to_jsonl)
  end
end
Instance Attribute Details
#num_rows ⇒ Object (readonly)
Returns the value of attribute num_rows.
10 11 12 |
# File 'lib/embulk/output/bigquery/file_writer.rb', line 10 def num_rows @num_rows end |
Instance Method Details
#add(page) ⇒ Object
98 99 100 101 102 103 104 105 106 107 108 109 110 111 |
# File 'lib/embulk/output/bigquery/file_writer.rb', line 98

# Formats every record of the page with the formatter chosen at
# construction time and appends it to the output file.
# Returns the cumulative number of rows written by this writer.
def add(page)
  out = io
  # Writing stays on this thread on purpose: the original author notes
  # that an experiment with a SizedQueue-fed IO thread performed worse.
  page.each do |record|
    Embulk.logger.trace { "embulk-output-bigquery: record #{record}" }
    formatted_record = @formatter_proc.call(record)
    Embulk.logger.trace { "embulk-output-bigquery: formatted_record #{formatted_record.chomp}" }
    out.write formatted_record
    @num_rows += 1
  end
  show_progress if @task['progress_log_interval']
  @num_rows
end
#close ⇒ Object
65 66 67 68 |
# File 'lib/embulk/output/bigquery/file_writer.rb', line 65

# Closes the underlying IO, swallowing any StandardError (a failed or
# double close must not fail the task), and returns the IO.
def close
  begin
    io.close
  rescue StandardError
    # best-effort close; ignore failures such as an already-closed stream
  end
  io
end
#io ⇒ Object
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
# File 'lib/embulk/output/bigquery/file_writer.rb', line 38

# Lazily opens (and memoizes) the output file for this writer.
# The path is built from task options interpolated into a sprintf
# template fed with the process id and the current thread object id,
# so concurrent writers get distinct files.
def io
  return @io if @io

  template = "#{@task['path_prefix']}#{@task['sequence_format']}#{@task['file_ext']}"
  path = format(template, Process.pid, Thread.current.object_id)
  if File.exist?(path)
    Embulk.logger.warn { "embulk-output-bigquery: unlink already existing #{path}" }
    begin
      File.unlink(path)
    rescue StandardError
      # leftover file may already be gone or not removable; keep going
    end
  end
  Embulk.logger.info { "embulk-output-bigquery: create #{path}" }
  @io = open(path, 'w')
end
#num_format(number) ⇒ Object
94 95 96 |
# File 'lib/embulk/output/bigquery/file_writer.rb', line 94

# Formats a number with thousands separators, e.g. 1234567 -> "1,234,567".
# Presumably used when logging row-count progress — verify against
# show_progress elsewhere in this file.
def num_format(number)
  digits = number.to_s
  digits.gsub(/(\d)(?=(\d{3})+(?!\d))/, '\1,')
end
#open(path, mode = 'w') ⇒ Object
54 55 56 57 58 59 60 61 62 63 |
# File 'lib/embulk/output/bigquery/file_writer.rb', line 54

# Opens path with the given mode and, when the task requests gzip
# compression (case-insensitive), wraps the file in a Zlib::GzipWriter.
# Callers are responsible for closing the returned IO.
def open(path, mode = 'w')
  file_io = File.open(path, mode)
  if @task['compression'].downcase == 'gzip'
    Zlib::GzipWriter.new(file_io)
  else
    file_io
  end
end
#reopen ⇒ Object
70 71 72 |
# File 'lib/embulk/output/bigquery/file_writer.rb', line 70

# Re-opens the current output path in append mode and memoizes the new
# handle, replacing the previous one.
def reopen
  current_path = io.path
  @io = open(current_path, 'a')
end
#to_csv(record) ⇒ Object
78 79 80 81 82 |
# File 'lib/embulk/output/bigquery/file_writer.rb', line 78

# Converts each field through its column converter and renders one CSV
# line (Array#to_csv appends the trailing newline).
def to_csv(record)
  converted = record.each_with_index.map do |value, column_index|
    @converters[column_index].call(value)
  end
  converted.to_csv
end
#to_jsonl(record) ⇒ Object
84 85 86 87 88 89 90 91 92 |
# File 'lib/embulk/output/bigquery/file_writer.rb', line 84

# Builds a { column_name => converted_value } hash for the record and
# renders it as one newline-terminated JSON object.
def to_jsonl(record)
  column_names = @schema.names
  row = record.each_with_index.each_with_object({}) do |(value, column_index), acc|
    acc[column_names[column_index]] = @converters[column_index].call(value)
  end
  "#{row.to_json}\n"
end
#to_payload(record) ⇒ Object
74 75 76 |
# File 'lib/embulk/output/bigquery/file_writer.rb', line 74

# Emits the designated payload column verbatim (stringified via
# interpolation), newline-terminated; no per-column conversion applies.
def to_payload(record)
  payload = record[@payload_column_index]
  "#{payload}\n"
end