Class: FileProcessor

Inherits:
Object
  • Object
show all
Defined in:
lib/rforward/file_processor.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(filepath, line_processor) ⇒ FileProcessor

Returns a new instance of FileProcessor.



4
5
6
# File 'lib/rforward/file_processor.rb', line 4

def initialize filepath, line_processor
  @filepath, @line_processor = filepath, line_processor
end

Instance Attribute Details

#filepathObject

Returns the value of attribute filepath.



2
3
4
# File 'lib/rforward/file_processor.rb', line 2

def filepath
  @filepath
end

#line_processorObject

Returns the value of attribute line_processor.



2
3
4
# File 'lib/rforward/file_processor.rb', line 2

def line_processor
  @line_processor
end

Class Method Details

.call(filepath) ⇒ Object



28
29
30
31
# File 'lib/rforward/file_processor.rb', line 28

def self.call filepath
  processor = FileProcessor.new filepath, FFluentdLine.new
  processor.call
end

Instance Method Details

#callObject



8
9
10
11
12
13
14
15
16
17
# File 'lib/rforward/file_processor.rb', line 8

def call
  RLogger.instance.info "start working on #{filepath}"
  File.readlines(filepath).each do |line|
    line_processor.call line
  end
  Stat.instance.files_current += 1
  RLogger.instance.info "finish working on #{filepath}"
  RLogger.instance.stat
  delay
end

#delayObject



20
21
22
23
24
25
26
# File 'lib/rforward/file_processor.rb', line 20

def delay
  if Config.instance['flush_threshold'].to_i < Stat.instance.flush_counter
    RLogger.instance.info "Sleep for #{Config.instance['flush_delay']} seconds"
    sleep(Config.instance['flush_delay'].to_i)
    Stat.instance.flush_counter = 0
  end
end