Class: Fluent::SQLOutput::TableElement

Inherits:
Object
  • Object
show all
Includes:
Configurable
Defined in:
lib/fluent/plugin/out_sql.rb

Overview

TODO: Merge with SQLInput’s TableElement.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(pattern, log) ⇒ TableElement

Returns a new instance of TableElement.



34
35
36
37
38
# File 'lib/fluent/plugin/out_sql.rb', line 34

# Creates a table element for the given match pattern.
#
# @param pattern the raw pattern, compiled via MatchPattern.create and kept in @pattern
# @param log the logger stored in @log; #import and #one_by_one_import write
#   their warnings and errors to it
def initialize(pattern, log)
  # Explicit empty argument list: the parent initializer is called without arguments.
  super()
  @log = log
  @pattern = MatchPattern.create(pattern)
end

Instance Attribute Details

#model ⇒ Object (readonly)

Returns the value of attribute model.



31
32
33
# File 'lib/fluent/plugin/out_sql.rb', line 31

# Reader for the model class used for this table.
#
# @return the value of @model, which #init assigns to an anonymous subclass
#   of the base model it is given
def model
  @model
end

#pattern ⇒ Object (readonly)

Returns the value of attribute pattern.



32
33
34
# File 'lib/fluent/plugin/out_sql.rb', line 32

# Reader for the match pattern of this table element.
#
# @return the value of @pattern, which the constructor sets to the result of
#   MatchPattern.create(pattern)
def pattern
  @pattern
end

Instance Method Details

#configure(conf) ⇒ Object



40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/fluent/plugin/out_sql.rb', line 40

# Applies the configuration and prepares the record formatter.
#
# After delegating to the parent implementation, parses @column_mapping into
# @mapping and builds @format_proc. The proc turns an incoming record into a
# fresh Hash: for every (source key, column) pair in @mapping, the value
# stored under the source key is copied to the column key. Keys of the record
# that are not mentioned in the mapping are dropped.
#
# @param conf the configuration element, forwarded unchanged to super
# @return [Proc] the formatter that was stored in @format_proc
def configure(conf)
  super

  @mapping = parse_column_mapping(@column_mapping)
  @format_proc = Proc.new do |record|
    @mapping.each_with_object({}) do |(source_key, column), row|
      row[column] = record[source_key]
    end
  end
end

#import(chunk) ⇒ Object



70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/fluent/plugin/out_sql.rb', line 70

# Builds one model instance per record in the chunk and bulk-imports them.
#
# Each record is passed through @format_proc and handed to @model.new. A
# record for which that fails is logged at warn level and skipped. If the
# bulk import raises one of the listed ActiveRecord errors, the records built
# so far are handed to #one_by_one_import instead; any other exception
# propagates to the caller.
#
# @param chunk an object whose #msgpack_each yields (tag, time, data)
# @return the result of @model.import, or of #one_by_one_import on fallback
def import(chunk)
  rows = []
  chunk.msgpack_each do |_tag, _time, data|
    begin
      # NOTE: formatting should move to emit / format once error streams are supported.
      rows << @model.new(@format_proc.call(data))
    rescue => error
      details = {
        :error => error.message,
        :error_class => error.class,
        :table => @table,
        :record => Yajl.dump(data)
      }
      @log.warn "Failed to create the model. Ignore a record:", details
    end
  end

  begin
    @model.import(rows)
  rescue ActiveRecord::StatementInvalid, ActiveRecord::ThrowResult, ActiveRecord::Import::MissingColumnError => error
    # Only these errors are handled here; every other exception is left to
    # propagate so that Fluentd's retry mechanism can take over.
    @log.warn "Got deterministic error. Fallback to one-by-one import", :error => error.message, :error_class => error.class
    one_by_one_import(rows)
  end
end

#init(base_model) ⇒ Object



53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/fluent/plugin/out_sql.rb', line 53

# Defines the model class backing this table element.
#
# Creates an anonymous subclass of base_model bound to the table named by
# @table, registers it as a constant on base_model under the singularized,
# camelized table name, and pins its model_name to an ActiveModel::Name built
# from that same class name. The inheritance column is set to the placeholder
# '_never_use_output_' (presumably so that no real column is treated as a
# type discriminator; see SQLInput for the rationale).
#
# @param base_model the class the generated model inherits from
# @return [Array] the sorted names of the model's columns
def init(base_model)
  # The block below runs with the new class as self, so the table name has to
  # travel in through a local variable rather than through @table.
  target_table = @table
  @model = Class.new(base_model) {
    self.table_name = target_table
    self.inheritance_column = '_never_use_output_'
  }

  class_name = target_table.singularize.camelize
  base_model.const_set(class_name, @model)
  fixed_name = ActiveModel::Name.new(@model, nil, class_name)
  @model.define_singleton_method(:model_name) { fixed_name }

  # TODO: check column_names and table schema
  @model.columns.map(&:name).sort
end

#one_by_one_import(records) ⇒ Object



90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# File 'lib/fluent/plugin/out_sql.rb', line 90

# Imports the given records individually, isolating the ones that fail.
#
# For each record:
# * one of the listed ActiveRecord errors is logged at error level and the
#   record is dropped without retrying;
# * any other StandardError is retried after a 0.5 second pause, up to
#   @num_retries times, with a warning per retry; once the retries are
#   used up the record is logged at error level and dropped.
#
# @param records the model instances to import
# @return the records collection that was passed in
def one_by_one_import(records)
  records.each do |record|
    # Counted per record; `retry` re-enters the begin body only, so the
    # counter survives across attempts for the same record.
    attempts = 0
    begin
      @model.import([record])
    rescue ActiveRecord::StatementInvalid, ActiveRecord::ThrowResult, ActiveRecord::Import::MissingColumnError => error
      @log.error "Got deterministic error again. Dump a record", :error => error.message, :error_class => error.class, :record => record
    rescue => error
      attempts += 1
      if attempts <= @num_retries
        @log.warn "Failed to import a record: retry number = #{attempts}", :error => error.message, :error_class => error.class
        sleep 0.5
        retry
      end

      # Retries exhausted: give up on this record and move on to the next one.
      @log.error "Can't recover undeterministic error. Dump a record", :error => error.message, :error_class => error.class, :record => record
    end
  end
end