Class: Fluent::Plugin::TailInput::TailWatcher

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/plugin/in_tail.rb

Defined Under Namespace

Classes: FIFO, IOHandler, LineBufferTimerFlusher, NullIOHandler, RotateHandler

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(target_info, pe, log, read_from_head, follow_inodes, update_watcher, line_buffer_timer_flusher, io_handler_build, metrics) ⇒ TailWatcher

Returns a new instance of TailWatcher.



852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
# File 'lib/fluent/plugin/in_tail.rb', line 852

def initialize(target_info, pe, log, read_from_head, follow_inodes, update_watcher, line_buffer_timer_flusher, io_handler_build, metrics)
  @path = target_info.path
  @ino = target_info.ino
  @pe = pe || MemoryPositionEntry.new
  @read_from_head = read_from_head
  @follow_inodes = follow_inodes
  @update_watcher = update_watcher
  @log = log
  @rotate_handler = RotateHandler.new(log, &method(:on_rotate))
  @line_buffer_timer_flusher = line_buffer_timer_flusher
  @io_handler = nil
  @io_handler_build = io_handler_build
  @metrics = metrics
  @watchers = []
end

Instance Attribute Details

#group_watcherObject

Returns the value of attribute group_watcher.



873
874
875
# File 'lib/fluent/plugin/in_tail.rb', line 873

def group_watcher
  @group_watcher
end

#inoObject (readonly)

Returns the value of attribute ino.



868
869
870
# File 'lib/fluent/plugin/in_tail.rb', line 868

def ino
  @ino
end

#line_buffer_timer_flusherObject (readonly)

Returns the value of attribute line_buffer_timer_flusher.



870
871
872
# File 'lib/fluent/plugin/in_tail.rb', line 870

def line_buffer_timer_flusher
  @line_buffer_timer_flusher
end

#pathObject (readonly)

Returns the value of attribute path.



868
869
870
# File 'lib/fluent/plugin/in_tail.rb', line 868

def path
  @path
end

#peObject (readonly)

Returns the value of attribute pe.



869
870
871
# File 'lib/fluent/plugin/in_tail.rb', line 869

def pe
  @pe
end

#unwatchedObject

This is used for removing position entry from PositionFile



871
872
873
# File 'lib/fluent/plugin/in_tail.rb', line 871

def unwatched
  @unwatched
end

#watchersObject (readonly)

Returns the value of attribute watchers.



872
873
874
# File 'lib/fluent/plugin/in_tail.rb', line 872

def watchers
  @watchers
end

Instance Method Details

#closeObject



891
892
893
894
895
896
# File 'lib/fluent/plugin/in_tail.rb', line 891

def close
  if @io_handler
    @io_handler.close
    @io_handler = nil
  end
end

#detach(shutdown_start_time = nil) ⇒ Object



883
884
885
886
887
888
889
# File 'lib/fluent/plugin/in_tail.rb', line 883

def detach(shutdown_start_time = nil)
  if @io_handler
    @io_handler.ready_to_shutdown(shutdown_start_time)
    @io_handler.on_notify
  end
  @line_buffer_timer_flusher&.close(self)
end

#eof?Boolean

Returns:

  • (Boolean)


898
899
900
# File 'lib/fluent/plugin/in_tail.rb', line 898

def eof?
  @io_handler.nil? || @io_handler.eof?
end

#io_handlerObject



996
997
998
# File 'lib/fluent/plugin/in_tail.rb', line 996

def io_handler
  @io_handler_build.call(self, @path)
end

#on_notifyObject



902
903
904
905
906
907
908
909
910
911
912
913
# File 'lib/fluent/plugin/in_tail.rb', line 902

def on_notify
  begin
    stat = Fluent::FileWrapper.stat(@path)
  rescue Errno::ENOENT, Errno::EACCES
    # moved or deleted
    stat = nil
  end

  @rotate_handler.on_notify(stat) if @rotate_handler
  @line_buffer_timer_flusher.on_notify(self) if @line_buffer_timer_flusher
  @io_handler.on_notify if @io_handler
end

#on_rotate(stat) ⇒ Object



915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
# File 'lib/fluent/plugin/in_tail.rb', line 915

def on_rotate(stat)
  if @io_handler.nil?
    if stat
      # first time
      fsize = stat.size
      inode = stat.ino

      last_inode = @pe.read_inode
      if inode == last_inode
        # rotated file has the same inode number with the last file.
        # assuming following situation:
        #   a) file was once renamed and backed, or
        #   b) symlink or hardlink to the same file is recreated
        # in either case of a and b, seek to the saved position
        #   c) file was once renamed, truncated and then backed
        # in this case, consider it truncated
        @pe.update(inode, 0) if fsize < @pe.read_pos
      elsif last_inode != 0
        # this is FilePositionEntry and fluentd once started.
        # read data from the head of the rotated file.
        # logs never duplicate because this file is a rotated new file.
        @pe.update(inode, 0)
      else
        # this is MemoryPositionEntry or this is the first time fluentd started.
        # seek to the end of the any files.
        # logs may duplicate without this seek because it's not sure the file is
        # existent file or rotated new file.
        pos = @read_from_head ? 0 : fsize
        @pe.update(inode, pos)
      end
      @io_handler = io_handler
    else
      @io_handler = NullIOHandler.new
    end
  else
    watcher_needs_update = false

    if stat
      inode = stat.ino
      if inode == @pe.read_inode # truncated
        @pe.update_pos(0)
        @io_handler.close
      elsif !@io_handler.opened? # There is no previous file. Reuse TailWatcher
        @pe.update(inode, 0)
      else # file is rotated and new file found
        watcher_needs_update = true
        # Handle the old log file before renewing TailWatcher [fluentd#1055]
        @io_handler.on_notify
      end
    else # file is rotated and new file not found
      # Clear RotateHandler to avoid duplicated file watch in same path.
      @rotate_handler = nil
      watcher_needs_update = true
    end

    if watcher_needs_update
      if @follow_inodes
        # If stat is nil (file not present), NEED to stop and discard this watcher.
        #   When the file is disappeared but is resurrected soon, then `#refresh_watcher`
        #   can't recognize this TailWatcher needs to be stopped.
        #   This can happens when the file is rotated.
        #   If a notify comes before the new file for the path is created during rotation,
        #   then it appears as if the file was resurrected once it disappeared.
        # Don't want to swap state because we need latest read offset in pos file even after rotate_wait
        @update_watcher.call(self, @pe, stat&.ino)
      else
        # Permit to handle if stat is nil (file not present).
        # If a file is mv-ed and a new file is created during
        # calling `#refresh_watchers`s, and `#refresh_watchers` won't run `#start_watchers`
        # and `#stop_watchers()` for the path because `target_paths_hash`
        # always contains the path.
        @update_watcher.call(self, swap_state(@pe), stat&.ino)
      end
    else
      @log.info "detected rotation of #{@path}"
      @io_handler = io_handler
    end
    @metrics.rotated.inc
  end
end

#register_watcher(watcher) ⇒ Object



879
880
881
# File 'lib/fluent/plugin/in_tail.rb', line 879

def register_watcher(watcher)
  @watchers << watcher
end

#swap_state(pe) ⇒ Object



1000
1001
1002
1003
1004
1005
1006
# File 'lib/fluent/plugin/in_tail.rb', line 1000

def swap_state(pe)
  # Use MemoryPositionEntry for rotated file temporary
  mpe = MemoryPositionEntry.new
  mpe.update(pe.read_inode, pe.read_pos)
  @pe = mpe
  pe # This pe will be updated in on_rotate after TailWatcher is initialized
end

#tagObject



875
876
877
# File 'lib/fluent/plugin/in_tail.rb', line 875

def tag
  @parsed_tag ||= @path.tr('/', '.').squeeze('.').gsub(/^\./, '')
end