Class: Fluent::Plugin::Buffer::FileSingleChunk
- Inherits:
-
Chunk
- Object
- Chunk
- Fluent::Plugin::Buffer::FileSingleChunk
show all
- Defined in:
- lib/fluent/plugin/buffer/file_single_chunk.rb
Defined Under Namespace
Classes: FileChunkError
Constant Summary
collapse
- PATH_EXT =
buffer path user specified : /path/to/directory buffer chunk path : /path/to/directory/fsb.key.b513b61c9791029c2513b61c9791029c2.buf state: b/q - ‘b’(on stage), ‘q’(enqueued)
'buf'
- PATH_SUFFIX =
".#{PATH_EXT}"
- PATH_REGEXP =
//n switch means explicit ‘ASCII-8BIT’ pattern
/\.(b|q)([0-9a-f]+)\.#{PATH_EXT}*\Z/n
- ESCAPE_REGEXP =
/[^-_.a-zA-Z0-9]/n
Instance Attribute Summary collapse
Attributes inherited from Chunk
#metadata, #state, #unique_id
Class Method Summary
collapse
Instance Method Summary
collapse
-
#bytesize ⇒ Object
-
#close ⇒ Object
-
#commit ⇒ Object
-
#concat(bulk, bulk_size) ⇒ Object
-
#create_new_chunk(path, metadata, perm) ⇒ Object
-
#decode_key(key) ⇒ Object
-
#empty? ⇒ Boolean
-
#encode_key(metadata) ⇒ Object
-
#enqueued! ⇒ Object
-
#file_rename(file, old_path, new_path, callback = nil) ⇒ Object
-
#initialize(metadata, path, mode, key, perm: Fluent::DEFAULT_FILE_PERMISSION, compress: :text) ⇒ FileSingleChunk
constructor
A new instance of FileSingleChunk.
-
#load_existing_enqueued_chunk(path) ⇒ Object
-
#load_existing_staged_chunk(path) ⇒ Object
-
#open(**kwargs, &block) ⇒ Object
-
#purge ⇒ Object
-
#read(**kwargs) ⇒ Object
-
#restore_metadata ⇒ Object
-
#restore_size(chunk_format) ⇒ Object
-
#rollback ⇒ Object
-
#size ⇒ Object
Methods inherited from Chunk
#append, #closed?, #created_at, #modified_at, #queued?, #raw_create_at, #raw_modified_at, #staged!, #staged?, #unstaged!, #unstaged?, #writable?, #write_to
#dump_unique_id_hex, #generate_unique_id
Constructor Details
#initialize(metadata, path, mode, key, perm: Fluent::DEFAULT_FILE_PERMISSION, compress: :text) ⇒ FileSingleChunk
Returns a new instance of FileSingleChunk.
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
|
# File 'lib/fluent/plugin/buffer/file_single_chunk.rb', line 38
def initialize(metadata, path, mode, key, perm: Fluent::DEFAULT_FILE_PERMISSION, compress: :text)
super(metadata, compress: compress)
@key = key
perm ||= Fluent::DEFAULT_FILE_PERMISSION
@permission = perm.is_a?(String) ? perm.to_i(8) : perm
@bytesize = @size = @adding_bytes = @adding_size = 0
case mode
when :create then create_new_chunk(path, metadata, @permission)
when :staged then load_existing_staged_chunk(path)
when :queued then load_existing_enqueued_chunk(path)
else
raise ArgumentError, "Invalid file chunk mode: #{mode}"
end
end
|
Instance Attribute Details
#path ⇒ Object
Returns the value of attribute path.
36
37
38
|
# File 'lib/fluent/plugin/buffer/file_single_chunk.rb', line 36
def path
@path
end
|
#permission ⇒ Object
Returns the value of attribute permission.
36
37
38
|
# File 'lib/fluent/plugin/buffer/file_single_chunk.rb', line 36
def permission
@permission
end
|
Class Method Details
.assume_chunk_state(path) ⇒ Object
149
150
151
152
153
154
155
156
157
158
159
|
# File 'lib/fluent/plugin/buffer/file_single_chunk.rb', line 149
def self.assume_chunk_state(path)
return :unknown unless path.end_with?(PATH_SUFFIX)
if PATH_REGEXP =~ path
$1 == 'b' ? :staged : :queued
else
:unknown
end
end
|
.generate_queued_chunk_path(path, unique_id) ⇒ Object
183
184
185
186
187
188
189
190
191
|
# File 'lib/fluent/plugin/buffer/file_single_chunk.rb', line 183
def self.generate_queued_chunk_path(path, unique_id)
chunk_id = Fluent::UniqueId.hex(unique_id)
staged_path = ".b#{chunk_id}."
if path.index(staged_path)
path.sub(staged_path, ".q#{chunk_id}.")
else path + ".q#{chunk_id}.chunk"
end
end
|
.generate_stage_chunk_path(path, key, unique_id) ⇒ Object
172
173
174
175
176
177
178
179
180
181
|
# File 'lib/fluent/plugin/buffer/file_single_chunk.rb', line 172
def self.generate_stage_chunk_path(path, key, unique_id)
pos = path.index('.*.')
raise "BUG: buffer chunk path on stage MUST have '.*.'" unless pos
prefix = path[0...pos]
suffix = path[(pos + 3)..-1]
chunk_id = Fluent::UniqueId.hex(unique_id)
"#{prefix}.#{key}.b#{chunk_id}.#{suffix}"
end
|
.unique_id_and_key_from_path(path) ⇒ Object
161
162
163
164
165
166
167
168
169
170
|
# File 'lib/fluent/plugin/buffer/file_single_chunk.rb', line 161
def self.unique_id_and_key_from_path(path)
base = File.basename(path)
res = PATH_REGEXP =~ base
return nil unless res
key = base[4..res - 1] hex_id = $2 unique_id = hex_id.scan(/../).map {|x| x.to_i(16) }.pack('C*')
[unique_id, key]
end
|
Instance Method Details
#bytesize ⇒ Object
83
84
85
|
# File 'lib/fluent/plugin/buffer/file_single_chunk.rb', line 83
def bytesize
@bytesize + @adding_bytes
end
|
#close ⇒ Object
121
122
123
124
125
126
127
128
|
# File 'lib/fluent/plugin/buffer/file_single_chunk.rb', line 121
def close
super
size = @chunk.size
@chunk.close
if size == 0
File.unlink(@path)
end
end
|
#commit ⇒ Object
64
65
66
67
68
69
70
71
72
|
# File 'lib/fluent/plugin/buffer/file_single_chunk.rb', line 64
def commit
@commit_position = @chunk.pos
@size += @adding_size
@bytesize += @adding_bytes
@adding_bytes = @adding_size = 0
@modified_at = Fluent::Clock.real_now
true
end
|
#concat(bulk, bulk_size) ⇒ Object
54
55
56
57
58
59
60
61
62
|
# File 'lib/fluent/plugin/buffer/file_single_chunk.rb', line 54
def concat(bulk, bulk_size)
raise "BUG: concatenating to unwritable chunk, now '#{self.state}'" unless self.writable?
bulk.force_encoding(Encoding::ASCII_8BIT)
@chunk.write(bulk)
@adding_bytes += bulk.bytesize
@adding_size += bulk_size
true
end
|
#create_new_chunk(path, metadata, perm) ⇒ Object
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
|
# File 'lib/fluent/plugin/buffer/file_single_chunk.rb', line 253
def create_new_chunk(path, metadata, perm)
@path = self.class.generate_stage_chunk_path(path, encode_key(metadata), @unique_id)
begin
@chunk = File.open(@path, 'wb+', perm)
@chunk.set_encoding(Encoding::ASCII_8BIT)
@chunk.sync = true
@chunk.binmode
rescue => e
raise BufferOverflowError, "can't create buffer file for #{path}. Stop creating buffer files: error = #{e}"
end
@state = :unstaged
@bytesize = 0
@commit_position = @chunk.pos @adding_bytes = 0
@adding_size = 0
end
|
#decode_key(key) ⇒ Object
249
250
251
|
# File 'lib/fluent/plugin/buffer/file_single_chunk.rb', line 249
def decode_key(key)
URI::DEFAULT_PARSER.unescape(key)
end
|
#empty? ⇒ Boolean
91
92
93
|
# File 'lib/fluent/plugin/buffer/file_single_chunk.rb', line 91
def empty?
@bytesize.zero?
end
|
#encode_key(metadata) ⇒ Object
243
244
245
246
247
|
# File 'lib/fluent/plugin/buffer/file_single_chunk.rb', line 243
def encode_key(metadata)
k = @key ? metadata.variables[@key] : metadata.tag
k ||= ''
URI::DEFAULT_PARSER.escape(k, ESCAPE_REGEXP)
end
|
#enqueued! ⇒ Object
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
|
# File 'lib/fluent/plugin/buffer/file_single_chunk.rb', line 95
def enqueued!
return unless self.staged?
new_chunk_path = self.class.generate_queued_chunk_path(@path, @unique_id)
begin
file_rename(@chunk, @path, new_chunk_path, ->(new_io) { @chunk = new_io })
rescue => e
begin
file_rename(@chunk, new_chunk_path, @path, ->(new_io) { @chunk = new_io }) if File.exist?(new_chunk_path)
rescue => re
raise "can't enqueue buffer file and failed to restore. This may causes inconsistent state: path = #{@path}, error = '#{e}', retry error = '#{re}'"
else
raise "can't enqueue buffer file: path = #{@path}, error = '#{e}'"
end
end
@path = new_chunk_path
super
end
|
#file_rename(file, old_path, new_path, callback = nil) ⇒ Object
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
|
# File 'lib/fluent/plugin/buffer/file_single_chunk.rb', line 224
def file_rename(file, old_path, new_path, callback = nil)
pos = file.pos
if Fluent.windows?
file.close
File.rename(old_path, new_path)
file = File.open(new_path, 'rb', @permission)
else
File.rename(old_path, new_path)
file.reopen(new_path, 'rb')
end
file.set_encoding(Encoding::ASCII_8BIT)
file.sync = true
file.binmode
file.pos = pos
callback.call(file) if callback
end
|
#load_existing_enqueued_chunk(path) ⇒ Object
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
|
# File 'lib/fluent/plugin/buffer/file_single_chunk.rb', line 292
def load_existing_enqueued_chunk(path)
@path = path
raise FileChunkError, "enqueued file chunk is empty" if File.size(@path).zero?
@chunk = File.open(@path, 'rb')
@chunk.set_encoding(Encoding::ASCII_8BIT)
@chunk.binmode
@chunk.seek(0, IO::SEEK_SET)
restore_metadata
@state = :queued
@bytesize = @chunk.size
@commit_position = @chunk.size
end
|
#load_existing_staged_chunk(path) ⇒ Object
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
|
# File 'lib/fluent/plugin/buffer/file_single_chunk.rb', line 273
def load_existing_staged_chunk(path)
@path = path
raise FileChunkError, "staged file chunk is empty" if File.size(@path).zero?
@chunk = File.open(@path, 'rb+')
@chunk.set_encoding(Encoding::ASCII_8BIT)
@chunk.sync = true
@chunk.binmode
@chunk.seek(0, IO::SEEK_END)
restore_metadata
@state = :staged
@bytesize = @chunk.size
@commit_position = @chunk.pos
@adding_bytes = 0
@adding_size = 0
end
|
#open(**kwargs, &block) ⇒ Object
142
143
144
145
146
147
|
# File 'lib/fluent/plugin/buffer/file_single_chunk.rb', line 142
def open(**kwargs, &block)
@chunk.seek(0, IO::SEEK_SET)
val = yield @chunk
@chunk.seek(0, IO::SEEK_END) if self.staged?
val
end
|
#purge ⇒ Object
130
131
132
133
134
135
|
# File 'lib/fluent/plugin/buffer/file_single_chunk.rb', line 130
def purge
super
@chunk.close
@bytesize = @size = @adding_bytes = @adding_size = 0
File.unlink(@path)
end
|
#read(**kwargs) ⇒ Object
137
138
139
140
|
# File 'lib/fluent/plugin/buffer/file_single_chunk.rb', line 137
def read(**kwargs)
@chunk.seek(0, IO::SEEK_SET)
@chunk.read
end
|
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
|
# File 'lib/fluent/plugin/buffer/file_single_chunk.rb', line 193
def restore_metadata
if res = self.class.unique_id_and_key_from_path(@path)
@unique_id = res.first
key = decode_key(res.last)
if @key
@metadata.variables = {@key => key}
else
@metadata.tag = key
end
else
raise FileChunkError, "Invalid chunk found. unique_id and key not exist: #{@path}"
end
@size = 0
stat = File.stat(@path)
@created_at = stat.ctime.to_i
@modified_at = stat.mtime.to_i
end
|
#restore_size(chunk_format) ⇒ Object
212
213
214
215
216
217
218
219
220
221
222
|
# File 'lib/fluent/plugin/buffer/file_single_chunk.rb', line 212
def restore_size(chunk_format)
count = 0
File.open(@path, 'rb') { |f|
if chunk_format == :msgpack
Fluent::MessagePackFactory.msgpack_unpacker(f).each { |d| count += 1 }
else
f.each_line { |l| count += 1 }
end
}
@size = count
end
|
#rollback ⇒ Object
74
75
76
77
78
79
80
81
|
# File 'lib/fluent/plugin/buffer/file_single_chunk.rb', line 74
def rollback
if @chunk.pos != @commit_position
@chunk.seek(@commit_position, IO::SEEK_SET)
@chunk.truncate(@commit_position)
end
@adding_bytes = @adding_size = 0
true
end
|
#size ⇒ Object
87
88
89
|
# File 'lib/fluent/plugin/buffer/file_single_chunk.rb', line 87
def size
@size + @adding_size
end
|