Class: Fluent::Plugin::TailInput::PositionFile

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/plugin/in_tail/position_file.rb

Constant Summary collapse

UNWATCHED_POSITION =
0xffffffffffffffff
POSITION_FILE_ENTRY_REGEX =
/^([^\t]+)\t([0-9a-fA-F]+)\t([0-9a-fA-F]+)/.freeze

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(file, follow_inodes, logger: nil) ⇒ PositionFile

Returns a new instance of PositionFile.



31
32
33
34
35
36
37
# File 'lib/fluent/plugin/in_tail/position_file.rb', line 31

def initialize(file, follow_inodes, logger: nil)
  @file = file
  @logger = logger
  @file_mutex = Mutex.new
  @map = {}
  @follow_inodes = follow_inodes
end

Class Method Details

.load(file, follow_inodes, existing_targets, logger:) ⇒ Object



25
26
27
28
29
# File 'lib/fluent/plugin/in_tail/position_file.rb', line 25

def self.load(file, follow_inodes, existing_targets, logger:)
  pf = new(file, follow_inodes, logger: logger)
  pf.load(existing_targets)
  pf
end

Instance Method Details

#[](target_info) ⇒ Object



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/fluent/plugin/in_tail/position_file.rb', line 39

def [](target_info)
  if m = @map[@follow_inodes ? target_info.ino : target_info.path]
    return m
  end

  @file_mutex.synchronize {
    @file.seek(0, IO::SEEK_END)
    seek = @file.pos + target_info.path.bytesize + 1
    @file.write "#{target_info.path}\t0000000000000000\t0000000000000000\n"
    if @follow_inodes
      @map[target_info.ino] = FilePositionEntry.new(@file, @file_mutex, seek, 0, 0)
    else
      @map[target_info.path] = FilePositionEntry.new(@file, @file_mutex, seek, 0, 0)
    end
  }
end

#load(existing_targets = nil) ⇒ Object



62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/fluent/plugin/in_tail/position_file.rb', line 62

def load(existing_targets = nil)
  compact(existing_targets)

  map = {}
  @file_mutex.synchronize do
    @file.pos = 0

    @file.each_line do |line|
      m = POSITION_FILE_ENTRY_REGEX.match(line)
      next if m.nil?

      path = m[1]
      pos = m[2].to_i(16)
      ino = m[3].to_i(16)
      seek = @file.pos - line.bytesize + path.bytesize + 1
      if @follow_inodes
        map[ino] = FilePositionEntry.new(@file, @file_mutex, seek, pos, ino)
      else
        map[path] = FilePositionEntry.new(@file, @file_mutex, seek, pos, ino)
      end
    end
  end

  @map = map
end

#try_compactObject

This method is similer to #compact but it tries to get less lock to avoid a lock contention



89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
# File 'lib/fluent/plugin/in_tail/position_file.rb', line 89

def try_compact
  last_modified = nil
  size = nil

  @file_mutex.synchronize do
    size = @file.size
    last_modified = @file.mtime
  end

  entries = fetch_compacted_entries

  @file_mutex.synchronize do
    if last_modified == @file.mtime && size == @file.size
      @file.pos = 0
      @file.truncate(0)
      @file.write(entries.values.map(&:to_entry_fmt).join)

      # entry contains path/ino key and value.
      entries.each do |key, val|
        if (m = @map[key])
          m.seek = val.seek
        end
      end
    else
      # skip
    end
  end
end

#unwatch(target_info) ⇒ Object



56
57
58
59
60
# File 'lib/fluent/plugin/in_tail/position_file.rb', line 56

def unwatch(target_info)
  if (entry = @map.delete(@follow_inodes ? target_info.ino : target_info.path))
    entry.update_pos(UNWATCHED_POSITION)
  end
end