Class: Fluent::Plugin::Buffer::FileSingleChunk

Inherits:
Chunk
  • Object
show all
Defined in:
lib/fluent/plugin/buffer/file_single_chunk.rb

Defined Under Namespace

Classes: FileChunkError

Constant Summary collapse

PATH_EXT =

Buffer path specified by the user: /path/to/directory. Buffer chunk path: /path/to/directory/fsb.key.b513b61c9791029c2513b61c9791029c2.buf. State marker: b/q — ‘b’ (on stage), ‘q’ (enqueued).

'buf'
PATH_SUFFIX =
".#{PATH_EXT}"
PATH_REGEXP =

The //n switch makes this an explicit ‘ASCII-8BIT’ pattern.

/\.(b|q)([0-9a-f]+)\.#{PATH_EXT}*\Z/n
ESCAPE_REGEXP =
/[^-_.a-zA-Z0-9]/n

Instance Attribute Summary collapse

Attributes inherited from Chunk

#metadata, #state, #unique_id

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from Chunk

#append, #closed?, #created_at, #modified_at, #queued?, #raw_create_at, #raw_modified_at, #staged!, #staged?, #unstaged!, #unstaged?, #writable?, #write_to

Methods included from UniqueId::Mixin

#dump_unique_id_hex, #generate_unique_id

Constructor Details

#initialize(metadata, path, mode, key, perm: Fluent::DEFAULT_FILE_PERMISSION, compress: :text) ⇒ FileSingleChunk

Returns a new instance of FileSingleChunk.



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/fluent/plugin/buffer/file_single_chunk.rb', line 38

# Builds a chunk and, depending on +mode+, either creates a new backing file
# or attaches to an existing one.
#
# @param metadata [Object] chunk metadata; forwarded to the superclass and,
#   for +:create+, to #create_new_chunk
# @param path [String] for +:create+ a path pattern containing '.*.';
#   for +:staged+ / +:queued+ the path of an existing chunk file
# @param mode [Symbol] one of +:create+, +:staged+ or +:queued+
# @param key [Object, nil] stored in +@key+; selects which metadata variable
#   (instead of the tag) is used as the key part of the file name
# @param perm [Integer, String, nil] file permission; a String is parsed as
#   octal and nil falls back to Fluent::DEFAULT_FILE_PERMISSION
# @param compress [Symbol] forwarded to the superclass
# @raise [ArgumentError] when +mode+ is not one of the supported values
def initialize(metadata, path, mode, key, perm: Fluent::DEFAULT_FILE_PERMISSION, compress: :text)
  super(metadata, compress: compress)
  @key = key
  # An explicit `perm: nil` bypasses the keyword default, so re-apply it here.
  perm ||= Fluent::DEFAULT_FILE_PERMISSION
  @permission = perm.is_a?(String) ? perm.to_i(8) : perm
  @bytesize = @size = @adding_bytes = @adding_size = 0

  case mode
  when :create then create_new_chunk(path, metadata, @permission)
  when :staged then load_existing_staged_chunk(path)
  when :queued then load_existing_enqueued_chunk(path)
  else
    raise ArgumentError, "Invalid file chunk mode: #{mode}"
  end
end

Instance Attribute Details

#path ⇒ Object (readonly)

Returns the value of attribute path.



36
37
38
# File 'lib/fluent/plugin/buffer/file_single_chunk.rb', line 36

# Reader for +@path+, which is assigned when the chunk file is created,
# loaded from disk, or renamed on enqueue.
#
# @return [Object] the current value of +@path+
def path
  @path
end

#permission ⇒ Object (readonly)

Returns the value of attribute permission.



36
37
38
# File 'lib/fluent/plugin/buffer/file_single_chunk.rb', line 36

# Reader for +@permission+, which the constructor derives from its +perm+
# argument (String values are parsed as octal there).
#
# @return [Object] the current value of +@permission+
def permission
  @permission
end

Class Method Details

.assume_chunk_state(path) ⇒ Object



149
150
151
152
153
154
155
156
157
158
159
# File 'lib/fluent/plugin/buffer/file_single_chunk.rb', line 149

# Guesses the state of a chunk file from its file name alone.
#
# @param path [String] file path to classify
# @return [Symbol] +:staged+ when the name carries the 'b' marker, +:queued+
#   for any other matched marker, +:unknown+ when the name does not look like
#   a chunk file
def self.assume_chunk_state(path)
  # Cheap suffix test first: anything without the buffer suffix is not ours.
  return :unknown unless path.end_with?(PATH_SUFFIX)

  matched = PATH_REGEXP.match(path)
  # Files matching the buffer glob may also have been created by out_file;
  # those carry no state marker.
  return :unknown unless matched

  matched[1] == 'b' ? :staged : :queued
end

.generate_queued_chunk_path(path, unique_id) ⇒ Object



183
184
185
186
187
188
189
190
191
# File 'lib/fluent/plugin/buffer/file_single_chunk.rb', line 183

# Derives the file path a staged chunk gets once it is enqueued.
#
# @param path [String] current (staged) chunk path
# @param unique_id [Object] chunk id, rendered via Fluent::UniqueId.hex
# @return [String] +path+ with its ".b<id>." marker replaced by ".q<id>.";
#   when the marker is absent, +path+ with ".q<id>.chunk" appended
def self.generate_queued_chunk_path(path, unique_id)
  hex_id = Fluent::UniqueId.hex(unique_id)
  staged_marker = ".b#{hex_id}."
  queued_marker = ".q#{hex_id}."

  # Unexpected case (ex: users rename files while opened by fluentd).
  return path + "#{queued_marker}chunk" unless path.include?(staged_marker)

  path.sub(staged_marker, queued_marker)
end

.generate_stage_chunk_path(path, key, unique_id) ⇒ Object



172
173
174
175
176
177
178
179
180
181
# File 'lib/fluent/plugin/buffer/file_single_chunk.rb', line 172

# Expands a path pattern into the concrete path of a staged chunk file.
#
# @param path [String] pattern that must contain the placeholder '.*.'
# @param key [Object] key segment inserted into the file name
# @param unique_id [Object] chunk id, rendered via Fluent::UniqueId.hex
# @return [String] "<prefix>.<key>.b<hex id>.<suffix>"
# @raise [RuntimeError] when +path+ has no '.*.' placeholder
def self.generate_stage_chunk_path(path, key, unique_id)
  # partition splits around the first occurrence; an empty separator means
  # the placeholder was not found.
  prefix, placeholder, suffix = path.partition('.*.')
  raise "BUG: buffer chunk path on stage MUST have '.*.'" if placeholder.empty?

  hex_id = Fluent::UniqueId.hex(unique_id)
  "#{prefix}.#{key}.b#{hex_id}.#{suffix}"
end

.unique_id_and_key_from_path(path) ⇒ Object



161
162
163
164
165
166
167
168
169
170
# File 'lib/fluent/plugin/buffer/file_single_chunk.rb', line 161

# Extracts the chunk id and the (still escaped) key from a chunk file name.
#
# @param path [String] path of a chunk file; only its basename is inspected
# @return [Array(String, String), nil] +[unique_id, key]+ where +unique_id+
#   is the hex id from the file name packed into bytes, or nil when the
#   basename does not match PATH_REGEXP
def self.unique_id_and_key_from_path(path)
  base = File.basename(path)
  matched = PATH_REGEXP.match(base)
  return nil unless matched

  # Drop the leading 'fsb.' (4 chars) and stop right before the '.' that
  # starts the state marker. The range is exclusive on purpose: with an
  # inclusive `4..start - 1`, a match at index 0 turns into `4..-1` and
  # yields the tail of the file name instead of an empty key.
  key = base[4...matched.begin(0)]
  hex_id = matched[2] # the id between the state letter and '.buf'
  unique_id = hex_id.scan(/../).map { |pair| pair.to_i(16) }.pack('C*')
  [unique_id, key]
end

Instance Method Details

#bytesize ⇒ Object



83
84
85
# File 'lib/fluent/plugin/buffer/file_single_chunk.rb', line 83

# Total byte count of the chunk: the committed bytes plus the bytes added
# since the last commit or rollback.
#
# @return [Integer]
def bytesize
  committed = @bytesize
  pending = @adding_bytes
  committed + pending
end

#close ⇒ Object



121
122
123
124
125
126
127
128
# File 'lib/fluent/plugin/buffer/file_single_chunk.rb', line 121

# Closes the chunk's file handle and removes the file if it holds no data.
#
# @return [Object, nil] the result of File.unlink when the file was empty,
#   nil otherwise
def close
  super
  # Read the size while the handle is still open.
  byte_count = @chunk.size
  @chunk.close
  File.unlink(@path) if byte_count.zero?
end

#commit ⇒ Object



64
65
66
67
68
69
70
71
72
# File 'lib/fluent/plugin/buffer/file_single_chunk.rb', line 64

# Makes everything written since the last commit permanent: records the
# current file position as the commit point, folds the pending counters into
# the committed ones and refreshes the modification time.
#
# @return [true]
def commit
  @commit_position = @chunk.pos
  @size, @bytesize = @size + @adding_size, @bytesize + @adding_bytes
  @adding_size = 0
  @adding_bytes = 0
  @modified_at = Fluent::Clock.real_now
  true
end

#concat(bulk, bulk_size) ⇒ Object



54
55
56
57
58
59
60
61
62
# File 'lib/fluent/plugin/buffer/file_single_chunk.rb', line 54

# Appends already-formatted data to the chunk file and tracks it as pending
# (uncommitted) bytes and records.
#
# @param bulk [String] data to write; its encoding is switched to ASCII-8BIT
#   in place
# @param bulk_size [Integer] number of records contained in +bulk+
# @return [true]
# @raise [RuntimeError] when the chunk is not writable
def concat(bulk, bulk_size)
  unless writable?
    raise "BUG: concatenating to unwritable chunk, now '#{state}'"
  end

  # force_encoding returns its receiver, so +binary+ is the caller's string.
  binary = bulk.force_encoding(Encoding::ASCII_8BIT)
  @chunk.write(binary)
  @adding_bytes += binary.bytesize
  @adding_size += bulk_size
  true
end

#create_new_chunk(path, metadata, perm) ⇒ Object



253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
# File 'lib/fluent/plugin/buffer/file_single_chunk.rb', line 253

# Creates the backing file for a brand-new chunk and resets the counters.
#
# @param path [String] path pattern containing '.*.' (expanded with the
#   encoded key and this chunk's unique id)
# @param metadata [Object] chunk metadata the file-name key is taken from
# @param perm [Integer] permission bits passed to File.open
# @return [Integer] 0 (the value of the last counter assignment)
# @raise [BufferOverflowError] when the file cannot be opened or configured
def create_new_chunk(path, metadata, perm)
  @path = self.class.generate_stage_chunk_path(path, encode_key(metadata), @unique_id)
  begin
    @chunk = File.open(@path, 'wb+', perm)
    @chunk.set_encoding(Encoding::ASCII_8BIT)
    @chunk.sync = true
    @chunk.binmode
  rescue => e
    # Here assumes "Too many open files" like recoverable error so raising BufferOverflowError.
    # If other cases are possible, we will change error handling with proper classes.
    raise BufferOverflowError, "can't create buffer file for #{path}. Stop creating buffer files: error = #{e}"
  end

  @state = :unstaged
  @bytesize = 0
  @commit_position = @chunk.pos # must be 0
  @adding_bytes = 0
  @adding_size = 0
end

#decode_key(key) ⇒ Object



249
250
251
# File 'lib/fluent/plugin/buffer/file_single_chunk.rb', line 249

# Reverses the percent-escaping of a key taken from a chunk file name.
#
# @param key [String] escaped key
# @return [String] the unescaped key
def decode_key(key)
  parser = URI::DEFAULT_PARSER
  parser.unescape(key)
end

#empty? ⇒ Boolean

Returns:

  • (Boolean)


91
92
93
# File 'lib/fluent/plugin/buffer/file_single_chunk.rb', line 91

# Tells whether the chunk holds no committed bytes. Pending (uncommitted)
# bytes are not taken into account.
#
# @return [Boolean]
def empty?
  @bytesize == 0
end

#encode_key(metadata) ⇒ Object



243
244
245
246
247
# File 'lib/fluent/plugin/buffer/file_single_chunk.rb', line 243

# Builds the file-name-safe key for a chunk.
#
# The raw key is the metadata variable named by +@key+ when +@key+ is set,
# otherwise the metadata tag; a missing value becomes the empty string.
# Every character matched by ESCAPE_REGEXP is percent-escaped.
#
# @param metadata [Object] object responding to +tag+ and +variables+
# @return [String] the escaped key
def encode_key(metadata)
  k = @key ? metadata.variables[@key] : metadata.tag
  k ||= ''
  URI::DEFAULT_PARSER.escape(k, ESCAPE_REGEXP)
end

#enqueued! ⇒ Object



95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
# File 'lib/fluent/plugin/buffer/file_single_chunk.rb', line 95

# Moves a staged chunk to the queued state by renaming its file from the
# staged name to the queued name. Does nothing unless the chunk is staged.
#
# If the rename fails and the queued file already exists, the rename is
# undone before the failure is reported.
#
# @raise [RuntimeError] when the rename fails (the message says whether the
#   restore attempt failed as well)
def enqueued!
  return unless staged?

  queued_path = self.class.generate_queued_chunk_path(@path, @unique_id)
  # The rename helper hands back the handle to use from now on.
  adopt_io = ->(new_io) { @chunk = new_io }

  begin
    file_rename(@chunk, @path, queued_path, adopt_io)
  rescue => e
    begin
      file_rename(@chunk, queued_path, @path, adopt_io) if File.exist?(queued_path)
    rescue => re
      # In this point, restore buffer state is hard because previous `file_rename` failed by resource problem.
      # Retry is one possible approach but it may cause livelock under limited resources or high load environment.
      # So we ignore such errors for now and log better message instead.
      # "Too many open files" should be fixed by proper buffer configuration and system setting.
      raise "can't enqueue buffer file and failed to restore. This may causes inconsistent state: path = #{@path}, error = '#{e}', retry error = '#{re}'"
    end
    # Reached only when the restore step itself did not raise.
    raise "can't enqueue buffer file: path = #{@path}, error = '#{e}'"
  end

  @path = queued_path

  super
end

#file_rename(file, old_path, new_path, callback = nil) ⇒ Object



224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
# File 'lib/fluent/plugin/buffer/file_single_chunk.rb', line 224

# Renames the file behind an open handle and reopens it read-only at the new
# path, restoring the previous file position.
#
# On Windows the handle is closed before the rename and a fresh one is
# opened afterwards; elsewhere the existing handle is reopened in place.
#
# @param file [File] open handle for +old_path+
# @param old_path [String] current path
# @param new_path [String] destination path
# @param callback [#call, nil] receives the handle to use after the rename
# @return [Object, nil] the callback's result, or nil without a callback
def file_rename(file, old_path, new_path, callback = nil)
  saved_pos = file.pos
  on_windows = Fluent.windows?

  file.close if on_windows
  File.rename(old_path, new_path)
  if on_windows
    file = File.open(new_path, 'rb', @permission)
  else
    file.reopen(new_path, 'rb')
  end

  file.set_encoding(Encoding::ASCII_8BIT)
  file.sync = true
  file.binmode
  file.pos = saved_pos

  return unless callback

  callback.call(file)
end

#load_existing_enqueued_chunk(path) ⇒ Object

Raises:

  • (FileChunkError)



292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
# File 'lib/fluent/plugin/buffer/file_single_chunk.rb', line 292

# Attaches this chunk to an existing enqueued chunk file, opened read-only.
#
# @param path [String] path of the existing chunk file
# @return [Integer] the file size (the value of the last assignment)
# @raise [FileChunkError] when the file is empty
def load_existing_enqueued_chunk(path)
  @path = path
  raise FileChunkError, "enqueued file chunk is empty" if File.size(@path).zero?

  @chunk = File.open(@path, 'rb')
  @chunk.set_encoding(Encoding::ASCII_8BIT)
  @chunk.binmode
  @chunk.seek(0, IO::SEEK_SET)

  # Rebuild unique id, metadata key and timestamps from the file itself.
  restore_metadata

  @state = :queued
  @bytesize = @chunk.size
  # The whole file counts as committed.
  @commit_position = @chunk.size
end

#load_existing_staged_chunk(path) ⇒ Object

Raises:

  • (FileChunkError)



273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
# File 'lib/fluent/plugin/buffer/file_single_chunk.rb', line 273

# Attaches this chunk to an existing staged chunk file, opened read/write
# and positioned at its end so further writes append.
#
# @param path [String] path of the existing chunk file
# @return [Integer] 0 (the value of the last counter assignment)
# @raise [FileChunkError] when the file is empty
def load_existing_staged_chunk(path)
  @path = path
  raise FileChunkError, "staged file chunk is empty" if File.size(@path).zero?

  @chunk = File.open(@path, 'rb+')
  @chunk.set_encoding(Encoding::ASCII_8BIT)
  @chunk.sync = true
  @chunk.binmode
  @chunk.seek(0, IO::SEEK_END)

  # Rebuild unique id, metadata key and timestamps from the file itself.
  restore_metadata

  @state = :staged
  @bytesize = @chunk.size
  # Everything already in the file counts as committed.
  @commit_position = @chunk.pos
  @adding_bytes = 0
  @adding_size = 0
end

#open(**kwargs, &block) ⇒ Object



142
143
144
145
146
147
# File 'lib/fluent/plugin/buffer/file_single_chunk.rb', line 142

# Yields the chunk's IO, rewound to the beginning, and returns the block's
# result. For a staged chunk the position is moved back to the end
# afterwards.
#
# @param kwargs [Hash] accepted but not used by this implementation
# @yieldparam io [IO] the chunk's underlying IO
# @return [Object] whatever the block returns
def open(**kwargs, &block)
  @chunk.seek(0, IO::SEEK_SET)
  result = yield @chunk
  return result unless staged?

  @chunk.seek(0, IO::SEEK_END)
  result
end

#purge ⇒ Object



130
131
132
133
134
135
# File 'lib/fluent/plugin/buffer/file_single_chunk.rb', line 130

# Discards the chunk: closes its file handle, zeroes every counter and
# deletes the file.
#
# @return [Object] the result of File.unlink
def purge
  super
  @chunk.close
  @adding_size = 0
  @adding_bytes = 0
  @size = 0
  @bytesize = 0
  File.unlink(@path)
end

#read(**kwargs) ⇒ Object



137
138
139
140
# File 'lib/fluent/plugin/buffer/file_single_chunk.rb', line 137

# Reads the chunk file from its beginning to its end.
#
# @param kwargs [Hash] accepted but not used by this implementation
# @return [String] the file content
def read(**kwargs)
  io = @chunk
  io.seek(0, IO::SEEK_SET)
  io.read
end

#restore_metadata ⇒ Object



193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
# File 'lib/fluent/plugin/buffer/file_single_chunk.rb', line 193

# Rebuilds the chunk's identity from its file: the unique id and key come
# from the file name, the timestamps from the file's stat. The record count
# is reset to 0.
#
# The decoded key is stored as the metadata variable named by +@key+ when
# +@key+ is set, otherwise as the metadata tag.
#
# @return [Integer] the modification time in seconds (last assignment)
# @raise [FileChunkError] when the file name carries no unique id and key
def restore_metadata
  res = self.class.unique_id_and_key_from_path(@path)
  raise FileChunkError, "Invalid chunk found. unique_id and key not exist: #{@path}" unless res

  @unique_id = res.first
  key = decode_key(res.last)
  if @key
    @metadata.variables = {@key => key}
  else
    @metadata.tag = key
  end
  @size = 0

  stat = File.stat(@path)
  @created_at = stat.ctime.to_i
  @modified_at = stat.mtime.to_i
end

#restore_size(chunk_format) ⇒ Object



212
213
214
215
216
217
218
219
220
221
222
# File 'lib/fluent/plugin/buffer/file_single_chunk.rb', line 212

# Recounts the records stored in the chunk file and stores the result in
# +@size+.
#
# @param chunk_format [Symbol] +:msgpack+ counts the objects yielded by a
#   MessagePack unpacker; any other value counts lines
# @return [Integer] the number of records found
def restore_size(chunk_format)
  @size = File.open(@path, 'rb') do |io|
    if chunk_format == :msgpack
      records = 0
      Fluent::MessagePackFactory.msgpack_unpacker(io).each { records += 1 }
      records
    else
      io.each_line.count
    end
  end
end

#rollback ⇒ Object



74
75
76
77
78
79
80
81
# File 'lib/fluent/plugin/buffer/file_single_chunk.rb', line 74

# Discards everything written since the last commit: moves back to the
# commit position, truncates the file there and clears the pending counters.
#
# @return [true]
def rollback
  committed = @commit_position
  # Nothing to undo on disk when the position never moved past the commit point.
  unless @chunk.pos == committed
    @chunk.seek(committed, IO::SEEK_SET)
    @chunk.truncate(committed)
  end
  @adding_size = 0
  @adding_bytes = 0
  true
end

#size ⇒ Object



87
88
89
# File 'lib/fluent/plugin/buffer/file_single_chunk.rb', line 87

# Total record count of the chunk: the committed records plus the records
# added since the last commit or rollback.
#
# @return [Integer]
def size
  committed = @size
  pending = @adding_size
  committed + pending
end