Class: Fluent::PgdistOutput
- Inherits:
-
BufferedOutput
- Object
- BufferedOutput
- Fluent::PgdistOutput
- Includes:
- SetTagKeyMixin, SetTimeKeyMixin
- Defined in:
- lib/fluent/plugin/out_pgdist.rb
Constant Summary collapse
- DB_ESCAPE_PATTERN =
Regexp.new("[\\\\\\a\\b\\n\\r\\t]")
Instance Attribute Summary collapse
-
#handler ⇒ Object
Returns the value of attribute handler.
Instance Method Summary collapse
- #client ⇒ Object
- #configure(conf) ⇒ Object
- #db_escape(str) ⇒ Object
- #delete_duplicative_records(records) ⇒ Object
- #delete_existing_records(handler, table, records) ⇒ Object
- #file_path(table) ⇒ Object
- #filter_for_file_record(*args) ⇒ Object
- #filter_for_insert(record) ⇒ Object
- #format(tag, time, record) ⇒ Object
-
#initialize ⇒ PgdistOutput
constructor
A new instance of PgdistOutput.
- #insert_into_db(handler, table, records) ⇒ Object
- #read_last_sequence(filepath) ⇒ Object
- #read_last_sequence_from_file(handler, table, filepath) ⇒ Object
- #sequence_path(table) ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
- #table_name(record) ⇒ Object
- #write(chunk) ⇒ Object
- #write_pg_result(output_stream, fields, pg_result) ⇒ Object
- #write_to_file(handler, table) ⇒ Object
Constructor Details
#initialize ⇒ PgdistOutput
Returns a new instance of PgdistOutput.
134 135 136 137 |
# Constructs the plugin. The pg driver is required lazily here so the
# plugin file itself can be loaded without libpq present.
def initialize
  super
  require 'pg'
end
Instance Attribute Details
#handler ⇒ Object
Returns the value of attribute handler.
30 31 32 |
# Database connection handle used by the write path.
def handler
  @handler
end
Instance Method Details
#client ⇒ Object
32 33 34 35 36 37 38 |
# Opens a new PostgreSQL connection from the configured credentials.
def client
  PG::Connection.new(
    :host => @host,
    :port => @port,
    :user => @username,
    :password => @password,
    :dbname => @database
  )
end
#configure(conf) ⇒ Object
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 |
# Validates configuration and compiles the configured lambda snippets
# (table_moniker, insert_filter, file/sequence monikers, record filter).
#
# @param conf [Fluent::Config::Element] plugin configuration
# @return [self]
# @raise [Fluent::ConfigError] when no insert columns are configured
def configure(conf)
  super
  @insert_columns ||= @columns
  @insert_values ||= @values
  if @insert_columns.nil?
    # BUG FIX: was `raise fluent::configerror, ...`, which is a NameError at
    # runtime instead of the intended configuration error.
    raise Fluent::ConfigError, "columns must be specified, but missing"
  end
  # NOTE(review): eval of config-supplied snippets is this plugin's design,
  # but it executes arbitrary Ruby from the config file — config must be trusted.
  @table_moniker_lambda = eval("lambda#{@table_moniker}")
  @insert_filter_lambda = eval("lambda#{@insert_filter}")
  if @file_moniker
    @file_moniker_lambda = eval("lambda#{@file_moniker}")
    @sequence_moniker ||= '{|table|"/tmp/#{table}.seq"}'
    case @file_format
    # BUG FIX: `when "json" || "msgpack" || "message_pack"` evaluates the
    # `||` first and only ever matched "json", so the msgpack formats never
    # received a default record filter. List the alternatives instead.
    when "json", "msgpack", "message_pack"
      @file_record_filter ||= '{|record|record}'
    when "ltsv"
      @file_record_filter ||= '{|fields,record|[fields,record]}'
    end
  end
  @file_record_filter_lambda = eval("lambda#{@file_record_filter}") if @file_record_filter
  @sequence_moniker_lambda = eval("lambda#{@sequence_moniker}") if @sequence_moniker
  self
end
#db_escape(str) ⇒ Object
70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 |
# Escapes a value for PostgreSQL COPY-style text output: nil becomes \N,
# and backslash plus the control characters \a \b \n \r \t are written as
# two-character escape sequences.
def db_escape(str)
  return "\\N" if str.nil?
  replacements = {
    "\\" => "\\\\",
    "\a" => "\\a",
    "\b" => "\\b",
    "\n" => "\\n",
    "\r" => "\\r",
    "\t" => "\\t"
  }
  str.gsub(DB_ESCAPE_PATTERN) { |ch| replacements[ch] }
end
#delete_duplicative_records(records) ⇒ Object
96 97 98 |
# Drops in-chunk duplicates in place, keeping the first record seen for
# each unique-column value.
def delete_duplicative_records(records)
  records.uniq! { |record| record[@unique_column] }
end
#delete_existing_records(handler, table, records) ⇒ Object
100 101 102 103 104 105 106 107 108 109 110 111 |
# Removes from +records+ (in place) any rows whose unique-column value
# already exists in +table+, logging how many duplicates were skipped.
# Uses a prepared statement with positional parameters for the lookup.
def delete_existing_records(handler, table, records)
  unique_values = records.map { |record| record[@unique_column] }
  return if unique_values.empty?
  conditions = (1..unique_values.size).map { |i| "#{@unique_column} = \$#{i}" }
  statement = "select_#{table}"
  handler.prepare(statement, "select #{@unique_column} from #{table} where #{conditions.join(" or ")}")
  existing = handler.exec_prepared(statement, unique_values).column_values(0)
  return if existing.empty?
  $log.info "delete #{existing.size} duplicative records for #{table}"
  records.reject! { |record| existing.include?(record[@unique_column]) }
end
#file_path(table) ⇒ Object
113 114 115 |
# Resolves the output file path for +table+ via the configured file_moniker.
def file_path(table)
  @file_moniker_lambda.call(table)
end
#filter_for_file_record(*args) ⇒ Object
117 118 119 120 |
# Applies the configured file_record_filter to a record (json/msgpack) or
# to a fields+row pair (ltsv) and returns the filtered value.
def filter_for_file_record(*args)
  @file_record_filter_lambda.call(*args)
end
#filter_for_insert(record) ⇒ Object
122 123 124 |
# Maps a buffered record through the configured insert_filter, producing
# the parameter array for the prepared INSERT statement.
def filter_for_insert(record)
  @insert_filter_lambda.call(record)
end
#format(tag, time, record) ⇒ Object
130 131 132 |
# Serializes one event as a msgpack-encoded [tag, time, record] triple
# for the output buffer.
def format(tag, time, record)
  triple = [tag, time, record]
  triple.to_msgpack
end
#insert_into_db(handler, table, records) ⇒ Object
139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 |
# Inserts +records+ into +table+ via a prepared statement, optionally
# de-duplicating (within the chunk and against existing rows) first.
#
# @param handler [PG::Connection] open database connection
# @param table [String] destination table name
# @param records [Array] buffered records (mutated by de-duplication)
def insert_into_db(handler, table, records)
  if @unique_column
    delete_duplicative_records(records)
    delete_existing_records(handler, table, records)
  end
  $log.info "insert #{records.size} records into #{table}"
  sql = "INSERT INTO #{table}(#{@insert_columns}) VALUES(#{@insert_values})"
  $log.info "execute sql #{sql.inspect}"
  statement = "write_#{table}"
  handler.prepare(statement, sql)
  records.each do |record|
    record = filter_for_insert(record)
    $log.info "insert #{record.inspect}"
    begin
      handler.exec_prepared(statement, record)
    rescue StandardError => e
      # BUG FIX: was `rescue Exception` (which also swallows SignalException
      # and SystemExit) and the log call was truncated to `$log.info e.`;
      # rescue StandardError and log the exception message.
      raise e if @raise_exception
      $log.info e.message
    end
  end
end
#read_last_sequence(filepath) ⇒ Object
165 166 167 168 169 |
# Reads the persisted high-water-mark sequence from +filepath+.
#
# @param filepath [String] path to the sequence file
# @return [String, nil] the numeric sequence string, or nil when the file
#   content is not a plain integer
def read_last_sequence(filepath)
  last_sequence = File.read(filepath).chomp
  # BUG FIX: the pattern was unanchored (/-?[0-9]+/), so any string merely
  # containing a digit (e.g. "12; drop table x") was accepted. The value is
  # interpolated into SQL by write_to_file, so it must be strictly numeric.
  return nil unless /\A-?[0-9]+\z/.match?(last_sequence)
  last_sequence
end
#read_last_sequence_from_file(handler, table, filepath) ⇒ Object
171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 |
# Recovers the last written sequence value from the tail of an existing
# output file, parsing it according to @file_format (json, ltsv, or the
# default tab-separated layout, which needs the table's column order).
#
# @param handler [PG::Connection] used only for the default format, to map
#   @sequence_column to a column index
# @param table [String] table name for the column lookup
# @param filepath [String] output file to inspect
# @return [String, nil] the numeric sequence string, or nil if unparsable
def read_last_sequence_from_file(handler, table, filepath)
  # BUG FIX: was `tail -n 1 #{filepath}` in backticks — a shell-injection
  # hole if the file_moniker produces metacharacters. The argv form of
  # IO.popen bypasses the shell entirely. chomp strips tail's newline.
  last_line = IO.popen(["tail", "-n", "1", filepath], &:read).chomp
  case @file_format
  when "json"
    last_record = JSON.parse(last_line)
    last_sequence = last_record[@sequence_column]
  when "ltsv"
    last_record = Hash[last_line.split(/\t/).map { |pair| pair.split(/:/, 2) }]
    last_sequence = last_record[@sequence_column]
  else
    # Tab-separated rows carry no field names; ask the DB for column order.
    result = handler.exec("select * from #{table} limit 0")
    sequence_index = result.fields.index(@sequence_column)
    last_sequence = last_line.split(/\t/)[sequence_index]
  end
  # BUG FIX: anchored match — the value is interpolated into SQL by
  # write_to_file, so accept only a strictly numeric string.
  last_sequence = nil unless /\A-?[0-9]+\z/.match?(last_sequence.to_s)
  last_sequence
end
#sequence_path(table) ⇒ Object
126 127 128 |
# Resolves the sequence (high-water-mark) file path for +table+ via the
# configured sequence_moniker.
def sequence_path(table)
  @sequence_moniker_lambda.call(table)
end
#shutdown ⇒ Object
191 192 193 |
# Stops the plugin; all teardown is handled by BufferedOutput.
def shutdown
  super
end
#start ⇒ Object
195 196 197 |
# Starts the plugin; all startup work is handled by BufferedOutput.
def start
  super
end
#table_name(record) ⇒ Object
199 200 201 |
# Maps a record to its destination table via the configured table_moniker.
# A nil result means the record should be skipped.
def table_name(record)
  @table_moniker_lambda.call(record)
end
#write(chunk) ⇒ Object
203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 |
# Buffered-output flush: groups the chunk's records by destination table,
# inserts each group, and optionally mirrors new rows to files.
#
# @param chunk [Fluent::BufferChunk] msgpack-encoded [tag, time, record] triples
def write(chunk)
  handler = client
  records_hash = {}
  chunk.msgpack_each do |tag, time, data|
    table = @table_moniker_lambda.call(data)
    next if table.nil?
    (records_hash[table] ||= []) << data
  end
  begin
    records_hash.each do |table, records|
      insert_into_db(handler, table, records)
      write_to_file(handler, table) if @file_moniker
    end
  ensure
    # BUG FIX: the connection leaked when insert_into_db/write_to_file
    # raised (fluentd retries the chunk, opening another connection each
    # time); always close it.
    handler.close
  end
end
#write_pg_result(output_stream, fields, pg_result) ⇒ Object
220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 |
# Streams a PG result set to +output_stream+ in the configured file format:
# json and msgpack write one filtered tuple per line/frame, ltsv writes
# "field:value" pairs, and the default writes escaped tab-separated rows.
def write_pg_result(output_stream, fields, pg_result)
  case @file_format
  when "json"
    pg_result.each do |tuple|
      tuple = filter_for_file_record(tuple)
      output_stream.puts(tuple.to_json)
    end
  when "ltsv"
    pg_result.each_row do |row|
      fields, row = filter_for_file_record(fields, row)
      output_stream.puts(fields.each_with_index.map { |f, i| "#{f}:#{db_escape(row[i])}" }.join("\t"))
    end
  when "msgpack", "message_pack"
    # BUG FIX: `when "msgpack" || "message_pack"` evaluates the `||` first
    # and only ever matched "msgpack"; "message_pack" silently fell through
    # to the tab-separated default branch.
    pg_result.each do |tuple|
      tuple = filter_for_file_record(tuple)
      output_stream.write(tuple.to_msgpack)
    end
  else
    pg_result.each_row do |row|
      output_stream.puts(row.map { |v| db_escape(v) }.join("\t"))
    end
  end
end
#write_to_file(handler, table) ⇒ Object
244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 |
# Appends rows newer than the stored sequence to the table's output file in
# @file_write_limit-sized batches, then persists the new high-water mark.
#
# @param handler [PG::Connection] open database connection
# @param table [String] table to export
def write_to_file(handler, table)
  sequence_file_path = sequence_path(table)
  # BUG FIX: File.exists? was deprecated and removed in Ruby 3.2;
  # File.exist? is the supported form.
  last_sequence = read_last_sequence(sequence_file_path) if File.exist?(sequence_file_path)
  file = nil
  fields = nil
  begin
    loop do
      where_sql = "where #{last_sequence} < #{@sequence_column}" if last_sequence
      result = handler.exec("select * from #{table} #{where_sql} order by #{@sequence_column} limit #{@file_write_limit}")
      result_size = result.ntuples
      break if result_size == 0
      fields ||= result.fields
      # Open lazily and in append mode so an empty result leaves the file untouched.
      file ||= File.open(file_path(table), "a")
      write_pg_result(file, fields, result)
      last_sequence = result[result_size - 1][@sequence_column]
      break if result_size < @file_write_limit
    end
  ensure
    # BUG FIX: the output file handle leaked if exec/write raised mid-loop.
    file.close if file
  end
  # Persist the high-water mark only after a successful pass that wrote rows.
  File.write(sequence_file_path, last_sequence.to_s) if file && last_sequence
end