Class: Fluent::FileBuffer

Inherits:
BasicBuffer show all
Defined in:
lib/fluent/plugin/buf_file.rb

Direct Known Subclasses

ZFileBuffer

Constant Summary collapse

PATH_MATCH =
/^(.*)[\._](b|q)([0-9a-fA-F]{1,32})$/

Instance Attribute Summary

Attributes included from Configurable

#config

Instance Method Summary collapse

Methods inherited from BasicBuffer

#clear!, #emit, #enable_parallel, #keys, #pop, #push, #queue_size, #shutdown, #total_queued_chunk_size, #write_chunk

Methods inherited from Buffer

#shutdown

Methods included from Configurable

included

Constructor Details

#initializeFileBuffer

Returns a new instance of FileBuffer.



81
82
83
84
# File 'lib/fluent/plugin/buf_file.rb', line 81

def initialize
  require 'uri'
  super
end

Instance Method Details

#before_shutdown(out) ⇒ Object



171
172
173
174
175
176
177
178
179
180
181
# File 'lib/fluent/plugin/buf_file.rb', line 171

def before_shutdown(out)
  if @flush_at_shutdown
    synchronize do
      @map.each_key {|key|
        push(key)
      }
      while pop(out)
      end
    end
  end
end

#configure(conf) ⇒ Object



88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# File 'lib/fluent/plugin/buf_file.rb', line 88

def configure(conf)
  super

  if pos = @buffer_path.index('*')
    @buffer_path_prefix = @buffer_path[0,pos]
    @buffer_path_suffix = @buffer_path[pos+1..-1]
  else
    @buffer_path_prefix = @buffer_path+"."
    @buffer_path_suffix = ".log"
  end

  if flush_at_shutdown = conf['flush_at_shutdown']
    @flush_at_shutdown = true
  else
    @flush_at_shutdown = false
  end
end

#enqueue(chunk) ⇒ Object



159
160
161
162
163
164
165
166
167
168
169
# File 'lib/fluent/plugin/buf_file.rb', line 159

def enqueue(chunk)
  path = chunk.path
  mp = path[@buffer_path_prefix.length..-(@buffer_path_suffix.length+1)]

  m = PATH_MATCH.match(mp)
  encoded_key = m ? m[1] : ""
  tsuffix = m[3]
  npath = "#{@buffer_path_prefix}#{encoded_key}.q#{tsuffix}#{@buffer_path_suffix}"

  chunk.mv(npath)
end

#new_chunk(key) ⇒ Object



113
114
115
116
117
118
# File 'lib/fluent/plugin/buf_file.rb', line 113

def new_chunk(key)
  encoded_key = encode_key(key)
  path, tsuffix = make_path(encoded_key, "b")
  unique_id = tsuffix_to_unique_id(tsuffix)
  FileBufferChunk.new(key, path, unique_id)
end

#resumeObject



120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
# File 'lib/fluent/plugin/buf_file.rb', line 120

def resume
  maps = []
  queues = []

  Dir.glob("#{@buffer_path_prefix}*#{@buffer_path_suffix}") {|path|
    match = path[@buffer_path_prefix.length..-(@buffer_path_suffix.length+1)]
    if m = PATH_MATCH.match(match)
      key = decode_key(m[1])
      bq = m[2]
      tsuffix = m[3]
      timestamp = m[3].to_i(16)
      unique_id = tsuffix_to_unique_id(tsuffix)

      if bq == 'b'
        chunk = FileBufferChunk.new(key, path, unique_id, "a+")
        maps << [timestamp, chunk]
      elsif bq == 'q'
        chunk = FileBufferChunk.new(key, path, unique_id, "r")
        queues << [timestamp, chunk]
      end
    end
  }

  map = {}
  maps.sort_by {|(timestamp,chunk)|
    timestamp
  }.each {|(timestamp,chunk)|
    map[chunk.key] = chunk
  }

  queue = queues.sort_by {|(timestamp,chunk)|
    timestamp
  }.map {|(timestamp,chunk)|
    chunk
  }

  return queue, map
end

#startObject



106
107
108
109
# File 'lib/fluent/plugin/buf_file.rb', line 106

def start
  FileUtils.mkdir_p File.dirname(@buffer_path_prefix+"path")
  super
end