Class: Fluent::MysqlLoadOutput

Inherits:
BufferedOutput
  • Object
show all
Defined in:
lib/fluent/plugin/out_mysql_load.rb

Constant Summary collapse

# Format string for the bulk-load statement. The three %s placeholders are
# filled, in order, with the data file path, the table name, and the
# comma-separated column list.
QUERY_TEMPLATE =
"LOAD DATA LOCAL INFILE '%s' INTO TABLE %s (%s)"

Instance Method Summary collapse

Constructor Details

#initialize ⇒ MysqlLoadOutput

Returns a new instance of MysqlLoadOutput.



12
13
14
15
16
# File 'lib/fluent/plugin/out_mysql_load.rb', line 12

# Builds the output plugin instance.
#
# The libraries this plugin depends on are required here, at construction
# time, rather than at file load; the parent initializer is then invoked.
def initialize
  require 'mysql2'
  require 'tempfile'
  super
end

Instance Method Details

#configure(conf) ⇒ Object



27
28
29
30
31
32
33
34
35
36
# File 'lib/fluent/plugin/out_mysql_load.rb', line 27

# Validates the plugin configuration and normalizes the column list.
#
# @param conf the configuration object, forwarded unchanged to the parent
#   class via +super+
# @raise [Fluent::ConfigError] if @database or @tablename is nil after the
#   parent class has processed +conf+
def configure(conf)
  super
  if @database.nil? || @tablename.nil?
    raise Fluent::ConfigError, "database and tablename is required!"
  end

  # Turn the comma-separated setting into an array of column names. Each
  # name is stripped so that a value written as "a, b" yields "b" rather
  # than " b", which would never match a record key.
  @columns = @columns.split(",").map(&:strip) unless @columns.nil?
end

#format(tag, time, record) ⇒ Object



46
47
48
# File 'lib/fluent/plugin/out_mysql_load.rb', line 46

# Serializes one event for buffering.
#
# Only the record is kept; +tag+ and +time+ are not used here.
#
# @param tag unused
# @param time unused
# @param record the event record; must respond to +to_msgpack+
# @return the result of +record.to_msgpack+
def format(tag, time, record)
  record.to_msgpack
end

#shutdown ⇒ Object



42
43
44
# File 'lib/fluent/plugin/out_mysql_load.rb', line 42

# Shutdown hook. Adds no behavior of its own; it only delegates to the
# parent class.
def shutdown
  super
end

#start ⇒ Object



38
39
40
# File 'lib/fluent/plugin/out_mysql_load.rb', line 38

# Start hook. Adds no behavior of its own; it only delegates to the
# parent class.
def start
  super
end

#write(chunk) ⇒ Object



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/fluent/plugin/out_mysql_load.rb', line 50

# Writes a buffered chunk to MySQL with a single LOAD DATA LOCAL INFILE.
#
# Every record in the chunk is written to a temporary file as one
# tab-separated line, and that file is then loaded into @tablename. The
# temporary file is removed and the connection is closed whether or not the
# load succeeds.
#
# NOTE(review): values are joined verbatim, so a nil value becomes an empty
# field and a value containing a tab or newline will break the row layout.
#
# @param chunk a buffer chunk responding to +msgpack_each+, yielding one
#   hash-like record per event
# @return [nil] when the chunk holds no records; otherwise the result of
#   closing the connection
def write(chunk)
  tmp = Tempfile.new("loaddata")
  keys = nil
  chunk.msgpack_each do |record|
    # The column order is fixed once, from the first record, unless columns
    # were configured explicitly, in which case those take precedence.
    keys ||= @columns.nil? ? record.keys : @columns

    tmp.write(keys.map { |key| record[key] }.join("\t") + "\n")
  end
  tmp.close

  # An empty chunk yields no keys and no rows, so there is nothing to load.
  return if keys.nil?

  query = QUERY_TEMPLATE % [tmp.path, @tablename, keys.join(",")]

  conn = get_connection
  begin
    conn.query(query)
  ensure
    # Release the connection even when the load fails.
    conn.close
  end
ensure
  # Close (if still open) and unlink the temporary file right away instead
  # of leaving it on disk until the object is garbage-collected.
  tmp.close! if tmp
end