Class: Fluent::Plugin::FileWithHeaderOutput

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_file_with_header.rb

Defined Under Namespace

Modules: SymlinkBufferMixin

Constant Summary collapse

SUPPORTED_COMPRESS =
[:text, :gz, :gzip]
SUPPORTED_COMPRESS_MAP =
{
  text: nil,
  gz: :gzip,
  gzip: :gzip,
}
DEFAULT_TIMEKEY =
60 * 60 * 24

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#dir_permObject (readonly)

Returns the value of attribute dir_perm.



67
68
69
# File 'lib/fluent/plugin/out_file_with_header.rb', line 67

def dir_perm
  @dir_perm
end

#last_written_pathObject

for tests



68
69
70
# File 'lib/fluent/plugin/out_file_with_header.rb', line 68

def last_written_path
  @last_written_path
end

Instance Method Details

#compression_suffix(compress) ⇒ Object



266
267
268
269
270
271
272
273
# File 'lib/fluent/plugin/out_file_with_header.rb', line 266

def compression_suffix(compress)
  case compress
  when :gzip then '.gz'
  when nil then ''
  else
    raise ArgumentError, "unknown compression type #{compress}"
  end
end

#configure(conf) ⇒ Object



105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
# File 'lib/fluent/plugin/out_file_with_header.rb', line 105

def configure(conf)
  compat_parameters_convert(conf, :formatter, :buffer, :inject, default_chunk_key: "time")

  configured_time_slice_format = conf['time_slice_format']

  if conf.elements(name: 'buffer').empty?
    conf.add_element('buffer', 'time')
  end
  buffer_conf = conf.elements(name: 'buffer').first
  # Fluent::PluginId#configure is not called yet, so we can't use #plugin_root_dir here.
  if !buffer_conf.has_key?('path') && !(conf['@id'] && system_config.root_dir)
    # v0.14 file buffer handles path as directory if '*' is missing
    # 'dummy_path' is not to raise configuration error for 'path' in file buffer plugin,
    # but raise it in this plugin.
    buffer_conf['path'] = conf['path'] || '/tmp/dummy_path'
  end

  if conf.has_key?('utc') || conf.has_key?('localtime')
    param_name = conf.has_key?('utc') ? 'utc' : 'localtime'
    log.warn "'#{param_name}' is deprecated for output plugin. This parameter is used for formatter plugin in compatibility layer. If you want to use same feature, use timekey_use_utc parameter in <buffer> directive instead"
  end

  super

  @compress_method = SUPPORTED_COMPRESS_MAP[@compress]

  if @path.include?('*') && !@buffer_config.timekey
    raise Fluent::ConfigError, "path including '*' must be used with buffer chunk key 'time'"
  end

  path_suffix = @add_path_suffix ? @path_suffix : ''
  path_timekey = if @chunk_key_time
                   @as_secondary ? @primary_instance.buffer_config.timekey : @buffer_config.timekey
                 else
                   nil
                 end
  @path_template = generate_path_template(@path, path_timekey, @append, @compress_method, path_suffix: path_suffix, time_slice_format: configured_time_slice_format)

  if @as_secondary
    # When this plugin is configured as secondary & primary plugin has tag key, but this plugin may not have it.
    # Increment placeholder can make another output file per chunk tag/keys even if original path doesn't include it.
    placeholder_validators(:path, @path_template).select{|v| v.type == :time }.each do |v|
      v.validate!
    end
  else
    placeholder_validate!(:path, @path_template)

    max_tag_index = get_placeholders_tag(@path_template).max || 1
    max_tag_index = 1 if max_tag_index < 1
    dummy_tag = (['a'] * max_tag_index).join('.')
    dummy_record_keys = get_placeholders_keys(@path_template) || ['message']
    dummy_record = Hash[dummy_record_keys.zip(['data'] * dummy_record_keys.size)]

    test_chunk1 = chunk_for_test(dummy_tag, Fluent::EventTime.now, dummy_record)
    test_path = extract_placeholders(@path_template, test_chunk1)
    unless ::Fluent::FileUtil.writable_p?(test_path)
      raise Fluent::ConfigError, "out_file: `#{test_path}` is not writable"
    end
  end

  @formatter = formatter_create

  if @symlink_path && @buffer.respond_to?(:path)
    if @as_secondary
      raise Fluent::ConfigError, "symlink_path option is unavailable in <secondary>: consider to use secondary_file plugin"
    end
    if Fluent.windows?
      log.warn "symlink_path is unavailable on Windows platform. disabled."
      @symlink_path = nil
    else
      @buffer.extend SymlinkBufferMixin
      @buffer.symlink_path = @symlink_path
      @buffer.output_plugin_for_symlink = self
    end
  end

  @dir_perm = system_config.dir_permission || Fluent::DEFAULT_DIR_PERMISSION
  @file_perm = system_config.file_permission || Fluent::DEFAULT_FILE_PERMISSION
  @need_lock = system_config.workers > 1
end

#find_filepath_available(path_with_placeholder, with_lock: false) ⇒ Object

for non-append



307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
# File 'lib/fluent/plugin/out_file_with_header.rb', line 307

def find_filepath_available(path_with_placeholder, with_lock: false) # for non-append
  raise "BUG: index placeholder not found in path: #{path_with_placeholder}" unless path_with_placeholder.index('_**')
  i = 0
  dir_path = locked = nil
  while true
    path = path_with_placeholder.sub('_**', "_#{i}")
    i += 1
    next if File.exist?(path)

    if with_lock
      dir_path = path + '.lock'
      locked = Dir.mkdir(dir_path) rescue false
      next unless locked
      # ensure that other worker doesn't create a file (and release lock)
      # between previous File.exist? and Dir.mkdir
      next if File.exist?(path)
    end

    break
  end
  yield path
ensure
  if dir_path && locked && Dir.exist?(dir_path)
    Dir.rmdir(dir_path) rescue nil
  end
end

#format(tag, time, record) ⇒ Object



190
191
192
193
# File 'lib/fluent/plugin/out_file_with_header.rb', line 190

def format(tag, time, record)
  r = inject_values_to_record(tag, time, record)
  @formatter.format(tag, time, r)
end

#generate_path_template(original, timekey, append, compress, path_suffix: '', time_slice_format: nil) ⇒ Object

/path/to/dir/file.* -> /path/to/dir/file.%Y%m%d /path/to/dir/file.*.data -> /path/to/dir/file.%Y%m%d.data /path/to/dir/file -> /path/to/dir/file.%Y%m%d.log

%Y%m%d -> %Y%m%d_** (non append)

+ .gz (gzipped) TODO: remove time_slice_format when end of support of compat_parameters



281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
# File 'lib/fluent/plugin/out_file_with_header.rb', line 281

def generate_path_template(original, timekey, append, compress, path_suffix: '', time_slice_format: nil)
  comp_suffix = compression_suffix(compress)
  index_placeholder = append ? '' : '_**'
  if original.index('*')
    raise "BUG: configuration error must be raised for path including '*' without timekey" unless timekey
    time_placeholders_part = time_slice_format || timekey_to_timeformat(timekey)
    original.gsub('*', time_placeholders_part + index_placeholder) + comp_suffix
  else
    if timekey
      if time_slice_format
        "#{original}.#{time_slice_format}#{index_placeholder}#{path_suffix}#{comp_suffix}"
      else
        time_placeholders = timekey_to_timeformat(timekey)
        if time_placeholders.scan(/../).any?{|ph| original.include?(ph) }
          raise Fluent::ConfigError, "insufficient timestamp placeholders in path" if time_placeholders.scan(/../).any?{|ph| !original.include?(ph) }
          "#{original}#{index_placeholder}#{path_suffix}#{comp_suffix}"
        else
          "#{original}.#{time_placeholders}#{index_placeholder}#{path_suffix}#{comp_suffix}"
        end
      end
    else
      "#{original}#{index_placeholder}#{path_suffix}#{comp_suffix}"
    end
  end
end

#multi_workers_ready?Boolean

Returns:

  • (Boolean)


186
187
188
# File 'lib/fluent/plugin/out_file_with_header.rb', line 186

def multi_workers_ready?
  true
end

#timekey_to_timeformat(timekey) ⇒ Object



256
257
258
259
260
261
262
263
264
# File 'lib/fluent/plugin/out_file_with_header.rb', line 256

def timekey_to_timeformat(timekey)
  case timekey
  when nil          then ''
  when 0...60       then '%Y%m%d%H%M%S' # 60 exclusive
  when 60...3600    then '%Y%m%d%H%M'
  when 3600...86400 then '%Y%m%d%H'
  else                   '%Y%m%d'
  end
end

#write(chunk) ⇒ Object



195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
# File 'lib/fluent/plugin/out_file_with_header.rb', line 195

def write(chunk)
  path = extract_placeholders(@path_template, chunk)
  FileUtils.mkdir_p File.dirname(path), mode: @dir_perm

  writer = case
           when @compress_method.nil?
             method(:write_without_compression)
           when @compress_method == :gzip
             if @buffer.compress != :gzip || @recompress
               method(:write_gzip_with_compression)
             else
               method(:write_gzip_from_gzipped_chunk)
             end
           else
             raise "BUG: unknown compression method #{@compress_method}"
           end

  if @append
    writer.call(path, chunk)
  else
    find_filepath_available(path, with_lock: @need_lock) do |actual_path|
      writer.call(actual_path, chunk)
      path = actual_path
    end
  end

  @last_written_path = path
end

#write_gzip_from_gzipped_chunk(path, chunk) ⇒ Object



246
247
248
249
250
251
252
253
254
# File 'lib/fluent/plugin/out_file_with_header.rb', line 246

def write_gzip_from_gzipped_chunk(path, chunk)
  if f.stat.size == 0
    f.write(headers)
    f.write("\n")
  end
  File.open(path, "ab", @file_perm) do |f|
    chunk.write_to(f, compressed: :gzip)
  end
end

#write_gzip_with_compression(path, chunk) ⇒ Object



234
235
236
237
238
239
240
241
242
243
244
# File 'lib/fluent/plugin/out_file_with_header.rb', line 234

def write_gzip_with_compression(path, chunk)
  File.open(path, "ab", @file_perm) do |f|
    gz = Zlib::GzipWriter.new(f)
    if f.stat.size == 0
      f.write(headers)
      f.write("\n")
    end
    chunk.write_to(gz, compressed: :text)
    gz.close
  end
end

#write_without_compression(path, chunk) ⇒ Object



224
225
226
227
228
229
230
231
232
# File 'lib/fluent/plugin/out_file_with_header.rb', line 224

def write_without_compression(path, chunk)
  File.open(path, "ab", @file_perm) do |f|
    if f.stat.size == 0
      f.write(headers)
      f.write("\n")
    end
    chunk.write_to(f)
  end
end