Class: Fluent::PullPoolBuffer

Inherits:
FileBuffer
  • Object
show all
Defined in:
lib/fluent/plugin/buf_pullpool.rb

Instance Method Summary collapse

Constructor Details

#initialize ⇒ PullPoolBuffer

Returns a new instance of PullPoolBuffer.



28
29
30
31
32
# File 'lib/fluent/plugin/buf_pullpool.rb', line 28

# Builds the buffer and prepares the state used for pull-style consumption.
#
# After the superclass initializer runs, this sets up:
# * @mutex - the lock pull_chunks holds while it swaps out @pool
# * @pool  - the list of chunks waiting to be handed to a puller
def initialize
  super()
  @mutex = Mutex.new
  @pool = []
end

Instance Method Details

#new_chunk(key) ⇒ Object

copy&paste from FileBuffer, but s/FileBufferChunk/PullBufferChunk/



61
62
63
64
65
66
# File 'lib/fluent/plugin/buf_pullpool.rb', line 61

# Creates a fresh PullBufferChunk for +key+.
#
# Same shape as the FileBuffer version this was copied from, except that the
# chunk class is PullBufferChunk instead of FileBufferChunk.
#
# @param key [Object] chunk key; it is passed through encode_key to build the path
# @return [PullBufferChunk] chunk opened with mode "a+" and the configured @symlink_path
def new_chunk(key)
  chunk_path, time_suffix = make_path(encode_key(key), "b")
  chunk_id = tsuffix_to_unique_id(time_suffix)
  PullBufferChunk.new(key, chunk_path, chunk_id, "a+", @symlink_path)
end

#pull_chunks ⇒ Object



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/fluent/plugin/buf_pullpool.rb', line 43

# Drains the pool and yields every pooled chunk to the caller's block.
#
# The pool is detached and replaced with an empty array while holding @mutex;
# the chunks are then processed outside the lock. A chunk is purged with
# actual_purge only when the block returns normally for it.
#
# @yield [chunk] each chunk that was in the pool at the time of the call
# @return [Array] the detached list of chunks that was iterated
# @raise [RuntimeError] when called without a block
def pull_chunks
  raise "BUG: block not given" unless block_given?

  drained = @mutex.synchronize do
    current = @pool
    @pool = []
    current
  end

  drained.each do |chunk|
    begin
      yield chunk
      chunk.actual_purge
    rescue
      # Errors (StandardError) from the block or from the purge are swallowed
      # on purpose; the chunk is neither purged nor put back.
      # TODO: Chunks should be returned to @pool ?
      #       It may make infinite error loops. Hmmm....
    end
  end
end

#resume ⇒ Object

copy&paste from FileBuffer, but s/FileBufferChunk/PullBufferChunk/



68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/fluent/plugin/buf_pullpool.rb', line 68

# Rebuilds chunk state from the files found on disk.
#
# Same shape as the FileBuffer version this was copied from, except that the
# chunk class is PullBufferChunk instead of FileBufferChunk.
#
# Every path matching "<@buffer_path_prefix>*<@buffer_path_suffix>" has its
# middle part matched against PATH_MATCH. Capture 1 is the encoded key,
# capture 2 the kind marker ('b' or 'q'), capture 3 a hexadecimal suffix that
# is used both as the sort key and as the source of the unique id.
#
# @return [Array(Array, Hash)] a pair of
#   * queue - the 'q' chunks (opened with mode "r"), ordered by suffix value
#   * map   - chunk.key => 'b' chunk (opened with mode "a+"); when several
#     'b' files share a key, the one with the largest suffix value wins
def resume
  appendable = []
  enqueued = []

  Dir.glob("#{@buffer_path_prefix}*#{@buffer_path_suffix}") do |path|
    # Strip the fixed prefix and suffix, leaving "<key><sep><b|q><hex>".
    m = PATH_MATCH.match(path[@buffer_path_prefix.length..-(@buffer_path_suffix.length + 1)])
    next unless m

    key = decode_key(m[1])
    tsuffix = m[3]
    timestamp = tsuffix.to_i(16)
    unique_id = tsuffix_to_unique_id(tsuffix)

    case m[2]
    when 'b'
      appendable << [timestamp, PullBufferChunk.new(key, path, unique_id, "a+")]
    when 'q'
      enqueued << [timestamp, PullBufferChunk.new(key, path, unique_id, "r")]
    end
  end

  # Insert in ascending order so a later entry overwrites an earlier one per key.
  map = {}
  appendable.sort_by(&:first).each { |_, chunk| map[chunk.key] = chunk }

  queue = enqueued.sort_by(&:first).map(&:last)

  [queue, map]
end

#write_chunk(chunk, out) ⇒ Object



34
35
36
37
38
39
40
41
# File 'lib/fluent/plugin/buf_pullpool.rb', line 34

# Delivers a chunk to +out+, or parks it in the pool for a later pull.
#
# When +out+ responds to #write, the chunk is written immediately and marked
# as flushed. Otherwise the chunk is appended to @pool, from which
# pull_chunks later takes it.
#
# @param chunk [Object] the chunk to deliver; must accept +flushed=+ on the write path
# @param out [Object] the output; only its response to #write is inspected
# @return [Object] true on the write path, the pool array on the pooling path
def write_chunk(chunk, out)
  if out.respond_to?(:write) # for normal BufferedOutput plugins: works just same as FileBuffer
    out.write(chunk)
    chunk.flushed = true
  else
    # pull_chunks swaps @pool for a fresh array while holding @mutex. Taking
    # the same lock here keeps a push from racing with that swap and landing
    # in the already-detached array, where it would never be pulled.
    @mutex.synchronize { @pool.push(chunk) }
  end
end