Class: FileWorker::DirectoryScanner

Inherits:
Object
  • Object
show all
Defined in:
lib/file_worker/directory_scanner.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options) ⇒ DirectoryScanner

Returns a new instance of DirectoryScanner.



10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# File 'lib/file_worker/directory_scanner.rb', line 10

# Builds a scanner from an options hash.
#
# Reads :in_directory and :out_directory (each wrapped in a Pathname) and the
# optional :sleep (default 1), :workers (default 5) and :max_queue_size
# (default 1000). The hash itself is kept and later passed to every worker.
#
# @param options [Hash] scanner configuration, looked up with symbol keys
def initialize(options)
  @options = options

  # Source and destination roots; the glob matches every entry below the source.
  @in_path   = Pathname.new(options[:in_directory])
  @done_path = Pathname.new(options[:out_directory])
  @glob_path = @in_path + '**/*'

  # Tunables; a nil or false option falls back to the default.
  @sleep          = options[:sleep] || 1
  @workers        = options[:workers] || 5
  @max_queue_size = options[:max_queue_size] || 1000

  @worker_class = DefaultWorker
  @queue_name   = "file_worker"

  # File names handed to the queue and not yet finished, guarded by the mutex.
  @state      = []
  @state_lock = Mutex.new

  @error_handlers = []
end

Instance Attribute Details

#done_path ⇒ Object (readonly)

Returns the value of attribute done_path.



8
9
10
# File 'lib/file_worker/directory_scanner.rb', line 8

# Root directory that files are moved into after a worker has processed them.
#
# Returns the Pathname the constructor built from options[:out_directory].
def done_path
  @done_path
end

#in_path ⇒ Object (readonly)

Returns the value of attribute in_path.



8
9
10
# File 'lib/file_worker/directory_scanner.rb', line 8

# Root directory that is scanned (recursively) for files to process.
#
# Returns the Pathname the constructor built from options[:in_directory].
def in_path
  @in_path
end

#state ⇒ Object (readonly)

Returns the value of attribute state.



8
9
10
# File 'lib/file_worker/directory_scanner.rb', line 8

# File names that have been enqueued and have not finished processing yet.
#
# Returns the live Array itself, not a copy. #enqueue and #process modify it
# while holding @state_lock, so callers should treat it as read-only.
def state
  @state
end

#worker_class ⇒ Object

Returns the value of attribute worker_class.



7
8
9
# File 'lib/file_worker/directory_scanner.rb', line 7

# Class used to handle each file. #process instantiates it as
# worker_class.new(file_name, options) and calls #process on the instance.
# The constructor sets it to DefaultWorker.
def worker_class
  @worker_class
end

Instance Method Details

#enqueue(file_name) ⇒ Object



57
58
59
60
61
62
63
# File 'lib/file_worker/directory_scanner.rb', line 57

# Marks +file_name+ as in flight and hands it to the work queue.
#
# The name is appended to @state (under @state_lock) before it is pushed, so
# it is already tracked by the time a worker can pick it up.
#
# @param file_name [String] path of the file to process
# @return whatever queue.push returns
def enqueue(file_name)
  @state_lock.synchronize { @state << file_name }
  queue.push(file_name)
end

#handle_error(file_name, exception) ⇒ Object



118
119
120
121
122
# File 'lib/file_worker/directory_scanner.rb', line 118

# Notifies every handler registered through #on_error, in registration order.
#
# An exception raised by a handler is not rescued here; it stops the
# remaining handlers and propagates to the caller.
#
# @param file_name [String] file that was being processed
# @param exception [Exception] error raised while processing it
# @return [Array] the list of registered handlers
def handle_error(file_name, exception)
  @error_handlers.each { |callback| callback.call(file_name, exception) }
end

#on_error(&block) ⇒ Object



114
115
116
# File 'lib/file_worker/directory_scanner.rb', line 114

# Registers a callback that #handle_error invokes as
# block.call(file_name, exception) when processing a file fails.
# Callbacks run in the order they were registered.
#
# @yield [file_name, exception] the failed file and the error raised for it
# @raise [ArgumentError] when called without a block
# @return [Array] the list of registered handlers
def on_error(&block)
  # Without this guard a nil would be stored and only blow up later, inside
  # #handle_error, with a NoMethodError far away from the faulty call site.
  raise ArgumentError, "on_error requires a block" unless block

  @error_handlers << block
end

#process(file_name) ⇒ Object

This method is called by the worker threads, so remember to keep it thread-safe.



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/file_worker/directory_scanner.rb', line 31

# Processes one file and then moves it from the in directory to the same
# relative location under the done directory.
#
# Invoked from the work queue's block (see #queue), i.e. by the queue's
# workers, so everything touched here has to stay thread safe.
#
# Any exception raised by the worker or by the move is passed to
# #handle_error instead of propagating; the file is then left where it is.
# Whatever happens, +file_name+ is removed from @state afterwards.
#
# @param file_name [String] path of a file below the in directory
# @return [String, nil] the entry removed from @state, or nil if it was not there
def process(file_name)
  begin
    @worker_class.new(file_name, @options).process

    # Rebuild the destination from the path relative to the in directory.
    # Passing the Pathname objects straight to String#sub raises TypeError on
    # Ruby 1.9 and later (Pathname no longer converts implicitly to String),
    # and a textual replace also goes wrong for an in directory with a
    # trailing slash or equal to ".".
    relative_name  = Pathname.new(file_name).relative_path_from(@in_path)
    done_file_name = (@done_path + relative_name).to_s

    FileUtils.mkdir_p(File.dirname(done_file_name))
    FileUtils.mv(file_name, done_file_name, :secure => true)
  rescue Exception => e
    # Deliberately broad: every failure of a worker is reported to the
    # registered handlers rather than escaping into the queue.
    handle_error(file_name, e)
  ensure
    # In ensure so the entry is released even when an error handler raises;
    # otherwise the file would stay marked as in flight forever.
    released = @state_lock.synchronize { @state.delete(file_name) }
  end

  released
end

#queue ⇒ Object



47
48
49
50
51
# File 'lib/file_worker/directory_scanner.rb', line 47

# Lazily creates the GirlFriday work queue and returns the same instance on
# every later call.
#
# The queue is named @queue_name, is given :size => @workers, and runs
# #process for every file name pushed onto it.
def queue
  return @queue if @queue

  @queue = GirlFriday::WorkQueue.new(@queue_name, :size => @workers) { |file_name| process(file_name) }
end

#queue_size ⇒ Object



53
54
55
# File 'lib/file_worker/directory_scanner.rb', line 53

# Number of items waiting in the work queue, read from the :backlog entry
# that the queue's status reports under @queue_name.
def queue_size
  stats = queue.status[@queue_name]
  stats[:backlog]
end

#scan ⇒ Object



75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/file_worker/directory_scanner.rb', line 75

# Looks for files under the in directory that are not being processed yet and
# enqueues as many of them as the queue still has room for.
#
# Room is @max_queue_size minus the current backlog (#queue_size); when there
# is none, nothing is enqueued and nil is returned.
#
# @return [Array<String>, nil] the file names enqueued by this pass
def scan
  # Glob and subtract under the lock so the listing and the in-flight list
  # are compared at one consistent moment.
  candidates = @state_lock.synchronize { Dir.glob(@glob_path) - @state }
  pending    = candidates.reject { |path| File.directory?(path) }

  free_slots = @max_queue_size - queue_size
  return unless free_slots > 0

  pending.first(free_slots).each { |path| enqueue(path) }
end

#start ⇒ Object



93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# File 'lib/file_worker/directory_scanner.rb', line 93

# Prints the effective configuration, then scans the in directory over and
# over, sleeping @sleep seconds after each pass, until #stop clears @run.
#
# Blocks the calling thread for as long as the loop runs.
def start
  banner = [
    "File Worker Starting",
    "  in directory    #{@in_path}",
    "  out directory   #{@done_path}",
    "  workers         #{@workers}",
    "  max queue size  #{@max_queue_size}",
    "  scan interval   #{@sleep}s",
    "  worker          #{@worker_class.name}",
    ""
  ]
  puts banner

  # @run is the flag #stop flips; it is checked once per scan/sleep cycle.
  @run = true
  while @run
    scan
    sleep(@sleep)
  end
end

#stop ⇒ Object



110
111
112
# File 'lib/file_worker/directory_scanner.rb', line 110

# Asks the loop in #start to finish by clearing the @run flag, which #start
# checks before each scan. A scan or sleep already in progress is not
# interrupted, and work already handed to the queue is not touched here.
def stop
  @run = false
end

#wait_for_empty ⇒ Object



65
66
67
68
69
70
71
72
73
# File 'lib/file_worker/directory_scanner.rb', line 65

# Blocks until the work queue has drained and no worker is busy any more.
#
# First waits on the queue's own wait_for_empty, pauses half a second, and
# then polls the :busy count in the queue status every half second until it
# reaches zero.
def wait_for_empty
  queue.wait_for_empty
  sleep 0.5

  sleep 0.5 until queue.status[@queue_name][:busy] == 0
end