Class: Fluent::PgdistOutput

Inherits:
BufferedOutput < Object
Includes:
SetTagKeyMixin, SetTimeKeyMixin
Defined in:
lib/fluent/plugin/out_pgdist.rb

Constant Summary collapse

DB_ESCAPE_PATTERN =
Regexp.new("[\\\\\\a\\b\\n\\r\\t]")

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initializePgdistOutput

Returns a new instance of PgdistOutput.



134
135
136
137
# File 'lib/fluent/plugin/out_pgdist.rb', line 134

# Constructor: defers loading the pg gem until the plugin is actually
# instantiated, so fluentd can load this file without pg installed.
def initialize
  super
  require 'pg'
end

Instance Attribute Details

#handlerObject

Returns the value of attribute handler.



30
31
32
# File 'lib/fluent/plugin/out_pgdist.rb', line 30

# Reader for @handler. NOTE(review): no assignment to @handler is visible
# in this excerpt — presumably set elsewhere (e.g. for tests); verify callers.
def handler
  @handler
end

Instance Method Details

#clientObject



32
33
34
35
36
37
38
# File 'lib/fluent/plugin/out_pgdist.rb', line 32

# Open a new PostgreSQL connection from the configured host/port and
# credentials. A fresh connection is created on every call (see #write).
def client
  connection_params = {
    :host => @host,
    :port => @port,
    :user => @username,
    :password => @password,
    :dbname => @database
  }
  PG::Connection.new(connection_params)
end

#configure(conf) ⇒ Object



40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/fluent/plugin/out_pgdist.rb', line 40

# Fluentd configuration hook. Resolves insert columns/values, compiles the
# *_moniker / *_filter config strings (Ruby block literals) into lambdas,
# and fills in file-export defaults.
#
# Raises Fluent::ConfigError when no insert columns are configured.
def configure(conf)
  super

  # Fall back to the generic column/value settings when the insert-specific
  # ones are not given.
  @insert_columns ||= @columns
  @insert_values ||= @values

  if @insert_columns.nil?
    # FIX: original raised `fluent::configerror` (lowercase), which is not a
    # valid constant reference and would itself error at runtime.
    raise Fluent::ConfigError, "columns must be specified, but missing"
  end

  # NOTE(review): eval of config-supplied block literals is a deliberate
  # design of this plugin; only trusted configuration must reach it.
  @table_moniker_lambda = eval("lambda#{@table_moniker}")
  @insert_filter_lambda = eval("lambda#{@insert_filter}")
  if @file_moniker
    @file_moniker_lambda = eval("lambda#{@file_moniker}")
    @sequence_moniker ||= '{|table|"/tmp/#{table}.seq"}'
    case @file_format
    when "json", "msgpack", "message_pack"
      # FIX: `when "json" || "msgpack" || "message_pack"` evaluated to just
      # "json", so msgpack formats never received this default filter.
      @file_record_filter ||= '{|record|record}'
    when "ltsv"
      @file_record_filter ||= '{|fields,record|[fields,record]}'
    end
  end
  @file_record_filter_lambda = eval("lambda#{@file_record_filter}") if @file_record_filter
  @sequence_moniker_lambda = eval("lambda#{@sequence_moniker}") if @sequence_moniker
  self
end

#db_escape(str) ⇒ Object



70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/fluent/plugin/out_pgdist.rb', line 70

# Escape +str+ for PostgreSQL COPY text format: nil becomes \N, and
# backslash / bell / backspace / newline / CR / tab become their
# backslash-escaped forms.
#
# PERF: single-pass gsub replaces the original match/pre_match/post_match
# loop, which rebuilt strings with += (quadratic copying on long input).
def db_escape(str)
  return "\\N" if str.nil?
  str.gsub(DB_ESCAPE_PATTERN) do |ch|
    case ch
    when '\\' then '\\\\'
    when "\a" then '\\a'
    when "\b" then '\\b'
    when "\n" then '\\n'
    when "\r" then '\\r'
    else '\\t' # "\t" is the only remaining character the pattern matches
    end
  end
end

#delete_duplicative_records(records) ⇒ Object



96
97
98
# File 'lib/fluent/plugin/out_pgdist.rb', line 96

# Remove in-batch duplicates, keyed on @unique_column, keeping the first
# occurrence. Mutates +records+ in place; Array#uniq! returns the array
# when anything was removed, nil otherwise.
def delete_duplicative_records(records)
  records.uniq! { |record| record[@unique_column] }
end

#delete_existing_records(handler, table, records) ⇒ Object



100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/fluent/plugin/out_pgdist.rb', line 100

# Drop from +records+ (in place) every record whose @unique_column value
# already exists in +table+, using a prepared SELECT with one placeholder
# per batch value.
def delete_existing_records(handler, table, records)
  unique_values = records.map { |record| record[@unique_column] }
  return if unique_values.empty?

  placeholders = (1..unique_values.size).map { |i| "#{@unique_column} = \$#{i}" }
  where_sql = "where " + placeholders.join(" or ")
  statement = "select_#{table}"
  handler.prepare(statement, "select #{@unique_column} from #{table} #{where_sql}")
  exist_values = handler.exec_prepared(statement, unique_values).column_values(0)
  return if exist_values.empty?

  $log.info "delete #{exist_values.size} duplicative records for #{table}"
  records.reject! { |record| exist_values.include?(record[@unique_column]) }
end

#file_path(table) ⇒ Object



113
114
115
# File 'lib/fluent/plugin/out_pgdist.rb', line 113

# Resolve the export file path for +table+ via the configured file_moniker.
def file_path(table)
  @file_moniker_lambda.(table)
end

#filter_for_file_record(*args) ⇒ Object



117
118
119
120
# File 'lib/fluent/plugin/out_pgdist.rb', line 117

# Apply the configured file_record_filter lambda to the record (json /
# msgpack formats pass one arg; ltsv passes [fields, record]).
def filter_for_file_record(*args)
  @file_record_filter_lambda.call(*args)
end

#filter_for_insert(record) ⇒ Object



122
123
124
# File 'lib/fluent/plugin/out_pgdist.rb', line 122

# Transform a record into the parameter list for the prepared INSERT,
# using the configured insert_filter lambda.
def filter_for_insert(record)
  @insert_filter_lambda.(record)
end

#format(tag, time, record) ⇒ Object



130
131
132
# File 'lib/fluent/plugin/out_pgdist.rb', line 130

# Fluentd buffered-output hook: serialize one event as a msgpack-encoded
# [tag, time, record] triple for the buffer; decoded again in #write.
def format(tag, time, record)
  [tag, time, record].to_msgpack
end

#insert_into_db(handler, table, records) ⇒ Object



139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
# File 'lib/fluent/plugin/out_pgdist.rb', line 139

# Insert +records+ into +table+ through a prepared statement. When
# unique_column is configured, duplicates (within the batch and against
# rows already present in the table) are removed first. Per-record failures
# are re-raised when @raise_exception is set, otherwise logged and skipped.
def insert_into_db(handler, table, records)
  if @unique_column
    delete_duplicative_records(records)
    delete_existing_records(handler, table, records)
  end

  $log.info "insert #{records.size} records into #{table}"
  sql = "INSERT INTO #{table}(#{@insert_columns}) VALUES(#{@insert_values})"
  $log.info "execute sql #{sql.inspect}"
  statement = "write_#{table}"
  handler.prepare(statement, sql)
  records.each do |record|
    record = filter_for_insert(record)
    $log.info "insert #{record.inspect}"
    begin
      handler.exec_prepared(statement, record)
    rescue StandardError => e
      # FIX: rescue StandardError, not Exception — rescuing Exception also
      # swallowed SignalException/SystemExit and could mask fatal errors.
      if @raise_exception
        raise
      else
        $log.info e.message
      end
    end
  end
end

#read_last_sequence(filepath) ⇒ Object



165
166
167
168
169
# File 'lib/fluent/plugin/out_pgdist.rb', line 165

# Read the last exported sequence value from the sequence file at
# +filepath+; returns nil when the content has no digit run.
def read_last_sequence(filepath)
  value = File.read(filepath).chomp
  # Unanchored on purpose, mirroring the check in
  # read_last_sequence_from_file.
  /-?[0-9]+/ =~ value ? value : nil
end

#read_last_sequence_from_file(handler, table, filepath) ⇒ Object



171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
# File 'lib/fluent/plugin/out_pgdist.rb', line 171

# Recover the last exported sequence value by parsing the final line of
# the export file at +filepath+, interpreting it per @file_format
# (json / ltsv / raw tab-separated text). For the raw format the column
# position is looked up from the table via +handler+. Returns nil when no
# numeric-looking value is found.
def read_last_sequence_from_file(handler, table, filepath)
  # SECURITY FIX: run tail without a shell. The original backtick form
  # interpolated filepath into a shell command (injection risk).
  last_line = IO.popen(["tail", "-n", "1", filepath], &:read)
  case @file_format
  when "json"
    last_record = JSON.parse(last_line)
    last_sequence = last_record[@sequence_column]
  when "ltsv"
    last_record = Hash[last_line.split(/\t/).map { |pair| pair.split(/:/, 2) }]
    last_sequence = last_record[@sequence_column]
  else
    # Raw COPY-style text: find the sequence column's index from the table.
    result = handler.exec("select * from #{table} limit 0")
    sequence_index = result.fields.index(@sequence_column)
    last_sequence = last_line.split(/\t/)[sequence_index]
  end
  last_sequence = nil if /-?[0-9]+/ !~ last_sequence
  last_sequence
end

#sequence_path(table) ⇒ Object



126
127
128
# File 'lib/fluent/plugin/out_pgdist.rb', line 126

# Resolve the sequence-file path for +table+ via the configured
# sequence_moniker.
def sequence_path(table)
  @sequence_moniker_lambda.(table)
end

#shutdownObject



191
192
193
# File 'lib/fluent/plugin/out_pgdist.rb', line 191

# Fluentd lifecycle hook; no plugin-specific teardown beyond the parent
# class (connections are opened and closed per #write, not held open).
def shutdown
  super
end

#startObject



195
196
197
# File 'lib/fluent/plugin/out_pgdist.rb', line 195

# Fluentd lifecycle hook; no plugin-specific startup beyond the parent
# class.
def start
  super
end

#table_name(record) ⇒ Object



199
200
201
# File 'lib/fluent/plugin/out_pgdist.rb', line 199

# Map a record to its destination table via the configured table_moniker;
# the lambda may return nil to drop the record (see #write).
def table_name(record)
  @table_moniker_lambda.(record)
end

#write(chunk) ⇒ Object



203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
# File 'lib/fluent/plugin/out_pgdist.rb', line 203

# Flush one buffer chunk: decode the buffered [tag, time, record] triples,
# group records by destination table (records whose moniker returns nil
# are dropped), insert each group, and optionally export new rows to file.
def write(chunk)
  handler = self.client
  begin
    records_hash = {}
    chunk.msgpack_each do |_tag, _time, data|
      table = @table_moniker_lambda.call(data)
      next if table.nil?
      (records_hash[table] ||= []).push(data)
    end
    records_hash.each_pair do |table, records|
      insert_into_db(handler, table, records)
      write_to_file(handler, table) if @file_moniker
    end
  ensure
    # FIX: close inside ensure — a raising insert/export previously leaked
    # one PostgreSQL connection per failed flush attempt.
    handler.close
  end
end

#write_pg_result(output_stream, fields, pg_result) ⇒ Object



220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
# File 'lib/fluent/plugin/out_pgdist.rb', line 220

# Serialize the rows of +pg_result+ onto +output_stream+ in the configured
# @file_format: json (one object per line), ltsv, msgpack, or — by default —
# tab-separated escaped text. Each row passes through the configured
# file-record filter first (except the default text format).
def write_pg_result(output_stream, fields, pg_result)
  case @file_format
  when "json"
    pg_result.each do |tuple|
      tuple = filter_for_file_record(tuple)
      output_stream.puts(tuple.to_json)
    end
  when "ltsv"
    pg_result.each_row do |row|
      fields, row = filter_for_file_record(fields, row)
      output_stream.puts(fields.each_with_index.map { |f, i| "#{f}:#{db_escape(row[i])}" }.join("\t"))
    end
  when "msgpack", "message_pack"
    # BUG FIX: `when "msgpack" || "message_pack"` evaluated to "msgpack"
    # only, so "message_pack" silently fell through to the text format.
    pg_result.each do |tuple|
      tuple = filter_for_file_record(tuple)
      output_stream.write(tuple.to_msgpack)
    end
  else
    pg_result.each_row do |row|
      output_stream.puts(row.map { |v| db_escape(v) }.join("\t"))
    end
  end
end

#write_to_file(handler, table) ⇒ Object



244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
# File 'lib/fluent/plugin/out_pgdist.rb', line 244

# Export rows of +table+ newer than the recorded sequence high-water mark
# to the table's file, in @file_write_limit-sized batches, then persist the
# new high-water mark to the sequence file.
def write_to_file(handler, table)
  sequence_file_path = sequence_path(table)
  # FIX: File.exist? — File.exists? was deprecated and removed in Ruby 3.2.
  last_sequence = read_last_sequence(sequence_file_path) if File.exist?(sequence_file_path)
  file = nil
  fields = nil
  begin
    loop do
      where_sql = "where #{last_sequence} < #{@sequence_column}" if last_sequence
      result = handler.exec("select * from #{table} #{where_sql} order by #{@sequence_column} limit #{@file_write_limit}")
      result_size = result.ntuples
      break if result_size == 0

      fields ||= result.fields
      file ||= File.open(file_path(table), "a")
      write_pg_result(file, fields, result)
      last_sequence = result[result_size - 1][@sequence_column]
      break if result_size < @file_write_limit
    end
  ensure
    if file
      # FIX: close inside ensure so a failed export cannot leak the handle.
      # last_sequence only reflects fully written batches, so persisting it
      # here is safe even after a mid-export failure.
      file.close
      File.write(sequence_file_path, last_sequence.to_s) if last_sequence
    end
  end
end