Class: Fluent::Plugin::TailInput::TailWatcher
- Inherits:
-
Object
- Object
- Fluent::Plugin::TailInput::TailWatcher
- Defined in:
- lib/fluent/plugin/in_tail.rb
Defined Under Namespace
Classes: FIFO, IOHandler, LineBufferTimerFlusher, NullIOHandler, RotateHandler
Instance Attribute Summary collapse
-
#group_watcher ⇒ Object
Returns the value of attribute group_watcher.
-
#ino ⇒ Object
readonly
Returns the value of attribute ino.
-
#line_buffer_timer_flusher ⇒ Object
readonly
Returns the value of attribute line_buffer_timer_flusher.
-
#path ⇒ Object
readonly
Returns the value of attribute path.
-
#pe ⇒ Object
readonly
Returns the value of attribute pe.
-
#unwatched ⇒ Object
This is used for removing position entry from PositionFile.
-
#watchers ⇒ Object
readonly
Returns the value of attribute watchers.
Instance Method Summary collapse
- #close ⇒ Object
- #detach(shutdown_start_time = nil) ⇒ Object
- #eof? ⇒ Boolean
-
#initialize(target_info, pe, log, read_from_head, follow_inodes, update_watcher, line_buffer_timer_flusher, io_handler_build, metrics) ⇒ TailWatcher
constructor
A new instance of TailWatcher.
- #io_handler ⇒ Object
- #on_notify ⇒ Object
- #on_rotate(stat) ⇒ Object
- #register_watcher(watcher) ⇒ Object
- #swap_state(pe) ⇒ Object
- #tag ⇒ Object
Constructor Details
#initialize(target_info, pe, log, read_from_head, follow_inodes, update_watcher, line_buffer_timer_flusher, io_handler_build, metrics) ⇒ TailWatcher
Returns a new instance of TailWatcher.
852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 |
# File 'lib/fluent/plugin/in_tail.rb', line 852 def initialize(target_info, pe, log, read_from_head, follow_inodes, update_watcher, line_buffer_timer_flusher, io_handler_build, metrics) @path = target_info.path @ino = target_info.ino @pe = pe || MemoryPositionEntry.new @read_from_head = read_from_head @follow_inodes = follow_inodes @update_watcher = update_watcher @log = log @rotate_handler = RotateHandler.new(log, &method(:on_rotate)) @line_buffer_timer_flusher = line_buffer_timer_flusher @io_handler = nil @io_handler_build = io_handler_build @metrics = metrics @watchers = [] end |
Instance Attribute Details
#group_watcher ⇒ Object
Returns the value of attribute group_watcher.
873 874 875 |
# File 'lib/fluent/plugin/in_tail.rb', line 873 def group_watcher @group_watcher end |
#ino ⇒ Object (readonly)
Returns the value of attribute ino.
868 869 870 |
# File 'lib/fluent/plugin/in_tail.rb', line 868 def ino @ino end |
#line_buffer_timer_flusher ⇒ Object (readonly)
Returns the value of attribute line_buffer_timer_flusher.
870 871 872 |
# File 'lib/fluent/plugin/in_tail.rb', line 870 def line_buffer_timer_flusher @line_buffer_timer_flusher end |
#path ⇒ Object (readonly)
Returns the value of attribute path.
868 869 870 |
# File 'lib/fluent/plugin/in_tail.rb', line 868 def path @path end |
#pe ⇒ Object (readonly)
Returns the value of attribute pe.
869 870 871 |
# File 'lib/fluent/plugin/in_tail.rb', line 869 def pe @pe end |
#unwatched ⇒ Object
This is used for removing position entry from PositionFile
871 872 873 |
# File 'lib/fluent/plugin/in_tail.rb', line 871 def unwatched @unwatched end |
#watchers ⇒ Object (readonly)
Returns the value of attribute watchers.
872 873 874 |
# File 'lib/fluent/plugin/in_tail.rb', line 872 def watchers @watchers end |
Instance Method Details
#close ⇒ Object
891 892 893 894 895 896 |
# File 'lib/fluent/plugin/in_tail.rb', line 891 def close if @io_handler @io_handler.close @io_handler = nil end end |
#detach(shutdown_start_time = nil) ⇒ Object
883 884 885 886 887 888 889 |
# File 'lib/fluent/plugin/in_tail.rb', line 883 def detach(shutdown_start_time = nil) if @io_handler @io_handler.ready_to_shutdown(shutdown_start_time) @io_handler.on_notify end @line_buffer_timer_flusher&.close(self) end |
#eof? ⇒ Boolean
898 899 900 |
# File 'lib/fluent/plugin/in_tail.rb', line 898 def eof? @io_handler.nil? || @io_handler.eof? end |
#io_handler ⇒ Object
996 997 998 |
# File 'lib/fluent/plugin/in_tail.rb', line 996 def io_handler @io_handler_build.call(self, @path) end |
#on_notify ⇒ Object
902 903 904 905 906 907 908 909 910 911 912 913 |
# File 'lib/fluent/plugin/in_tail.rb', line 902 def on_notify begin stat = Fluent::FileWrapper.stat(@path) rescue Errno::ENOENT, Errno::EACCES # moved or deleted stat = nil end @rotate_handler.on_notify(stat) if @rotate_handler @line_buffer_timer_flusher.on_notify(self) if @line_buffer_timer_flusher @io_handler.on_notify if @io_handler end |
#on_rotate(stat) ⇒ Object
915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 |
# File 'lib/fluent/plugin/in_tail.rb', line 915 def on_rotate(stat) if @io_handler.nil? if stat # first time fsize = stat.size inode = stat.ino last_inode = @pe.read_inode if inode == last_inode # rotated file has the same inode number with the last file. # assuming following situation: # a) file was once renamed and backed, or # b) symlink or hardlink to the same file is recreated # in either case of a and b, seek to the saved position # c) file was once renamed, truncated and then backed # in this case, consider it truncated @pe.update(inode, 0) if fsize < @pe.read_pos elsif last_inode != 0 # this is FilePositionEntry and fluentd once started. # read data from the head of the rotated file. # logs never duplicate because this file is a rotated new file. @pe.update(inode, 0) else # this is MemoryPositionEntry or this is the first time fluentd started. # seek to the end of the any files. # logs may duplicate without this seek because it's not sure the file is # existent file or rotated new file. pos = @read_from_head ? 0 : fsize @pe.update(inode, pos) end @io_handler = io_handler else @io_handler = NullIOHandler.new end else watcher_needs_update = false if stat inode = stat.ino if inode == @pe.read_inode # truncated @pe.update_pos(0) @io_handler.close elsif !@io_handler.opened? # There is no previous file. Reuse TailWatcher @pe.update(inode, 0) else # file is rotated and new file found watcher_needs_update = true # Handle the old log file before renewing TailWatcher [fluentd#1055] @io_handler.on_notify end else # file is rotated and new file not found # Clear RotateHandler to avoid duplicated file watch in same path. @rotate_handler = nil watcher_needs_update = true end if watcher_needs_update if @follow_inodes # If stat is nil (file not present), NEED to stop and discard this watcher. # When the file is disappeared but is resurrected soon, then `#refresh_watcher` # can't recognize this TailWatcher needs to be stopped. # This can happens when the file is rotated. # If a notify comes before the new file for the path is created during rotation, # then it appears as if the file was resurrected once it disappeared. # Don't want to swap state because we need latest read offset in pos file even after rotate_wait @update_watcher.call(self, @pe, stat&.ino) else # Permit to handle if stat is nil (file not present). # If a file is mv-ed and a new file is created during # calling `#refresh_watchers`s, and `#refresh_watchers` won't run `#start_watchers` # and `#stop_watchers()` for the path because `target_paths_hash` # always contains the path. @update_watcher.call(self, swap_state(@pe), stat&.ino) end else @log.info "detected rotation of #{@path}" @io_handler = io_handler end @metrics.rotated.inc end end |
#register_watcher(watcher) ⇒ Object
879 880 881 |
# File 'lib/fluent/plugin/in_tail.rb', line 879 def register_watcher(watcher) @watchers << watcher end |
#swap_state(pe) ⇒ Object
1000 1001 1002 1003 1004 1005 1006 |
# File 'lib/fluent/plugin/in_tail.rb', line 1000 def swap_state(pe) # Use MemoryPositionEntry for rotated file temporary mpe = MemoryPositionEntry.new mpe.update(pe.read_inode, pe.read_pos) @pe = mpe pe # This pe will be updated in on_rotate after TailWatcher is initialized end |
#tag ⇒ Object
875 876 877 |
# File 'lib/fluent/plugin/in_tail.rb', line 875 def tag @parsed_tag ||= @path.tr('/', '.').squeeze('.').gsub(/^\./, '') end |