Class: Fluent::PullPoolBuffer
- Inherits: FileBuffer
- Inheritance chain: Object → FileBuffer → Fluent::PullPoolBuffer
- Defined in:
- lib/fluent/plugin/buf_pullpool.rb
Instance Method Summary
-
#initialize ⇒ PullPoolBuffer
constructor
A new instance of PullPoolBuffer.
-
#new_chunk(key) ⇒ Object
Copied from FileBuffer, with FileBufferChunk replaced by PullBufferChunk (s/FileBufferChunk/PullBufferChunk/).
- #pull_chunks ⇒ Object
-
#resume ⇒ Object
Copied from FileBuffer, with FileBufferChunk replaced by PullBufferChunk (s/FileBufferChunk/PullBufferChunk/).
- #write_chunk(chunk, out) ⇒ Object
Constructor Details
#initialize ⇒ PullPoolBuffer
Returns a new instance of PullPoolBuffer.
28 29 30 31 32 |
# File 'lib/fluent/plugin/buf_pullpool.rb', line 28
# Runs the parent class initializer, then sets up this buffer's own state:
# an empty pool array and a Mutex.
def initialize
  super
  @pool = []
  @mutex = Mutex.new
end
Instance Method Details
#new_chunk(key) ⇒ Object
Copied from FileBuffer, with FileBufferChunk replaced by PullBufferChunk (s/FileBufferChunk/PullBufferChunk/).
61 62 63 64 65 66 |
# File 'lib/fluent/plugin/buf_pullpool.rb', line 61
# Creates a new PullBufferChunk for +key+.
# copy&paste from FileBuffer, but s/FileBufferChunk/PullBufferChunk/
#
# The key is encoded and turned into a file path with the "b" marker; the
# path's suffix is converted into the chunk's unique id. The chunk is opened
# in "a+" mode and is given @symlink_path.
#
# @param key [Object] chunk key (passed through encode_key to build the path)
# @return [PullBufferChunk] the newly created chunk
def new_chunk(key)
  path, tsuffix = make_path(encode_key(key), "b")
  PullBufferChunk.new(key, path, tsuffix_to_unique_id(tsuffix), "a+", @symlink_path)
end
#pull_chunks ⇒ Object
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 |
# File 'lib/fluent/plugin/buf_pullpool.rb', line 43
# Hands every chunk currently in the pool to the caller's block.
#
# The pool is replaced by a fresh empty array while holding @mutex; the
# chunks that were taken out are then yielded one by one outside the lock.
# A chunk whose block call returns normally is purged via #actual_purge.
# If the block (or the purge) raises a StandardError, the error is reported
# on $stderr and iteration continues with the next chunk; the failed chunk
# is NOT put back into the pool.
#
# @yield [chunk] each chunk taken from the pool
# @return [Array] the chunks that were taken from the pool
# @raise [RuntimeError] when called without a block
def pull_chunks
  raise "BUG: block not given" unless block_given?

  taken = @mutex.synchronize do
    current = @pool
    @pool = []
    current
  end

  taken.each do |chunk|
    begin
      yield chunk
      chunk.actual_purge
    rescue => e
      # TODO: Chunks should be returned to @pool ?
      #   It may make infinite error loops. Hmmm....
      # Until that is decided the chunk is simply dropped from the pool, but
      # the failure is reported instead of vanishing without a trace.
      $stderr.puts "pull_chunks: chunk dropped from pool without purge: #{e.class}: #{e.message}"
    end
  end
end
#resume ⇒ Object
Copied from FileBuffer, with FileBufferChunk replaced by PullBufferChunk (s/FileBufferChunk/PullBufferChunk/).
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 |
# File 'lib/fluent/plugin/buf_pullpool.rb', line 68
# Rebuilds chunk objects from the buffer files found on disk.
# copy&paste from FileBuffer, but s/FileBufferChunk/PullBufferChunk/
#
# Every path matching "<@buffer_path_prefix>*<@buffer_path_suffix>" has its
# middle part matched against PATH_MATCH, which yields the encoded key, a
# 'b'/'q' marker and a hexadecimal suffix. Files marked 'b' are reopened in
# "a+" mode, files marked 'q' in "r" mode. Both groups are ordered by the
# integer value of the hexadecimal suffix.
#
# @return [Array] two values: the array of 'q' chunks in suffix order, and a
#   hash mapping chunk key => 'b' chunk (for a repeated key the chunk with
#   the largest suffix value wins, because later entries overwrite earlier ones)
def resume
  maps = []
  queues = []

  Dir.glob("#{@buffer_path_prefix}*#{@buffer_path_suffix}") {|path|
    match = path[@buffer_path_prefix.length..-(@buffer_path_suffix.length+1)]
    if m = PATH_MATCH.match(match)
      key = decode_key(m[1])
      bq = m[2]
      tsuffix = m[3]
      # The same hex suffix doubles as the sort key for ordering chunks.
      timestamp = m[3].to_i(16)
      unique_id = tsuffix_to_unique_id(tsuffix)

      if bq == 'b'
        chunk = PullBufferChunk.new(key, path, unique_id, "a+")
        maps << [timestamp, chunk]
      elsif bq == 'q'
        chunk = PullBufferChunk.new(key, path, unique_id, "r")
        queues << [timestamp, chunk]
      end
    end
  }

  map = {}
  maps.sort_by {|(timestamp, _chunk)|
    timestamp
  }.each {|(_timestamp, chunk)|
    map[chunk.key] = chunk
  }

  queue = queues.sort_by {|(timestamp, _chunk)|
    timestamp
  }.map {|(_timestamp, chunk)|
    chunk
  }

  return queue, map
end
#write_chunk(chunk, out) ⇒ Object
34 35 36 37 38 39 40 41 |
# File 'lib/fluent/plugin/buf_pullpool.rb', line 34
# Delivers a chunk either directly to the output or into the pull pool.
#
# When +out+ responds to #write the chunk is written immediately and marked
# as flushed (for normal BufferedOutput plugins: works just same as
# FileBuffer). Otherwise the chunk is appended to @pool, where it waits until
# #pull_chunks takes it.
#
# @param chunk [Object] the chunk to deliver; must accept #flushed= when
#   +out+ responds to #write
# @param out [Object] the output; only #respond_to?(:write) and #write are used
# @return [Object] true after a direct write, otherwise the pool array
def write_chunk(chunk, out)
  if out.respond_to?(:write)
    # for normal BufferedOutput plugins: works just same as FileBuffer
    out.write(chunk)
    chunk.flushed = true
  else
    # #pull_chunks swaps @pool for a new array while holding @mutex; pushing
    # under the same lock guarantees the chunk lands either in the array that
    # is about to be pulled or in its replacement, never in an array that has
    # already been handed to the puller.
    @mutex.synchronize { @pool.push(chunk) }
  end
end