Class: Fluent::TailExAsisInput

Inherits:
TailInput
  • Object
show all
Includes:
Mixin::ConfigPlaceholders
Defined in:
lib/fluent/plugin/in_tail_ex_asis.rb

Defined Under Namespace

Classes: AsisParser, TailExAsisWatcher

Instance Method Summary collapse

Constructor Details

#initialize ⇒ TailExAsisInput

Returns a new instance of TailExAsisInput.



14
15
16
17
18
# File 'lib/fluent/plugin/in_tail_ex_asis.rb', line 14

# Builds the input in its "not yet started" state.
#
# The parent initializer runs first; afterwards the parser slot is cleared
# and the ready flag is lowered. #run checks @ready before doing any work.
def initialize
  super
  @parser = nil
  @ready = false
end

Instance Method Details

#configure(conf) ⇒ Object



25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/fluent/plugin/in_tail_ex_asis.rb', line 25

# Applies the configuration and prepares tag expansion and path watching.
#
# After the parent has processed +conf+, a tag containing '*' is split into
# the text before and after the wildcard (@tag_prefix / @tag_suffix). A tag
# without '*' leaves both as nil. It also resets the watcher table and
# creates the timer that calls #refresh_watchers every @refresh_interval.
#
# @param conf the configuration element handed over by the caller
def configure(conf)
  super
  if @tag.include?('*')
    @tag_prefix, @tag_suffix = @tag.split('*')
    # String#split drops empty trailing fields, so 'foo.*' yields no suffix
    # and a bare '*' yields neither part. Default both to '' so that
    # #receive_lines can always concatenate them.
    @tag_prefix ||= ''
    @tag_suffix ||= ''
  else
    @tag_prefix = nil
    @tag_suffix = nil
  end
  @watchers = {}
  @refresh_trigger = TailWatcher::TimerWatcher.new(@refresh_interval, true, &method(:refresh_watchers))
end

#configure_parser(conf) ⇒ Object



20
21
22
23
# File 'lib/fluent/plugin/in_tail_ex_asis.rb', line 20

# Installs a fresh AsisParser as this input's parser and configures it.
#
# The parser is stored in @parser before it is configured, so the instance
# variable is set even if configuration raises.
#
# @param conf the configuration passed through to the parser
# @return whatever AsisParser#configure returns
def configure_parser(conf)
  (@parser = AsisParser.new).configure(conf)
end

#expand_paths ⇒ Object



38
39
40
41
42
43
44
45
46
47
48
# File 'lib/fluent/plugin/in_tail_ex_asis.rb', line 38

# Resolves the configured path patterns to the files that exist right now.
#
# Each entry of @paths is optionally run through Time#strftime (when
# @expand_date is set) and then globbed. The result is de-duplicated:
# overlapping patterns would otherwise list a file twice, and
# #refresh_watchers would hand #start_watch the same path twice. That would
# create two watchers for one file, with only the second kept in @watchers.
#
# @return [Array<String>] unique matching paths, in first-seen order
def expand_paths
  now = Time.now
  @paths.flat_map do |pattern|
    Dir.glob(@expand_date ? now.strftime(pattern) : pattern)
  end.uniq
end

#receive_lines(lines, tag) ⇒ Object



86
87
88
89
90
91
# File 'lib/fluent/plugin/in_tail_ex_asis.rb', line 86

# Callback handed to each TailExAsisWatcher (see #start_watch).
#
# When the configured tag contained a wildcard, @tag is rebuilt as
# prefix + tag + suffix before the lines are passed to the parent
# implementation. Otherwise @tag is left untouched.
#
# @param lines the lines to forward to the parent's receive_lines
# @param tag the text substituted for the wildcard
#   (NOTE(review): presumably derived from the watched file's path — confirm
#   against TailExAsisWatcher)
def receive_lines(lines, tag)
  if @tag_prefix || @tag_suffix
    # Interpolation treats a nil part as an empty string. #configure can
    # leave the prefix nil while the suffix is '' (tag '*'), which made the
    # previous `@tag_prefix + tag + @tag_suffix` raise NoMethodError.
    @tag = "#{@tag_prefix}#{tag}#{@tag_suffix}"
  end
  super(lines)
end

#refresh_watchers ⇒ Object



50
51
52
53
54
55
56
57
# File 'lib/fluent/plugin/in_tail_ex_asis.rb', line 50

# Reconciles the active watchers with the files that currently match.
#
# Paths that are watched but no longer returned by #expand_paths are passed
# to #stop_watch. Paths returned but not yet watched are passed to
# #start_watch. Both differences are computed before either call is made.
def refresh_watchers
  current = expand_paths
  watched = @watchers.keys

  vanished = watched - current
  appeared = current - watched

  stop_watch(vanished) unless vanished.empty?
  start_watch(appeared) unless appeared.empty?
end

#run ⇒ Object



112
113
114
115
116
117
# File 'lib/fluent/plugin/in_tail_ex_asis.rb', line 112

# Runs the parent's loop, but only once this input has been marked ready.
#
# Returns nil without doing anything while @ready is not set; this avoids
# a coolio error.
def run
  return unless @ready

  super
end

#shutdown ⇒ Object



104
105
106
107
108
109
110
# File 'lib/fluent/plugin/in_tail_ex_asis.rb', line 104

# Stops the input: detaches the refresh timer, closes every watcher, stops
# the loop, waits for the thread in @thread, and closes @pf_file if set.
def shutdown
  # Detach the timer created in #configure so no further refresh is triggered.
  @refresh_trigger.detach
  # immediate = true: #stop_watch closes each watcher with nil instead of @loop.
  stop_watch(@watchers.keys, true)
  @loop.stop
  # Wait for the thread stored in @thread (created in #start) to finish.
  @thread.join
  # @pf_file is only closed when it has been set.
  @pf_file.close if @pf_file
end

#start ⇒ Object



93
94
95
96
97
98
99
100
101
102
# File 'lib/fluent/plugin/in_tail_ex_asis.rb', line 93

# Starts the input: runs the parent's start with an empty path list, then
# sets up this plugin's own watchers and refresh timer and launches the
# thread that executes #run.
def start
  # Swap in an empty path list so the parent's start sees no paths; the
  # real list is kept in a local and restored below.
  paths, @paths = @paths, []
  super
  # Wait for the thread left in @thread after super.
  # NOTE(review): presumably super spawned it running #run, which returns
  # immediately because @ready is still false — confirm against TailInput#start.
  @thread.join
  @paths = paths
  # Create watchers for the files that currently match, then attach the
  # periodic refresh timer to the loop.
  refresh_watchers
  @refresh_trigger.attach(@loop)
  # From here on #run calls the parent implementation.
  @ready = true
  @thread = Thread.new(&method(:run))
end

#start_watch(paths) ⇒ Object



59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/fluent/plugin/in_tail_ex_asis.rb', line 59

# Creates, attaches and registers a TailExAsisWatcher for each given path.
#
# When a position file (@pf) is in use, the path's entry is looked up and
# handed to the watcher. If @read_all is set and the entry's recorded inode
# is 0, the entry is first updated to the file's current inode at offset 0.
#
# @param paths [Array<String>] files to begin watching
# @return the +paths+ argument
def start_watch(paths)
  paths.each do |path|
    entry = nil
    if @pf
      entry = @pf[path]
      entry.update(File::Stat.new(path).ino, 0) if @read_all && entry.read_inode == 0
    end

    tail = TailExAsisWatcher.new(path, @rotate_wait, entry, &method(:receive_lines))
    tail.attach(@loop)
    @watchers[path] = tail
  end
end

#stop_watch(paths, immediate = false) ⇒ Object



77
78
79
80
81
82
83
84
# File 'lib/fluent/plugin/in_tail_ex_asis.rb', line 77

# Unregisters and closes the watchers for the given paths.
#
# Paths without a registered watcher are skipped. Each watcher found is
# removed from @watchers and closed with @loop, or with nil when
# +immediate+ is true.
#
# @param paths [Array<String>] paths whose watchers should be closed
# @param immediate [Boolean] pass nil instead of @loop to the watcher's close
# @return the +paths+ argument
def stop_watch(paths, immediate=false)
  close_arg = immediate ? nil : @loop
  paths.each do |path|
    watcher = @watchers.delete(path)
    next unless watcher

    watcher.close(close_arg)
  end
end