Class: Fluent::Plugin::TailInput::TailWatcher
- Inherits:
-
Object
- Object
- Fluent::Plugin::TailInput::TailWatcher
- Defined in:
- lib/fluent/plugin/in_tail.rb
Defined Under Namespace
Classes: FIFO, IOHandler, LineBufferTimerFlusher, NullIOHandler, RotateHandler
Instance Attribute Summary collapse
-
#group_watcher ⇒ Object
Returns the value of attribute group_watcher.
-
#ino ⇒ Object
readonly
Returns the value of attribute ino.
-
#line_buffer_timer_flusher ⇒ Object
readonly
Returns the value of attribute line_buffer_timer_flusher.
-
#path ⇒ Object
readonly
Returns the value of attribute path.
-
#pe ⇒ Object
readonly
Returns the value of attribute pe.
-
#unwatched ⇒ Object
This is used for removing the position entry from PositionFile.
-
#watchers ⇒ Object
readonly
Returns the value of attribute watchers.
Instance Method Summary collapse
- #close ⇒ Object
- #detach(shutdown_start_time = nil) ⇒ Object
- #eof? ⇒ Boolean
-
#initialize(target_info, pe, log, read_from_head, follow_inodes, update_watcher, line_buffer_timer_flusher, io_handler_build, metrics) ⇒ TailWatcher
constructor
A new instance of TailWatcher.
- #io_handler ⇒ Object
- #on_notify ⇒ Object
- #on_rotate(stat) ⇒ Object
- #register_watcher(watcher) ⇒ Object
- #swap_state(pe) ⇒ Object
- #tag ⇒ Object
Constructor Details
#initialize(target_info, pe, log, read_from_head, follow_inodes, update_watcher, line_buffer_timer_flusher, io_handler_build, metrics) ⇒ TailWatcher
Returns a new instance of TailWatcher.
791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 |
# File 'lib/fluent/plugin/in_tail.rb', line 791
#
# Sets up the state for watching one file. No I/O handler is built here:
# @io_handler starts out nil and is assigned later by #on_rotate.
#
# @param target_info [#path, #ino] source of the watched path and inode
# @param pe [Object, nil] position entry; a new MemoryPositionEntry is used when this is nil/false
# @param log [Object] logger; passed to RotateHandler and used by #on_rotate
# @param read_from_head [Boolean] read by #on_rotate when choosing the initial offset
# @param follow_inodes [Boolean] read by #on_rotate when the watcher has to be renewed
# @param update_watcher [#call] invoked by #on_rotate as call(self, pe, inode)
# @param line_buffer_timer_flusher [Object, nil] optional; notified by #on_notify and closed by #detach
# @param io_handler_build [#call] invoked by #io_handler as call(self, path)
# @param metrics [Object] #on_rotate calls metrics.rotated.inc on it
def initialize(target_info, pe, log, read_from_head, follow_inodes, update_watcher, line_buffer_timer_flusher, io_handler_build, metrics)
  # Identity of the watched file and where reading resumes from.
  @path, @ino = target_info.path, target_info.ino
  @pe = pe || MemoryPositionEntry.new

  # Switches and callbacks supplied by the caller.
  @read_from_head, @follow_inodes = read_from_head, follow_inodes
  @update_watcher = update_watcher
  @log = log

  # Collaborators; RotateHandler is given #on_rotate as its block.
  @rotate_handler = RotateHandler.new(log, &method(:on_rotate))
  @line_buffer_timer_flusher = line_buffer_timer_flusher
  @io_handler = nil
  @io_handler_build = io_handler_build
  @metrics = metrics
  @watchers = []
end
Instance Attribute Details
#group_watcher ⇒ Object
Returns the value of attribute group_watcher.
812 813 814 |
# File 'lib/fluent/plugin/in_tail.rb', line 812 def group_watcher @group_watcher end |
#ino ⇒ Object (readonly)
Returns the value of attribute ino.
807 808 809 |
# File 'lib/fluent/plugin/in_tail.rb', line 807 def ino @ino end |
#line_buffer_timer_flusher ⇒ Object (readonly)
Returns the value of attribute line_buffer_timer_flusher.
809 810 811 |
# File 'lib/fluent/plugin/in_tail.rb', line 809 def line_buffer_timer_flusher @line_buffer_timer_flusher end |
#path ⇒ Object (readonly)
Returns the value of attribute path.
807 808 809 |
# File 'lib/fluent/plugin/in_tail.rb', line 807 def path @path end |
#pe ⇒ Object (readonly)
Returns the value of attribute pe.
808 809 810 |
# File 'lib/fluent/plugin/in_tail.rb', line 808 def pe @pe end |
#unwatched ⇒ Object
This is used for removing the position entry from PositionFile.
810 811 812 |
# File 'lib/fluent/plugin/in_tail.rb', line 810 def unwatched @unwatched end |
#watchers ⇒ Object (readonly)
Returns the value of attribute watchers.
811 812 813 |
# File 'lib/fluent/plugin/in_tail.rb', line 811 def watchers @watchers end |
Instance Method Details
#close ⇒ Object
830 831 832 833 834 835 |
# File 'lib/fluent/plugin/in_tail.rb', line 830
#
# Closes the current I/O handler, if there is one, and forgets it.
# Calling this when no handler is set does nothing.
#
# @return [nil]
def close
  return unless @io_handler

  @io_handler.close
  # Drop the reference only after the handler closed without raising.
  @io_handler = nil
end
#detach(shutdown_start_time = nil) ⇒ Object
822 823 824 825 826 827 828 |
# File 'lib/fluent/plugin/in_tail.rb', line 822
#
# Prepares this watcher for removal: tells the I/O handler (if any) that
# shutdown is starting and gives it one more notification, then closes
# the line buffer timer flusher (if any) for this watcher.
#
# @param shutdown_start_time [Object, nil] forwarded unchanged to the
#   handler's ready_to_shutdown
# @return [Object, nil] whatever the flusher's close returns, or nil when
#   there is no flusher
def detach(shutdown_start_time = nil)
  handler = @io_handler
  if handler
    handler.ready_to_shutdown(shutdown_start_time)
    handler.on_notify
  end

  @line_buffer_timer_flusher&.close(self)
end
#eof? ⇒ Boolean
837 838 839 |
# File 'lib/fluent/plugin/in_tail.rb', line 837
#
# Reports whether there is nothing more to read: true when no I/O handler
# is set, otherwise whatever the handler's own eof? answers.
#
# @return [Boolean]
def eof?
  return true if @io_handler.nil?

  @io_handler.eof?
end
#io_handler ⇒ Object
935 936 937 |
# File 'lib/fluent/plugin/in_tail.rb', line 935
#
# Builds an I/O handler for this watcher by invoking the builder given to
# #initialize. This does not read @io_handler: every call invokes the
# builder again, and the caller decides where to store the result.
#
# @return [Object] whatever the builder returns for (self, @path)
def io_handler
  builder = @io_handler_build
  builder.call(self, @path)
end
#on_notify ⇒ Object
841 842 843 844 845 846 847 848 849 850 851 852 |
# File 'lib/fluent/plugin/in_tail.rb', line 841
#
# Handles one notification for the watched path: stats the file, then
# passes the event to the rotate handler, the line buffer timer flusher
# and the I/O handler, in that order. Each collaborator is skipped when
# it is not set.
#
# @return [Object, nil] the I/O handler's on_notify result, or nil when
#   there is no I/O handler
def on_notify
  stat =
    begin
      Fluent::FileWrapper.stat(@path)
    rescue Errno::ENOENT, Errno::EACCES
      # moved or deleted
      nil
    end

  # @io_handler is read only after the rotate handler has run, because
  # #on_rotate may assign it.
  @rotate_handler&.on_notify(stat)
  @line_buffer_timer_flusher&.on_notify(self)
  @io_handler&.on_notify
end
#on_rotate(stat) ⇒ Object
854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 |
# File 'lib/fluent/plugin/in_tail.rb', line 854 def on_rotate(stat) if @io_handler.nil? if stat # first time fsize = stat.size inode = stat.ino last_inode = @pe.read_inode if inode == last_inode # rotated file has the same inode number with the last file. # assuming following situation: # a) file was once renamed and backed, or # b) symlink or hardlink to the same file is recreated # in either case of a and b, seek to the saved position # c) file was once renamed, truncated and then backed # in this case, consider it truncated @pe.update(inode, 0) if fsize < @pe.read_pos elsif last_inode != 0 # this is FilePositionEntry and fluentd once started. # read data from the head of the rotated file. # logs never duplicate because this file is a rotated new file. @pe.update(inode, 0) else # this is MemoryPositionEntry or this is the first time fluentd started. # seek to the end of the any files. # logs may duplicate without this seek because it's not sure the file is # existent file or rotated new file. pos = @read_from_head ? 0 : fsize @pe.update(inode, pos) end @io_handler = io_handler else @io_handler = NullIOHandler.new end else watcher_needs_update = false if stat inode = stat.ino if inode == @pe.read_inode # truncated @pe.update_pos(0) @io_handler.close elsif !@io_handler.opened? # There is no previous file. Reuse TailWatcher @pe.update(inode, 0) else # file is rotated and new file found watcher_needs_update = true # Handle the old log file before renewing TailWatcher [fluentd#1055] @io_handler.on_notify end else # file is rotated and new file not found # Clear RotateHandler to avoid duplicated file watch in same path. @rotate_handler = nil watcher_needs_update = true end if watcher_needs_update if @follow_inodes # If stat is nil (file not present), NEED to stop and discard this watcher. # When the file is disappeared but is resurrected soon, then `#refresh_watcher` # can't recognize this TailWatcher needs to be stopped. # This can happens when the file is rotated. 
# If a notify comes before the new file for the path is created during rotation, # then it appears as if the file was resurrected once it disappeared. # Don't want to swap state because we need latest read offset in pos file even after rotate_wait @update_watcher.call(self, @pe, stat&.ino) else # Permit to handle if stat is nil (file not present). # If a file is mv-ed and a new file is created during # calling `#refresh_watchers`s, and `#refresh_watchers` won't run `#start_watchers` # and `#stop_watchers()` for the path because `target_paths_hash` # always contains the path. @update_watcher.call(self, swap_state(@pe), stat&.ino) end else @log.info "detected rotation of #{@path}" @io_handler = io_handler end @metrics.rotated.inc end end |
#register_watcher(watcher) ⇒ Object
818 819 820 |
# File 'lib/fluent/plugin/in_tail.rb', line 818
#
# Appends a watcher to the list exposed by #watchers. No de-duplication
# is done; the same object can be registered more than once.
#
# @param watcher [Object] the object to remember
# @return [Array] the @watchers array itself, after the append
def register_watcher(watcher)
  @watchers.push(watcher)
end
#swap_state(pe) ⇒ Object
939 940 941 942 943 944 945 |
# File 'lib/fluent/plugin/in_tail.rb', line 939
#
# Replaces this watcher's position entry with an in-memory copy of the
# given entry's current inode and position, and hands the given entry
# back to the caller.
#
# @param pe [#read_inode, #read_pos] the entry to copy from
# @return [Object] the same pe that was passed in
def swap_state(pe)
  # Use MemoryPositionEntry for rotated file temporary
  @pe = MemoryPositionEntry.new.tap do |snapshot|
    snapshot.update(pe.read_inode, pe.read_pos)
  end

  # This pe will be updated in on_rotate after TailWatcher is initialized
  pe
end
#tag ⇒ Object
814 815 816 |
# File 'lib/fluent/plugin/in_tail.rb', line 814
#
# Derives a dot-separated tag from the watched path: every "/" becomes
# ".", runs of "." collapse to a single one, and a "." at the start of a
# line is removed. The result is computed once and memoised in
# @parsed_tag, so later changes to @path do not affect it.
#
# @return [String] e.g. "var.log.app.log" for "/var/log/app.log"
def tag
  @parsed_tag ||= begin
    dotted = @path.tr('/', '.')
    dotted.squeeze('.').gsub(/^\./, '')
  end
end