Class: DirectoryWatcher::Collector

Inherits:
Object
Includes:
Logable, Threaded
Defined in:
lib/directory_watcher/collector.rb

Overview

Collector reads items from a collection Queue and processes them to see if FileEvents should be put onto the notification Queue.

Instance Method Summary

Methods included from Logable

default_logger, #logger

Methods included from Threaded

#_activity_thread, #continue_on_error=, #continue_on_error?, #finished_iterations?, #interval, #interval=, #iterations, #join, #maximum_iterations, #maximum_iterations=, #pause, #resume, #running?, #start, #status, #stop, #wait

Constructor Details

#initialize(config) ⇒ Collector

Create a new Collector from the given Configuration.

config - The Configuration. The Collector uses the following items from it:

collection_queue   - The Queue to read items from the Scanner on
notification_queue - The Queue to submit the Events to the Notifier on
stable             - The number of times we see a file hasn't changed before
                     emitting a stable event
sort_by            - The method used to sort events during on_scan results
order_by           - The method used to order events from call to on_scan

If config.pre_load? is true, a Scan of config.glob is run to load the initial internal state. No events will be emitted for the FileStats in this scan.



# File 'lib/directory_watcher/collector.rb', line 23

def initialize( config )
  @stats = Hash.new
  @stable_counts = Hash.new
  @config = config
  on_scan( DirectoryWatcher::Scan.new( config.glob ), false ) if config.pre_load?
  self.interval = 0.01 # yes this is a fast loop
end

Instance Method Details

#collection_queue ⇒ Object

The queue from which to read items submitted by the scanners. See Configuration.



# File 'lib/directory_watcher/collector.rb', line 51

def collection_queue
  @config.collection_queue
end

#dump_stats(io) ⇒ Object

Write the current stats to the given IO object as a YAML document.

io - The IO object to write the document to.

Returns nothing.



# File 'lib/directory_watcher/collector.rb', line 122

def dump_stats( io )
  YAML.dump(@stats, io)
end

#load_stats(io) ⇒ Object

Read the current stats from the given IO object. Any existing stats in the Collector will be overwritten.

io - The IO object from which to read the document.

Returns nothing.



# File 'lib/directory_watcher/collector.rb', line 132

def load_stats( io )
  @stats = YAML.load(io)
end
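
A round trip through the two calls used by dump_stats and load_stats can be sketched with a StringIO. This is a minimal stand-alone sketch, not library code: the stats hash below uses plain string keys and integer values as hypothetical stand-ins for the Collector's internal FileStat data, and it calls YAML directly rather than going through a Collector.

```ruby
require 'yaml'
require 'stringio'

# Hypothetical stand-in for the Collector's @stats hash.
stats = { 'logs/app.log' => 1024, 'logs/err.log' => 0 }

io = StringIO.new
YAML.dump(stats, io)      # the same call dump_stats makes
io.rewind                 # return to the start before reading back

restored = YAML.load(io)  # the same call load_stats makes
```

Note that newer Psych versions restrict which classes YAML.load will deserialize, so a document containing custom objects may need extra handling that a plain hash like this one does not.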

#notification_queue ⇒ Object

The queue to which Events are written for the Notifier. See Configuration.



# File 'lib/directory_watcher/collector.rb', line 57

def notification_queue
  @config.notification_queue
end

#on_scan(scan, emit_events = true) ⇒ Object

Given the scan, update the set of stats with the results from the Scan and emit events to the notification queue as appropriate.

scan - The Scan containing all the new FileStat items.

emit_events - Whether events should be emitted for the items in the scan (default: true).

There is one odd thing that happens here. Scanners that are EventableScanners use on_stat to emit removed events, and the standard threaded Scanner only uses Scans. So we make sure to emit removed events in this method only if the scanner that gave us the scan was the basic threaded Scanner.

TODO: Possibly fix this through another abstraction in the Scanners. No idea about what that would be yet.

Returns nothing.



# File 'lib/directory_watcher/collector.rb', line 78

def on_scan( scan, emit_events = true )
  seen_paths = Set.new
  logger.debug "Sorting by #{sort_by} #{order_by}"
  sorted_stats( scan.run ).each do |stat|
    on_stat(stat, emit_events)
    seen_paths << stat.path
  end
  emit_removed_events(seen_paths) if @config.scanner.nil?
end
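
The seen_paths bookkeeping above suggests how removed files can be detected: any path known from earlier scans that the latest scan did not see is a removal candidate. The following is a minimal sketch of that idea only. The helper removed_paths and the sample paths are hypothetical, and the library's own emit_removed_events is not shown in this page, so its actual behavior may differ.

```ruby
require 'set'

# Hypothetical helper: paths known from earlier scans
# that the latest scan did not report.
def removed_paths(known_paths, seen_paths)
  known_paths.reject { |path| seen_paths.include?(path) }
end

known = ['a.txt', 'b.txt', 'c.txt']

# Collect the paths seen in the latest scan, as on_scan does with a Set.
seen = Set.new
['a.txt', 'c.txt'].each { |path| seen << path }

gone = removed_paths(known, seen)
```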

#on_stat(stat, emit_event = true) ⇒ Object

Process a single stat and emit an event if necessary.

stat - The new FileStat to process and see if an event should be emitted.

emit_event - Whether or not an event should be emitted.

Returns nothing.



# File 'lib/directory_watcher/collector.rb', line 95

def on_stat( stat, emit_event = true )
  orig_stat = update_stat( stat )
  logger.debug "Emitting event for on_stat #{stat}"
  emit_event_for( orig_stat, stat ) if emit_event
end

#order_by ⇒ Object

How to order Scan results. See Configuration.



# File 'lib/directory_watcher/collector.rb', line 45

def order_by
  @config.order_by
end

#run ⇒ Object

Remove one item from the collection queue and process it.

This method is required by the Threaded API.

Returns nothing.



# File 'lib/directory_watcher/collector.rb', line 106

def run
  case thing = collection_queue.deq
  when ::DirectoryWatcher::Scan
    on_scan(thing)
  when ::DirectoryWatcher::FileStat
    on_stat(thing)
  else
    raise "Unknown item in the queue: #{thing}"
  end
end
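
The dispatch in run relies on Ruby's case/when matching an object against a class. The sketch below shows the same pattern in isolation, assuming two hypothetical stand-in classes (FakeScan and FakeStat) in place of DirectoryWatcher::Scan and DirectoryWatcher::FileStat, and a hypothetical dispatch method that returns a tag instead of calling on_scan or on_stat.

```ruby
# Hypothetical stand-ins for DirectoryWatcher::Scan and DirectoryWatcher::FileStat.
FakeScan = Struct.new(:glob)
FakeStat = Struct.new(:path)

# Take one item off the queue and route it by its class.
def dispatch(queue)
  case thing = queue.deq
  when FakeScan then [:scan, thing.glob]
  when FakeStat then [:stat, thing.path]
  else raise "Unknown item in the queue: #{thing}"
  end
end

queue = Queue.new
queue << FakeScan.new('**/*.rb')
queue << FakeStat.new('lib/a.rb')

first  = dispatch(queue)
second = dispatch(queue)
```

Because deq blocks when the queue is empty, a loop built on this pattern waits for the next item rather than spinning.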

#sort_by ⇒ Object

How to sort Scan results. See Configuration.



# File 'lib/directory_watcher/collector.rb', line 39

def sort_by
  @config.sort_by
end
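
How sort_by and order_by might combine when sorting Scan results can be sketched as follows. This is an illustration under stated assumptions, not the library's sorted_stats method, which is not shown in this page: the SketchStat struct, the sort_stats helper, and the :path, :mtime, :ascending, and :descending values are all hypothetical.

```ruby
# Hypothetical stand-in for a FileStat with two sortable attributes.
SketchStat = Struct.new(:path, :mtime)

# Sort by the named attribute, then reverse for descending order.
def sort_stats(stats, sort_by, order_by)
  ordered = stats.sort_by { |stat| stat.public_send(sort_by) }
  order_by == :descending ? ordered.reverse : ordered
end

stats = [
  SketchStat.new('b.txt', 30),
  SketchStat.new('a.txt', 20),
  SketchStat.new('c.txt', 10)
]

by_path_asc   = sort_stats(stats, :path, :ascending).map(&:path)
by_mtime_desc = sort_stats(stats, :mtime, :descending).map(&:path)
```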

#stable_threshold ⇒ Object

The number of times we see a file hasn’t changed before emitting a stable event. See Configuration#stable.



# File 'lib/directory_watcher/collector.rb', line 33

def stable_threshold
  @config.stable
end
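
One way to picture the stable threshold is a per-path counter that resets whenever a file changes and signals once when the count of unchanged observations reaches the threshold. The sketch below assumes exactly that behavior. The StableTracker class is hypothetical, and the Collector's actual handling of @stable_counts is not shown in this page and may differ.

```ruby
# Hypothetical tracker: counts consecutive unchanged observations per path.
class StableTracker
  def initialize(threshold)
    @threshold = threshold
    @counts = Hash.new(0)
  end

  # Record one observation of a path. Returns true only on the observation
  # where the unchanged count reaches the threshold.
  def observe(path, changed)
    if changed
      @counts[path] = 0
      false
    else
      @counts[path] += 1
      @counts[path] == @threshold
    end
  end
end

tracker = StableTracker.new(2)
results = [
  tracker.observe('a.txt', true),   # changed, counter resets
  tracker.observe('a.txt', false),  # unchanged once
  tracker.observe('a.txt', false),  # unchanged twice, threshold reached
  tracker.observe('a.txt', false)   # already past the threshold
]
```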