Class: Fluent::Plugin::FileOutput

Inherits:
Output show all
Defined in:
lib/fluent/plugin/out_file.rb

Defined Under Namespace

Modules: SymlinkBufferMixin

Constant Summary collapse

SUPPORTED_COMPRESS =
[:text, :gz, :gzip]
SUPPORTED_COMPRESS_MAP =
{
  text: nil,
  gz: :gzip,
  gzip: :gzip,
}
DEFAULT_TIMEKEY =
60 * 60 * 24

Constants inherited from Output

Output::BUFFER_STATS_KEYS, Output::CHUNKING_FIELD_WARN_NUM, Output::CHUNK_ID_PLACEHOLDER_PATTERN, Output::CHUNK_KEY_PATTERN, Output::CHUNK_KEY_PLACEHOLDER_PATTERN, Output::CHUNK_TAG_PLACEHOLDER_PATTERN, Output::FORMAT_COMPRESSED_MSGPACK_STREAM, Output::FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT, Output::FORMAT_MSGPACK_STREAM, Output::FORMAT_MSGPACK_STREAM_TIME_INT, Output::TIMESTAMP_CHECK_BASE_TIME, Output::TIME_KEY_PLACEHOLDER_THRESHOLDS, Output::UNRECOVERABLE_ERRORS

Constants included from Configurable

Configurable::CONFIG_TYPE_REGISTRY

Instance Attribute Summary collapse

Attributes inherited from Output

#as_secondary, #buffer, #chunk_key_accessors, #chunk_key_tag, #chunk_key_time, #chunk_keys, #delayed_commit, #delayed_commit_timeout, #dequeued_chunks, #dequeued_chunks_mutex, #emit_count, #emit_records, #num_errors, #output_enqueue_thread_waiting, #retry, #retry_for_error_chunk, #rollback_count, #secondary, #timekey_zone, #write_count

Attributes included from Fluent::PluginLoggerMixin

#log

Attributes inherited from Base

#under_plugin_development

Instance Method Summary collapse

Methods inherited from Output

#acts_as_secondary, #after_shutdown, #after_start, #backup_chunk, #before_shutdown, #calculate_timekey, #check_slow_flush, #chunk_for_test, #close, #commit_write, #emit_buffered, #emit_events, #emit_sync, #enqueue_thread_run, #enqueue_thread_wait, #execute_chunking, #extract_placeholders, #flush_thread_run, #flush_thread_wakeup, #force_flush, #formatted_to_msgpack_binary, #formatted_to_msgpack_binary?, #generate_format_proc, #get_placeholders_keys, #get_placeholders_tag, #get_placeholders_time, #handle_stream_simple, #handle_stream_with_custom_format, #handle_stream_with_standard_format, #implement?, #initialize, #interrupt_flushes, #metadata, #next_flush_time, #placeholder_validate!, #placeholder_validators, #prefer_buffered_processing, #prefer_delayed_commit, #process, #retry_state, #rollback_write, #shutdown, #start, #statistics, #stop, #submit_flush_all, #submit_flush_once, #support_in_v12_style?, #terminate, #try_flush, #try_rollback_all, #try_rollback_write, #try_write, #update_retry_state, #write_guard

Methods included from UniqueId::Mixin

#dump_unique_id_hex, #generate_unique_id

Methods included from Fluent::PluginHelper::Mixin

included

Methods included from Fluent::PluginLoggerMixin

included, #initialize, #terminate

Methods included from Fluent::PluginId

#initialize, #plugin_id, #plugin_id_configured?, #plugin_id_for_test?, #plugin_root_dir, #stop

Methods inherited from Base

#after_shutdown, #after_shutdown?, #after_start, #after_started?, #before_shutdown, #before_shutdown?, #called_in_test?, #close, #closed?, #configured?, #context_router, #context_router=, #fluentd_worker_id, #has_router?, #initialize, #inspect, #plugin_root_dir, #reloadable_plugin?, #shutdown, #shutdown?, #start, #started?, #stop, #stopped?, #string_safe_encoding, #terminate, #terminated?

Methods included from SystemConfig::Mixin

#system_config, #system_config_override

Methods included from Configurable

#config, #configure_proxy_generate, #configured_section_create, included, #initialize, lookup_type, register_type

Constructor Details

This class inherits a constructor from Fluent::Plugin::Output

Instance Attribute Details

#dir_permObject (readonly)

Returns the value of attribute dir_perm.



67
68
69
# File 'lib/fluent/plugin/out_file.rb', line 67

def dir_perm
  @dir_perm
end

#last_written_pathObject

for tests



68
69
70
# File 'lib/fluent/plugin/out_file.rb', line 68

def last_written_path
  @last_written_path
end

Instance Method Details

#compression_suffix(compress) ⇒ Object



254
255
256
257
258
259
260
261
# File 'lib/fluent/plugin/out_file.rb', line 254

def compression_suffix(compress)
  case compress
  when :gzip then '.gz'
  when nil then ''
  else
    raise ArgumentError, "unknown compression type #{compress}"
  end
end

#configure(conf) ⇒ Object



105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
# File 'lib/fluent/plugin/out_file.rb', line 105

def configure(conf)
  compat_parameters_convert(conf, :formatter, :buffer, :inject, default_chunk_key: "time")

  configured_time_slice_format = conf['time_slice_format']

  if conf.elements(name: 'buffer').empty?
    conf.add_element('buffer', 'time')
  end
  buffer_conf = conf.elements(name: 'buffer').first
  # Fluent::PluginId#configure is not called yet, so we can't use #plugin_root_dir here.
  if !buffer_conf.has_key?('path') && !(conf['@id'] && system_config.root_dir)
    # v0.14 file buffer handles path as directory if '*' is missing
    # 'dummy_path' is not to raise configuration error for 'path' in file buffer plugin,
    # but raise it in this plugin.
    buffer_conf['path'] = conf['path'] || '/tmp/dummy_path'
  end

  if conf.has_key?('utc') || conf.has_key?('localtime')
    param_name = conf.has_key?('utc') ? 'utc' : 'localtime'
    log.warn "'#{param_name}' is deprecated for output plugin. This parameter is used for formatter plugin in compatibility layer. If you want to use same feature, use timekey_use_utc parameter in <buffer> directive instead"
  end

  super

  @compress_method = SUPPORTED_COMPRESS_MAP[@compress]

  if @path.include?('*') && !@buffer_config.timekey
    raise Fluent::ConfigError, "path including '*' must be used with buffer chunk key 'time'"
  end

  path_suffix = @add_path_suffix ? @path_suffix : ''
  path_timekey = if @chunk_key_time
                   @as_secondary ? @primary_instance.buffer_config.timekey : @buffer_config.timekey
                 else
                   nil
                 end
  @path_template = generate_path_template(@path, path_timekey, @append, @compress_method, path_suffix: path_suffix, time_slice_format: configured_time_slice_format)

  if @as_secondary
    # When this plugin is configured as secondary & primary plugin has tag key, but this plugin may not have it.
    # Increment placeholder can make another output file per chunk tag/keys even if original path doesn't include it.
    placeholder_validators(:path, @path_template).select{|v| v.type == :time }.each do |v|
      v.validate!
    end
  else
    placeholder_validate!(:path, @path_template)

    max_tag_index = get_placeholders_tag(@path_template).max || 1
    max_tag_index = 1 if max_tag_index < 1
    dummy_tag = (['a'] * max_tag_index).join('.')
    dummy_record_keys = get_placeholders_keys(@path_template) || ['message']
    dummy_record = Hash[dummy_record_keys.zip(['data'] * dummy_record_keys.size)]

    test_chunk1 = chunk_for_test(dummy_tag, Fluent::EventTime.now, dummy_record)
    test_path = extract_placeholders(@path_template, test_chunk1)
    unless ::Fluent::FileUtil.writable_p?(test_path)
      raise Fluent::ConfigError, "out_file: `#{test_path}` is not writable"
    end
  end

  @formatter = formatter_create

  if @symlink_path && @buffer.respond_to?(:path)
    if @as_secondary
      raise Fluent::ConfigError, "symlink_path option is unavailable in <secondary>: consider to use secondary_file plugin"
    end
    if Fluent.windows?
      log.warn "symlink_path is unavailable on Windows platform. disabled."
      @symlink_path = nil
    else
      @buffer.extend SymlinkBufferMixin
      @buffer.symlink_path = @symlink_path
      @buffer.output_plugin_for_symlink = self
    end
  end

  @dir_perm = system_config.dir_permission || Fluent::DEFAULT_DIR_PERMISSION
  @file_perm = system_config.file_permission || Fluent::DEFAULT_FILE_PERMISSION
  @need_lock = system_config.workers > 1
end

#find_filepath_available(path_with_placeholder, with_lock: false) ⇒ Object

for non-append



295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
# File 'lib/fluent/plugin/out_file.rb', line 295

def find_filepath_available(path_with_placeholder, with_lock: false) # for non-append
  raise "BUG: index placeholder not found in path: #{path_with_placeholder}" unless path_with_placeholder.index('_**')
  i = 0
  dir_path = locked = nil
  while true
    path = path_with_placeholder.sub('_**', "_#{i}")
    i += 1
    next if File.exist?(path)

    if with_lock
      dir_path = path + '.lock'
      locked = Dir.mkdir(dir_path) rescue false
      next unless locked
      # ensure that other worker doesn't create a file (and release lock)
      # between previous File.exist? and Dir.mkdir
      next if File.exist?(path)
    end

    break
  end
  yield path
ensure
  if dir_path && locked && Dir.exist?(dir_path)
    Dir.rmdir(dir_path) rescue nil
  end
end

#format(tag, time, record) ⇒ Object



190
191
192
193
# File 'lib/fluent/plugin/out_file.rb', line 190

def format(tag, time, record)
  r = inject_values_to_record(tag, time, record)
  @formatter.format(tag, time, r)
end

#generate_path_template(original, timekey, append, compress, path_suffix: '', time_slice_format: nil) ⇒ Object

/path/to/dir/file.* -> /path/to/dir/file.%Y%m%d /path/to/dir/file.*.data -> /path/to/dir/file.%Y%m%d.data /path/to/dir/file -> /path/to/dir/file.%Y%m%d.log

%Y%m%d -> %Y%m%d_** (non append)

+ .gz (gzipped) TODO: remove time_slice_format when end of support of compat_parameters



269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
# File 'lib/fluent/plugin/out_file.rb', line 269

def generate_path_template(original, timekey, append, compress, path_suffix: '', time_slice_format: nil)
  comp_suffix = compression_suffix(compress)
  index_placeholder = append ? '' : '_**'
  if original.index('*')
    raise "BUG: configuration error must be raised for path including '*' without timekey" unless timekey
    time_placeholders_part = time_slice_format || timekey_to_timeformat(timekey)
    original.gsub('*', time_placeholders_part + index_placeholder) + comp_suffix
  else
    if timekey
      if time_slice_format
        "#{original}.#{time_slice_format}#{index_placeholder}#{path_suffix}#{comp_suffix}"
      else
        time_placeholders = timekey_to_timeformat(timekey)
        if time_placeholders.scan(/../).any?{|ph| original.include?(ph) }
          raise Fluent::ConfigError, "insufficient timestamp placeholders in path" if time_placeholders.scan(/../).any?{|ph| !original.include?(ph) }
          "#{original}#{index_placeholder}#{path_suffix}#{comp_suffix}"
        else
          "#{original}.#{time_placeholders}#{index_placeholder}#{path_suffix}#{comp_suffix}"
        end
      end
    else
      "#{original}#{index_placeholder}#{path_suffix}#{comp_suffix}"
    end
  end
end

#multi_workers_ready?Boolean

Returns:

  • (Boolean)


186
187
188
# File 'lib/fluent/plugin/out_file.rb', line 186

def multi_workers_ready?
  true
end

#timekey_to_timeformat(timekey) ⇒ Object



244
245
246
247
248
249
250
251
252
# File 'lib/fluent/plugin/out_file.rb', line 244

def timekey_to_timeformat(timekey)
  case timekey
  when nil          then ''
  when 0...60       then '%Y%m%d%H%M%S' # 60 exclusive
  when 60...3600    then '%Y%m%d%H%M'
  when 3600...86400 then '%Y%m%d%H'
  else                   '%Y%m%d'
  end
end

#write(chunk) ⇒ Object



195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
# File 'lib/fluent/plugin/out_file.rb', line 195

def write(chunk)
  path = extract_placeholders(@path_template, chunk)
  FileUtils.mkdir_p File.dirname(path), mode: @dir_perm

  writer = case
           when @compress_method.nil?
             method(:write_without_compression)
           when @compress_method == :gzip
             if @buffer.compress != :gzip || @recompress
               method(:write_gzip_with_compression)
             else
               method(:write_gzip_from_gzipped_chunk)
             end
           else
             raise "BUG: unknown compression method #{@compress_method}"
           end

  if @append
    writer.call(path, chunk)
  else
    find_filepath_available(path, with_lock: @need_lock) do |actual_path|
      writer.call(actual_path, chunk)
      path = actual_path
    end
  end

  @last_written_path = path
end

#write_gzip_from_gzipped_chunk(path, chunk) ⇒ Object



238
239
240
241
242
# File 'lib/fluent/plugin/out_file.rb', line 238

def write_gzip_from_gzipped_chunk(path, chunk)
  File.open(path, "ab", @file_perm) do |f|
    chunk.write_to(f, compressed: :gzip)
  end
end

#write_gzip_with_compression(path, chunk) ⇒ Object



230
231
232
233
234
235
236
# File 'lib/fluent/plugin/out_file.rb', line 230

def write_gzip_with_compression(path, chunk)
  File.open(path, "ab", @file_perm) do |f|
    gz = Zlib::GzipWriter.new(f)
    chunk.write_to(gz, compressed: :text)
    gz.close
  end
end

#write_without_compression(path, chunk) ⇒ Object



224
225
226
227
228
# File 'lib/fluent/plugin/out_file.rb', line 224

def write_without_compression(path, chunk)
  File.open(path, "ab", @file_perm) do |f|
    chunk.write_to(f)
  end
end