Class: Fluent::SQLOutput
- Inherits:
-
BufferedOutput
- Object
- BufferedOutput
- Fluent::SQLOutput
- Includes:
- SetTagKeyMixin, SetTimeKeyMixin
- Defined in:
- lib/fluent/plugin/out_sql.rb
Defined Under Namespace
Classes: TableElement
Instance Attribute Summary collapse
-
#tables ⇒ Object
Returns the value of attribute tables.
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #desc(description) ⇒ Object
- #emit(tag, es, chain) ⇒ Object
- #format(tag, time, record) ⇒ Object
-
#initialize ⇒ SQLOutput
constructor
A new instance of SQLOutput.
- #shutdown ⇒ Object
- #start ⇒ Object
- #write(chunk) ⇒ Object
Constructor Details
#initialize ⇒ SQLOutput
Returns a new instance of SQLOutput.
140 141 142 143 144 |
# File 'lib/fluent/plugin/out_sql.rb', line 140 def initialize super require 'active_record' require 'activerecord-import' end |
Instance Attribute Details
#tables ⇒ Object
Returns the value of attribute tables.
33 34 35 |
# File 'lib/fluent/plugin/out_sql.rb', line 33 def tables @tables end |
Instance Method Details
#configure(conf) ⇒ Object
146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 |
# File 'lib/fluent/plugin/out_sql.rb', line 146 def configure(conf) super if remove_tag_prefix = conf['remove_tag_prefix'] @remove_tag_prefix = Regexp.new('^' + Regexp.escape(remove_tag_prefix)) end @tables = [] @default_table = nil conf.elements.select { |e| e.name == 'table' }.each { |e| te = TableElement.new(e.arg, log) te.configure(e) if e.arg.empty? $log.warn "Detect duplicate default table definition" if @default_table @default_table = te else @tables << te end } @only_default = @tables.empty? if @default_table.nil? raise ConfigError, "There is no default table. <table> is required in sql output" end end |
#desc(description) ⇒ Object
11 12 |
# File 'lib/fluent/plugin/out_sql.rb', line 11 def desc(description) end |
#emit(tag, es, chain) ⇒ Object
205 206 207 208 209 210 211 |
# File 'lib/fluent/plugin/out_sql.rb', line 205 def emit(tag, es, chain) if @only_default super(tag, es, chain) else super(tag, es, chain, format_tag(tag)) end end |
#format(tag, time, record) ⇒ Object
213 214 215 |
# File 'lib/fluent/plugin/out_sql.rb', line 213 def format(tag, time, record) [tag, time, record].to_msgpack end |
#shutdown ⇒ Object
201 202 203 |
# File 'lib/fluent/plugin/out_sql.rb', line 201 def shutdown super end |
#start ⇒ Object
174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 |
# File 'lib/fluent/plugin/out_sql.rb', line 174 def start super config = { :adapter => @adapter, :host => @host, :port => @port, :database => @database, :username => @username, :password => @password, :socket => @socket, } @base_model = Class.new(ActiveRecord::Base) do self.abstract_class = true end SQLOutput.const_set("BaseModel_#{rand(1 << 31)}", @base_model) @base_model.establish_connection(config) # ignore tables if TableElement#init failed @tables.reject! do |te| init_table(te, @base_model) end init_table(@default_table, @base_model) end |
#write(chunk) ⇒ Object
217 218 219 220 221 222 223 224 225 226 227 |
# File 'lib/fluent/plugin/out_sql.rb', line 217 def write(chunk) conn = @base_model.connection conn.active? || conn.reconnect! @tables.each { |table| if table.pattern.match(chunk.key) return table.import(chunk) end } @default_table.import(chunk) end |