Class: Fluent::TailExAsisInput

Inherits:
TailInput
  • Object
show all
Includes:
Configurable, Mixin::ConfigPlaceholders
Defined in:
lib/fluent/plugin/in_tail_ex_asis.rb

Defined Under Namespace

Classes: AsisParser, TailExAsisWatcher

Instance Method Summary collapse

Constructor Details

#initialize ⇒ TailExAsisInput

Returns a new instance of TailExAsisInput.



15
16
17
18
19
# File 'lib/fluent/plugin/in_tail_ex_asis.rb', line 15

# Builds the input with the state this class adds on top of its parent.
#
# The parent constructor runs first. Afterwards no parser is set, and the
# plugin is marked as not ready; #run does nothing while @ready is false.
def initialize
  super
  @parser = nil
  @ready = false
end

Instance Method Details

#configure(conf) ⇒ Object



36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/fluent/plugin/in_tail_ex_asis.rb', line 36

# Configures the plugin, then prepares wildcard-tag state, the watcher table
# and the refresh timer.
#
# When @tag contains '*', it is split on '*': the first piece becomes
# @tag_prefix and the second becomes @tag_suffix (a missing piece becomes '').
# Without a '*', both are nil and the tag is used as configured.
#
# @param conf [Object] configuration, passed through to the parent's configure
# @return [Object] the timer stored in @refresh_trigger
def configure(conf)
  super
  if @tag.index('*')
    @tag_prefix, @tag_suffix = @tag.split('*')
    # String#split drops trailing empty fields: "foo.*" yields only a prefix
    # and a bare "*" yields no pieces at all. Default both halves to '' so
    # receive_lines never tries to concatenate nil.
    @tag_prefix ||= ''
    @tag_suffix ||= ''
  else
    @tag_prefix = nil
    @tag_suffix = nil
  end
  # Maps each watched path to its watcher object.
  @watchers = {}
  # Timer whose callback is refresh_watchers; it is attached to the loop in start.
  @refresh_trigger = TailWatcher::TimerWatcher.new(@refresh_interval, true, &method(:refresh_watchers))
end

#configure_parser(conf) ⇒ Object



31
32
33
34
# File 'lib/fluent/plugin/in_tail_ex_asis.rb', line 31

# Creates the AsisParser used by this input, stores it in @parser and
# configures it.
#
# @param conf [Object] configuration handed to the parser's configure
# @return [Object] whatever the parser's configure returns
def configure_parser(conf)
  asis = TailExAsisInput::AsisParser.new
  @parser = asis
  asis.configure(conf)
end

#expand_paths ⇒ Object



49
50
51
52
53
54
55
56
57
58
59
# File 'lib/fluent/plugin/in_tail_ex_asis.rb', line 49

# Expands every configured path pattern into the files that currently exist.
#
# When @expand_date is set, each pattern is first run through Time#strftime
# using the current time, so patterns may contain date directives. Each
# pattern is then globbed.
#
# @return [Array<String>] matching paths, without duplicates
def expand_paths
  now = Time.now
  matches = @paths.flat_map do |pattern|
    pattern = now.strftime(pattern) if @expand_date
    Dir.glob(pattern)
  end
  # Overlapping patterns can match the same file. Array#- keeps duplicates,
  # so without this refresh_watchers would hand the same path to start_watch
  # twice and the first watcher would be overwritten without being closed.
  matches.uniq
end

#receive_lines(lines, tag) ⇒ Object



97
98
99
100
101
102
# File 'lib/fluent/plugin/in_tail_ex_asis.rb', line 97

# Callback for lines read by a watcher: rewrites @tag from the wildcard
# prefix/suffix when either is set, then hands the lines to the parent.
#
# @param lines [Object] the lines read, forwarded unchanged to the parent
# @param tag [String] per-file tag fragment inserted between prefix and suffix
# @return [Object] whatever the parent's receive_lines returns
def receive_lines(lines, tag)
  if @tag_prefix || @tag_suffix
    # Interpolation treats a nil half as empty. configure can leave the prefix
    # nil with an empty suffix (tag "*"), where `+` would raise NoMethodError.
    @tag = "#{@tag_prefix}#{tag}#{@tag_suffix}"
  end
  super(lines)
end

#refresh_watchers ⇒ Object



61
62
63
64
65
66
67
68
# File 'lib/fluent/plugin/in_tail_ex_asis.rb', line 61

# Reconciles the watcher table with the files that currently match.
#
# Paths that are watched but no longer returned by expand_paths are stopped;
# paths that are returned but not yet watched are started.
def refresh_watchers
  current = expand_paths
  watched = @watchers.keys

  gone = watched - current
  fresh = current - watched

  stop_watch(gone) unless gone.empty?
  start_watch(fresh) unless fresh.empty?
end

#run ⇒ Object



123
124
125
126
127
128
# File 'lib/fluent/plugin/in_tail_ex_asis.rb', line 123

# Runs the parent's run, but only once start has set @ready.
#
# @return [Object, nil] the parent's result, or nil when not ready
def run
  # don't run unless ready to avoid coolio error
  return unless @ready

  super
end

#shutdown ⇒ Object



115
116
117
118
119
120
121
# File 'lib/fluent/plugin/in_tail_ex_asis.rb', line 115

# Tears the plugin down, in this order: detach the refresh timer, stop the
# loop, close every watcher immediately, join the thread, and finally close
# @pf_file when one is set.
def shutdown
  @refresh_trigger.detach
  @loop.stop

  watched_paths = @watchers.keys
  stop_watch(watched_paths, true)

  @thread.join
  return unless @pf_file

  @pf_file.close
end

#start ⇒ Object



104
105
106
107
108
109
110
111
112
113
# File 'lib/fluent/plugin/in_tail_ex_asis.rb', line 104

# Starts the plugin: calls the parent's start with an empty path list, then
# sets up watchers for the real paths, attaches the refresh timer and launches
# a new thread running #run.
#
# NOTE(review): the parent's start presumably creates @thread running #run;
# since @ready is still false at that point, #run would return at once and the
# join below would finish quickly — confirm against the parent class.
def start
  # Swap the configured paths out so the parent sees an empty list
  # (presumably so it does not create its own watchers — TODO confirm).
  paths, @paths = @paths, []
  super
  # Wait for whatever thread is in @thread after super returns.
  @thread.join
  # Restore the real path patterns for expand_paths.
  @paths = paths
  # Create watchers for every file that matches right now.
  refresh_watchers
  # Attach the refresh timer built in configure to the loop.
  @refresh_trigger.attach(@loop)
  # From here on #run calls its parent implementation.
  @ready = true
  @thread = Thread.new(&method(:run))
end

#start_watch(paths) ⇒ Object



70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/fluent/plugin/in_tail_ex_asis.rb', line 70

# Creates, attaches and registers a watcher for each given path.
#
# When @pf is set, the entry for the path is looked up in it and passed to the
# watcher. If @read_all is set and that entry's read_inode is 0, the entry is
# first updated to the file's current inode at position 0.
#
# @param paths [Array<String>] paths to begin watching
# @return [Array<String>] the paths argument
def start_watch(paths)
  paths.each do |path|
    entry = @pf ? @pf[path] : nil
    if @pf && @read_all && entry.read_inode == 0
      entry.update(File::Stat.new(path).ino, 0)
    end

    tail = TailExAsisWatcher.new(path, @rotate_wait, entry, &method(:receive_lines))
    tail.attach(@loop)
    @watchers[path] = tail
  end
end

#stop_watch(paths, immediate = false) ⇒ Object



88
89
90
91
92
93
94
95
# File 'lib/fluent/plugin/in_tail_ex_asis.rb', line 88

# Removes the watchers for the given paths from @watchers and closes them.
# Paths that have no watcher are ignored.
#
# @param paths [Array<String>] paths to stop watching
# @param immediate [Boolean] when true the watcher's close receives nil;
#   otherwise it receives @loop
# @return [Array<String>] the paths argument
def stop_watch(paths, immediate=false)
  close_arg = immediate ? nil : @loop
  paths.each do |path|
    watcher = @watchers.delete(path)
    next unless watcher

    watcher.close(close_arg)
  end
end