Class: Fluent::Plugin::MysqlFetchAndEmitOutput

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_mysql_fetch_and_emit.rb

Instance Method Summary collapse

Instance Method Details

#client(database) ⇒ Object



183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
# File 'lib/fluent/plugin/out_mysql_fetch_and_emit.rb', line 183

# Builds a new Mysql2::Client for +database+ from the plugin's connection
# and SSL settings.
#
# @param database [String] name of the database to connect to
# @return [Mysql2::Client] a freshly constructed client
def client(database)
  connection_options = {
    host: @host,
    port: @port,
    username: @username,
    password: @password,
    database: database
  }
  ssl_options = {
    sslkey: @sslkey,
    sslcert: @sslcert,
    sslca: @sslca,
    sslcapath: @sslcapath,
    sslcipher: @sslcipher,
    sslverify: @sslverify
  }
  Mysql2::Client.new(connection_options.merge(ssl_options))
end

#configure(conf) ⇒ Object



84
85
86
87
88
89
# File 'lib/fluent/plugin/out_mysql_fetch_and_emit.rb', line 84

# Runs the parent configuration, then prepares the record accessors and the
# MySQL column names used later by #format and #write.
#
# @param conf [Object] configuration element, forwarded unchanged to +super+
# @return [Array] the column names used for record matching (value of the
#   last assignment)
def configure(conf)
  super
  @accessor_for_record_key = record_accessor_create(@record_key)
  matching_keys = @record_matching_keys
  # One accessor per configured key, reading the Fluentd side of the match.
  @accessors_for_record_matching = matching_keys.map do |key_config|
    record_accessor_create(key_config.fluentd_record_key)
  end
  # Parallel list holding the MySQL side of each match.
  @column_names_for_record_matching = matching_keys.map(&:mysql_record_key)
end

#expand_placeholders(metadata) ⇒ Object



199
200
201
202
203
# File 'lib/fluent/plugin/out_mysql_fetch_and_emit.rb', line 199

# Resolves the placeholders in the configured database and table names and
# replaces every '.' in the results with '_'.
#
# @param metadata [Object] chunk metadata handed to +extract_placeholders+
# @return [Array<String>] two-element array: [database, table]
def expand_placeholders(metadata)
  database = extract_placeholders(@database, metadata).tr('.', '_')
  table = extract_placeholders(@table, metadata).tr('.', '_')
  [database, table]
end

#format(tag, time, record) ⇒ Object



91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# File 'lib/fluent/plugin/out_mysql_fetch_and_emit.rb', line 91

# Serializes an incoming event for buffering, or drops it when it cannot be
# used for the MySQL lookup.
#
# A record is dropped (with a warning) when any record-matching accessor
# yields nil, or when the +record_key+ value is not a String, Integer or Float.
#
# @param tag [String] event tag
# @param time [Object] event time
# @param record [Hash] event record
# @return [String, nil] result of +[tag, time, record].to_msgpack+, or nil
#   when the record is omitted
def format(tag, time, record)
  lookup_value = @accessor_for_record_key.call(record)
  # With no matching accessors configured this is an empty list and the
  # nil check below never fires.
  matching_values = @accessors_for_record_matching.map { |accessor| accessor.call(record) }
  if matching_values.any?(&:nil?)
    @log.warn("Incoming record is omitted, because values for record matching include nil", record: record)
    return nil
  end

  supported = case lookup_value
              when String, Integer, Float then true
              else false
              end
  return [tag, time, record].to_msgpack if supported

  @log.warn("Incoming record is omitted, Supported value type of `record_key` is String, Integer, Float", record: record)
  nil
end

#formatted_to_msgpack_binary? ⇒ Boolean

Returns:

  • (Boolean)


110
111
112
# File 'lib/fluent/plugin/out_mysql_fetch_and_emit.rb', line 110

# Always true.
#
# NOTE(review): presumably signals to Fluentd that #format already returns
# msgpack binary, which #write relies on via +chunk.msgpack_each+ — confirm
# against the Fluentd output plugin API.
#
# @return [Boolean] true
def formatted_to_msgpack_binary?
  true
end

#multi_workers_ready? ⇒ Boolean

Returns:

  • (Boolean)


114
115
116
# File 'lib/fluent/plugin/out_mysql_fetch_and_emit.rb', line 114

# Always true.
#
# NOTE(review): presumably declares the plugin safe to run under Fluentd's
# multi-worker mode — confirm against the Fluentd plugin API.
#
# @return [Boolean] true
def multi_workers_ready?
  true
end

#where_column_name ⇒ Object



118
119
120
# File 'lib/fluent/plugin/out_mysql_fetch_and_emit.rb', line 118

# Name of the column used in the WHERE ... IN (...) clause built by #write.
#
# @return [Object] +@where_column+ when it is set (truthy), otherwise
#   +@record_key+
def where_column_name
  return @where_column if @where_column

  @record_key
end

#write(chunk) ⇒ Object



122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
# File 'lib/fluent/plugin/out_mysql_fetch_and_emit.rb', line 122

# Fetches the MySQL rows whose lookup column matches the values buffered in
# +chunk+ and emits every returned row under +@tag+. When record matching is
# configured, a row is merged with the buffered record it matches before it
# is emitted.
#
# @param chunk [#metadata, #msgpack_each] buffered events; each entry is
#   yielded as (tag, time, record)
# @return [void]
def write(chunk)
  database, table = expand_placeholders(chunk.metadata)
  where_values = []
  # Nested Hash keyed by the record-matching values; each leaf is the
  # buffered record that produced those values.
  origin_records = {}
  chunk.msgpack_each do |_tag, _time, data|
    value = @accessor_for_record_key.call(data)
    case value
    when String
      where_values << "'#{Mysql2::Client.escape(value)}'"
    when Integer, Float
      where_values << value.to_s
    else
      next
    end

    next if @accessors_for_record_matching.empty?

    matching_keys = @accessors_for_record_matching.map { |accessor| accessor.call(data) }
    parent = matching_keys[0..-2].inject(origin_records) { |node, key| node[key] ||= {} }
    parent[matching_keys.last] = data
  end
  # "IN ()" is not valid SQL, so with no usable values there is nothing to fetch.
  return if where_values.empty?

  where_condition = "WHERE #{where_column_name} IN (#{where_values.join(',')})"
  if @additional_condition
    condition_sql = extract_placeholders(@additional_condition, chunk.metadata)
    where_condition += " AND #{condition_sql}"
  end

  sql = "SELECT #{@column_names.join(', ')} FROM #{table} #{where_condition}"
  @log.debug(sql)

  # @handler is still assigned as before; the local is what gets queried and
  # closed, so a concurrent flush cannot swap the connection underneath us.
  handler = @handler = client(database)
  begin
    results = handler.query(sql, cast_booleans: @cast_booleans, stream: @stream)
    time = Fluent::EventTime.now
    results.each do |row|
      emitted = row
      unless @column_names_for_record_matching.empty?
        lookup = @column_names_for_record_matching.map { |column| row[column] }
        record = origin_records.dig(*lookup)
        if record
          @remove_keys.each { |key| record.delete(key) }
          # Non-mutating merge: several rows may match the same buffered
          # record, and each emitted event must be its own Hash.
          emitted = @merge_priority == :mysql ? record.merge(row) : row.merge(record)
        end
      end
      @log.debug("emit", tag: @tag, record: emitted)
      router.emit(@tag, time, emitted)
    end
  ensure
    # A client is opened per flush; close it so connections do not pile up.
    handler.close
  end
end