Class: Fluent::FileBuffer
Constant Summary
collapse
- PATH_MATCH =
/^(.*)[\._](b|q)([0-9a-fA-F]{1,32})$/
Instance Attribute Summary
Attributes included from Configurable
#config
Instance Method Summary
collapse
Methods inherited from BasicBuffer
#clear!, #emit, #enable_parallel, #keys, #pop, #push, #queue_size, #shutdown, #total_queued_chunk_size, #write_chunk
Methods inherited from Buffer
#shutdown
included
Constructor Details
Returns a new instance of FileBuffer.
81
82
83
84
|
# File 'lib/fluent/plugin/buf_file.rb', line 81
# Builds a new FileBuffer instance.
def initialize
  # 'uri' is loaded at construction time, before control is handed to the
  # parent initializer.
  require 'uri'
  super
end
|
Instance Method Details
#before_shutdown(out) ⇒ Object
171
172
173
174
175
176
177
178
179
180
181
|
# File 'lib/fluent/plugin/buf_file.rb', line 171
# Drains the buffer ahead of shutdown when @flush_at_shutdown is set.
#
# Inside a single `synchronize` section, every key currently in @map is
# handed to `push`, and then `pop(out)` is called repeatedly until it
# returns a falsy value. Does nothing when @flush_at_shutdown is falsy.
#
# @param out the object forwarded to each `pop` call
def before_shutdown(out)
  return unless @flush_at_shutdown

  synchronize do
    @map.each_key { |chunk_key| push(chunk_key) }
    # Keep popping until pop reports there is nothing left.
    nil while pop(out)
  end
end
|
#configure(conf) ⇒ Object
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
|
# File 'lib/fluent/plugin/buf_file.rb', line 88
# Applies configuration to the buffer.
#
# After delegating to the parent `configure`, this splits @buffer_path into
# @buffer_path_prefix and @buffer_path_suffix:
#
# * if @buffer_path contains a '*', the text before the first '*' is the
#   prefix and the text after it is the suffix;
# * otherwise the prefix is @buffer_path followed by "." and the suffix is
#   ".log".
#
# It also sets @flush_at_shutdown from conf['flush_at_shutdown']. A missing
# (nil/false) value, or the textual values "false" / "no", disable it; any
# other value enables it.
#
# NOTE(review): @buffer_path is presumably populated by the parent
# `configure` — confirm against the parent class.
#
# @param conf configuration object, indexed by string keys
def configure(conf)
  super

  if pos = @buffer_path.index('*')
    @buffer_path_prefix = @buffer_path[0, pos]
    @buffer_path_suffix = @buffer_path[pos + 1..-1]
  else
    @buffer_path_prefix = @buffer_path + "."
    @buffer_path_suffix = ".log"
  end

  # Configuration values may arrive as strings, and the string "false" is
  # truthy in Ruby; treat explicit negative spellings as "disabled" so that
  # `flush_at_shutdown false` does not switch the feature on.
  flag = conf['flush_at_shutdown']
  @flush_at_shutdown = flag ? !%w[false no].include?(flag.to_s.strip.downcase) : false
end
|
#enqueue(chunk) ⇒ Object
159
160
161
162
163
164
165
166
167
168
169
|
# File 'lib/fluent/plugin/buf_file.rb', line 159
# Renames a chunk's file so that its state marker becomes "q".
#
# The part of chunk.path between @buffer_path_prefix and
# @buffer_path_suffix is matched against PATH_MATCH to recover the encoded
# key and the hexadecimal suffix. The chunk is then moved (via chunk.mv) to
#   "<prefix><encoded_key>.q<hex suffix><suffix>".
#
# @param chunk object responding to `path` and `mv(new_path)`
# @return whatever chunk.mv returns
# @raise [ArgumentError] when the chunk's path does not have the expected
#   "<key>.(b|q)<hex>" shape; the chunk is left where it is in that case
def enqueue(chunk)
  path = chunk.path
  mp = path[@buffer_path_prefix.length..-(@buffer_path_suffix.length + 1)]

  m = PATH_MATCH.match(mp)
  # Without a match there is no suffix to build the new name from; fail
  # with a message naming the offending path rather than dereferencing nil.
  unless m
    raise ArgumentError, "cannot enqueue buffer chunk: unexpected chunk path #{path.inspect}"
  end

  encoded_key = m[1]
  tsuffix = m[3]
  npath = "#{@buffer_path_prefix}#{encoded_key}.q#{tsuffix}#{@buffer_path_suffix}"
  chunk.mv(npath)
end
|
#new_chunk(key) ⇒ Object
113
114
115
116
117
118
|
# File 'lib/fluent/plugin/buf_file.rb', line 113
# Creates a fresh FileBufferChunk for +key+.
#
# The key is encoded with `encode_key`, a path and suffix are obtained from
# `make_path(encoded_key, "b")`, and the suffix is converted to a unique id
# with `tsuffix_to_unique_id`.
#
# @param key the chunk key
# @return [FileBufferChunk] chunk built from key, path and unique id
def new_chunk(key)
  chunk_path, time_suffix = make_path(encode_key(key), "b")
  FileBufferChunk.new(key, chunk_path, tsuffix_to_unique_id(time_suffix))
end
|
#resume ⇒ Object
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
|
# File 'lib/fluent/plugin/buf_file.rb', line 120
# Rebuilds buffer state from the chunk files found on disk.
#
# Every path matching "<prefix>*<suffix>" is inspected; the part between
# prefix and suffix must match PATH_MATCH, otherwise the file is ignored.
# Files marked "b" are opened in "a+" mode, files marked "q" in "r" mode.
# The hexadecimal suffix, read as an integer, is used as the sort key.
#
# @return [Array] a two-element array `[queue, map]`: `queue` holds the "q"
#   chunks in ascending sort-key order; `map` maps chunk.key to a "b" chunk
#   (when several "b" chunks share a key, the one sorted last is kept)
def resume
  staged = []    # [timestamp, chunk] pairs for "b" files
  enqueued = []  # [timestamp, chunk] pairs for "q" files

  pattern = "#{@buffer_path_prefix}*#{@buffer_path_suffix}"
  Dir.glob(pattern) do |path|
    middle = path[@buffer_path_prefix.length..-(@buffer_path_suffix.length+1)]
    m = PATH_MATCH.match(middle)
    next unless m

    key = decode_key(m[1])
    tsuffix = m[3]
    timestamp = tsuffix.to_i(16)
    unique_id = tsuffix_to_unique_id(tsuffix)

    case m[2]
    when 'b'
      staged << [timestamp, FileBufferChunk.new(key, path, unique_id, "a+")]
    when 'q'
      enqueued << [timestamp, FileBufferChunk.new(key, path, unique_id, "r")]
    end
  end

  # Later entries overwrite earlier ones that share the same key.
  map = staged.sort_by(&:first).each_with_object({}) do |(_timestamp, chunk), acc|
    acc[chunk.key] = chunk
  end
  queue = enqueued.sort_by(&:first).map(&:last)

  [queue, map]
end
|
#start ⇒ Object
106
107
108
109
|
# File 'lib/fluent/plugin/buf_file.rb', line 106
# Ensures the directory that will hold buffer files exists, then delegates
# to the parent `start`.
def start
  # Appending a dummy file name lets File.dirname yield the containing
  # directory even when the prefix itself ends with a separator.
  buffer_dir = File.dirname(@buffer_path_prefix + "path")
  FileUtils.mkdir_p(buffer_dir)
  super
end
|