Class: Qwirk::Batch::AcquireFileStrategy
- Inherits:
-
Object
- Object
- Qwirk::Batch::AcquireFileStrategy
- Defined in:
- lib/qwirk/batch/acquire_file_strategy.rb
Instance Method Summary collapse
- #complete_file(file) ⇒ Object
-
#initialize(options) ⇒ AcquireFileStrategy
constructor
A new instance of AcquireFileStrategy.
- #mark_file_as_processing(file) ⇒ Object
-
#next_file ⇒ Object
Returns the next file or nil if stopped.
- #stop ⇒ Object
Constructor Details
#initialize(options) ⇒ AcquireFileStrategy
Returns a new instance of AcquireFileStrategy.
4 5 6 7 8 9 10 11 |
# File 'lib/qwirk/batch/acquire_file_strategy.rb', line 4 def initialize() @glob = [:glob] raise "file_options glob value not set" unless @glob @poll_time = ([:poll_time] || 10.0).to_f # Ftp's could be in progress, make sure the file is at least 60 seconds old by default before processing @age = ([:age] || 60).to_i @stopped = false end |
Instance Method Details
#complete_file(file) ⇒ Object
33 34 35 36 37 38 |
# File 'lib/qwirk/batch/acquire_file_strategy.rb', line 33 def complete_file(file) file.match(/(.*)\.processing$/) || raise("#{file} is not currently being processed") new_file = $1 + '.completed' File.rename(file, new_file) return new_file end |
#mark_file_as_processing(file) ⇒ Object
27 28 29 30 31 |
# File 'lib/qwirk/batch/acquire_file_strategy.rb', line 27 def mark_file_as_processing(file) new_file = file + '.processing' File.rename(file, new_file) return new_file end |
#next_file ⇒ Object
Returns the next file or nil if stopped
14 15 16 17 18 19 20 21 22 23 24 25 |
# File 'lib/qwirk/batch/acquire_file_strategy.rb', line 14 def next_file until @stopped Dir.glob(@glob).each do |file| unless file.match /\.(processing|completed)$/ return file if (Time.now - File.mtime(file) > @age) end end @sleep_thread = Thread.current sleep @poll_time end return nil end |
#stop ⇒ Object
40 41 42 43 |
# File 'lib/qwirk/batch/acquire_file_strategy.rb', line 40 def stop @stopped = true @sleep_thread.wakeup if @sleep_thread end |