Class: Fluent::Plugin::TailInput::TailWatcher

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/plugin/in_tail.rb

Defined Under Namespace

Classes: FIFO, IOHandler, LineBufferTimerFlusher, NullIOHandler, RotateHandler

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(target_info, pe, log, read_from_head, follow_inodes, update_watcher, line_buffer_timer_flusher, io_handler_build, metrics) ⇒ TailWatcher

Returns a new instance of TailWatcher.



791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
# File 'lib/fluent/plugin/in_tail.rb', line 791

def initialize(target_info, pe, log, read_from_head, follow_inodes, update_watcher, line_buffer_timer_flusher, io_handler_build, metrics)
  @path = target_info.path
  @ino = target_info.ino
  @pe = pe || MemoryPositionEntry.new
  @read_from_head = read_from_head
  @follow_inodes = follow_inodes
  @update_watcher = update_watcher
  @log = log
  @rotate_handler = RotateHandler.new(log, &method(:on_rotate))
  @line_buffer_timer_flusher = line_buffer_timer_flusher
  @io_handler = nil
  @io_handler_build = io_handler_build
  @metrics = metrics
  @watchers = []
end

Instance Attribute Details

#group_watcherObject

Returns the value of attribute group_watcher.



812
813
814
# File 'lib/fluent/plugin/in_tail.rb', line 812

def group_watcher
  @group_watcher
end

#inoObject (readonly)

Returns the value of attribute ino.



807
808
809
# File 'lib/fluent/plugin/in_tail.rb', line 807

def ino
  @ino
end

#line_buffer_timer_flusherObject (readonly)

Returns the value of attribute line_buffer_timer_flusher.



809
810
811
# File 'lib/fluent/plugin/in_tail.rb', line 809

def line_buffer_timer_flusher
  @line_buffer_timer_flusher
end

#pathObject (readonly)

Returns the value of attribute path.



807
808
809
# File 'lib/fluent/plugin/in_tail.rb', line 807

def path
  @path
end

#peObject (readonly)

Returns the value of attribute pe.



808
809
810
# File 'lib/fluent/plugin/in_tail.rb', line 808

def pe
  @pe
end

#unwatchedObject

This is used for removing position entry from PositionFile



810
811
812
# File 'lib/fluent/plugin/in_tail.rb', line 810

def unwatched
  @unwatched
end

#watchersObject (readonly)

Returns the value of attribute watchers.



811
812
813
# File 'lib/fluent/plugin/in_tail.rb', line 811

def watchers
  @watchers
end

Instance Method Details

#closeObject



830
831
832
833
834
835
# File 'lib/fluent/plugin/in_tail.rb', line 830

def close
  if @io_handler
    @io_handler.close
    @io_handler = nil
  end
end

#detach(shutdown_start_time = nil) ⇒ Object



822
823
824
825
826
827
828
# File 'lib/fluent/plugin/in_tail.rb', line 822

def detach(shutdown_start_time = nil)
  if @io_handler
    @io_handler.ready_to_shutdown(shutdown_start_time)
    @io_handler.on_notify
  end
  @line_buffer_timer_flusher&.close(self)
end

#eof?Boolean

Returns:

  • (Boolean)


837
838
839
# File 'lib/fluent/plugin/in_tail.rb', line 837

def eof?
  @io_handler.nil? || @io_handler.eof?
end

#io_handlerObject



935
936
937
# File 'lib/fluent/plugin/in_tail.rb', line 935

def io_handler
  @io_handler_build.call(self, @path)
end

#on_notifyObject



841
842
843
844
845
846
847
848
849
850
851
852
# File 'lib/fluent/plugin/in_tail.rb', line 841

def on_notify
  begin
    stat = Fluent::FileWrapper.stat(@path)
  rescue Errno::ENOENT, Errno::EACCES
    # moved or deleted
    stat = nil
  end

  @rotate_handler.on_notify(stat) if @rotate_handler
  @line_buffer_timer_flusher.on_notify(self) if @line_buffer_timer_flusher
  @io_handler.on_notify if @io_handler
end

#on_rotate(stat) ⇒ Object



854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
# File 'lib/fluent/plugin/in_tail.rb', line 854

def on_rotate(stat)
  if @io_handler.nil?
    if stat
      # first time
      fsize = stat.size
      inode = stat.ino

      last_inode = @pe.read_inode
      if inode == last_inode
        # rotated file has the same inode number with the last file.
        # assuming following situation:
        #   a) file was once renamed and backed, or
        #   b) symlink or hardlink to the same file is recreated
        # in either case of a and b, seek to the saved position
        #   c) file was once renamed, truncated and then backed
        # in this case, consider it truncated
        @pe.update(inode, 0) if fsize < @pe.read_pos
      elsif last_inode != 0
        # this is FilePositionEntry and fluentd once started.
        # read data from the head of the rotated file.
        # logs never duplicate because this file is a rotated new file.
        @pe.update(inode, 0)
      else
        # this is MemoryPositionEntry or this is the first time fluentd started.
        # seek to the end of the any files.
        # logs may duplicate without this seek because it's not sure the file is
        # existent file or rotated new file.
        pos = @read_from_head ? 0 : fsize
        @pe.update(inode, pos)
      end
      @io_handler = io_handler
    else
      @io_handler = NullIOHandler.new
    end
  else
    watcher_needs_update = false

    if stat
      inode = stat.ino
      if inode == @pe.read_inode # truncated
        @pe.update_pos(0)
        @io_handler.close
      elsif !@io_handler.opened? # There is no previous file. Reuse TailWatcher
        @pe.update(inode, 0)
      else # file is rotated and new file found
        watcher_needs_update = true
        # Handle the old log file before renewing TailWatcher [fluentd#1055]
        @io_handler.on_notify
      end
    else # file is rotated and new file not found
      # Clear RotateHandler to avoid duplicated file watch in same path.
      @rotate_handler = nil
      watcher_needs_update = true
    end

    if watcher_needs_update
      if @follow_inodes
        # If stat is nil (file not present), NEED to stop and discard this watcher.
        #   When the file is disappeared but is resurrected soon, then `#refresh_watcher`
        #   can't recognize this TailWatcher needs to be stopped.
        #   This can happens when the file is rotated.
        #   If a notify comes before the new file for the path is created during rotation,
        #   then it appears as if the file was resurrected once it disappeared.
        # Don't want to swap state because we need latest read offset in pos file even after rotate_wait
        @update_watcher.call(self, @pe, stat&.ino)
      else
        # Permit to handle if stat is nil (file not present).
        # If a file is mv-ed and a new file is created during
        # calling `#refresh_watchers`s, and `#refresh_watchers` won't run `#start_watchers`
        # and `#stop_watchers()` for the path because `target_paths_hash`
        # always contains the path.
        @update_watcher.call(self, swap_state(@pe), stat&.ino)
      end
    else
      @log.info "detected rotation of #{@path}"
      @io_handler = io_handler
    end
    @metrics.rotated.inc
  end
end

#register_watcher(watcher) ⇒ Object



818
819
820
# File 'lib/fluent/plugin/in_tail.rb', line 818

def register_watcher(watcher)
  @watchers << watcher
end

#swap_state(pe) ⇒ Object



939
940
941
942
943
944
945
# File 'lib/fluent/plugin/in_tail.rb', line 939

def swap_state(pe)
  # Use MemoryPositionEntry for rotated file temporary
  mpe = MemoryPositionEntry.new
  mpe.update(pe.read_inode, pe.read_pos)
  @pe = mpe
  pe # This pe will be updated in on_rotate after TailWatcher is initialized
end

#tagObject



814
815
816
# File 'lib/fluent/plugin/in_tail.rb', line 814

def tag
  @parsed_tag ||= @path.tr('/', '.').squeeze('.').gsub(/^\./, '')
end