Class: Fluent::SQLOutput

Inherits:
BufferedOutput
  • Object
show all
Includes:
SetTagKeyMixin, SetTimeKeyMixin
Defined in:
lib/fluent/plugin/out_sql.rb

Defined Under Namespace

Classes: TableElement

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ SQLOutput

Returns a new instance of SQLOutput.



140
141
142
143
144
# File 'lib/fluent/plugin/out_sql.rb', line 140

# Builds the output and loads the ActiveRecord libraries it relies on.
#
# The requires are issued here rather than at file load, so the gems are only
# pulled in once an instance is actually constructed.
def initialize
  super
  require 'active_record'
  require 'activerecord-import'
end

Instance Attribute Details

#tables ⇒ Object

Returns the value of attribute tables.



33
34
35
# File 'lib/fluent/plugin/out_sql.rb', line 33

# Reader for the @tables instance variable.
#
# @return [Object] the current value of @tables (nil until it has been assigned)
def tables
  @tables
end

Instance Method Details

#configure(conf) ⇒ Object



146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
# File 'lib/fluent/plugin/out_sql.rb', line 146

# Applies the parsed configuration to this output.
#
# Reads the optional 'remove_tag_prefix' setting and builds one TableElement
# per <table> section. A <table> section whose argument is empty becomes the
# default table (the last one wins if several are given); every other section
# is appended to @tables in declaration order.
#
# @param conf [#[], #elements] configuration for this plugin; must answer
#   +[]+ for settings and +elements+ for nested sections
# @return [void]
# @raise [ConfigError] when no default (argument-less) <table> section exists
def configure(conf)
  super

  if (prefix = conf['remove_tag_prefix'])
    # \A anchors at the very start of the tag; '^' would also match right
    # after any newline embedded in the string.
    @remove_tag_prefix = Regexp.new('\A' + Regexp.escape(prefix))
  end

  @tables = []
  @default_table = nil
  conf.elements.each do |e|
    next unless e.name == 'table'

    te = TableElement.new(e.arg, log)
    te.configure(e)
    if e.arg.empty?
      # Report through the plugin logger (the same one handed to
      # TableElement above) instead of the process-global $log.
      log.warn "Detect duplicate default table definition" if @default_table
      @default_table = te
    else
      @tables << te
    end
  end
  # With no pattern tables, every chunk goes to the default table.
  @only_default = @tables.empty?

  if @default_table.nil?
    raise ConfigError, "There is no default table. <table> is required in sql output"
  end
end

#desc(description) ⇒ Object



11
12
# File 'lib/fluent/plugin/out_sql.rb', line 11

# No-op: accepts a description and ignores it.
# NOTE(review): presumably a fallback so `desc "..."` calls elsewhere in the
# file work when the host framework does not provide one — confirm.
#
# @param description [Object] ignored
# @return [nil]
def desc(description)
  nil
end

#emit(tag, es, chain) ⇒ Object



205
206
207
208
209
210
211
# File 'lib/fluent/plugin/out_sql.rb', line 205

# Hands an event stream to the parent implementation.
#
# When only the default table is configured the parent is called with three
# arguments; otherwise format_tag(tag) is passed as a fourth argument.
#
# @param tag [String] event tag
# @param es [Object] event stream, forwarded untouched
# @param chain [Object] forwarded untouched
# @return [Object] whatever the parent implementation returns
def emit(tag, es, chain)
  return super(tag, es, chain) if @only_default

  super(tag, es, chain, format_tag(tag))
end

#format(tag, time, record) ⇒ Object



213
214
215
# File 'lib/fluent/plugin/out_sql.rb', line 213

# Serializes one event for buffering.
#
# The event is packed as the three-element array [tag, time, record] using
# Array#to_msgpack.
#
# @param tag [Object] event tag
# @param time [Object] event time
# @param record [Object] event payload
# @return [Object] the result of to_msgpack on the array
def format(tag, time, record)
  entry = [tag, time, record]
  entry.to_msgpack
end

#shutdown ⇒ Object



201
202
203
# File 'lib/fluent/plugin/out_sql.rb', line 201

# Shutdown hook. Adds no behaviour of its own; it only delegates to the
# parent implementation.
def shutdown
  super
end

#start ⇒ Object



174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
# File 'lib/fluent/plugin/out_sql.rb', line 174

# Connects to the database and prepares every configured table.
#
# Creates an anonymous abstract subclass of ActiveRecord::Base, registers it
# on SQLOutput under a randomly suffixed constant name, and establishes its
# connection from the adapter/host/port/database/username/password/socket
# instance variables. init_table is then run for each entry of @tables and
# finally for the default table.
def start
  super

  @base_model = Class.new(ActiveRecord::Base) { self.abstract_class = true }
  SQLOutput.const_set("BaseModel_#{rand(1 << 31)}", @base_model)

  settings = {
    adapter: @adapter,
    host: @host,
    port: @port,
    database: @database,
    username: @username,
    password: @password,
    socket: @socket
  }
  @base_model.establish_connection(settings)

  # ignore tables if TableElement#init failed
  # (entries for which init_table returns a truthy value are removed)
  @tables.reject! { |table| init_table(table, @base_model) }
  init_table(@default_table, @base_model)
end

#write(chunk) ⇒ Object



217
218
219
220
221
222
223
224
225
226
227
# File 'lib/fluent/plugin/out_sql.rb', line 217

# Writes one buffered chunk to the matching table.
#
# Reconnects first if the connection reports itself inactive. The chunk is
# then imported into the first entry of @tables whose pattern matches
# chunk.key, or into the default table when none matches.
#
# @param chunk [#key] buffered chunk to import
# @return [Object] the result of the selected table's import
def write(chunk)
  connection = @base_model.connection
  connection.reconnect! unless connection.active?

  target = @tables.find { |candidate| candidate.pattern.match(chunk.key) }
  (target || @default_table).import(chunk)
end