Class: DirectoryWatcher::Collector
- Inherits: Object
  - Object
    - DirectoryWatcher::Collector
- Defined in: lib/directory_watcher/collector.rb
Overview
Collector reads items from a collection Queue and processes them to see if FileEvents should be put onto the notification Queue.
Instance Method Summary
- #collection_queue ⇒ Object
  The queue from which to read items from the scanners.
- #dump_stats(io) ⇒ Object
  Write the current stats to the given IO object as a YAML document.
- #initialize(config) ⇒ Collector (constructor)
  Create a new Collector from the given Configuration, optionally pre-loading state from an initial Scan.
- #load_stats(io) ⇒ Object
  Read the current stats from the given IO object.
- #notification_queue ⇒ Object
  The queue to write Events for the Notifier.
- #on_scan(scan, emit_events = true) ⇒ Object
  Given the scan, update the set of stats with the results from the Scan and emit events to the notification queue as appropriate.
- #on_stat(stat, emit_event = true) ⇒ Object
  Process a single stat and emit an event if necessary.
- #order_by ⇒ Object
  How to order Scan results.
- #run ⇒ Object
  Remove one item from the collection queue and process it.
- #sort_by ⇒ Object
  How to sort Scan results.
- #stable_threshold ⇒ Object
  The number of times we see a file hasn’t changed before emitting a stable event.
Methods included from Logable
Methods included from Threaded
#_activity_thread, #continue_on_error=, #continue_on_error?, #finished_iterations?, #interval, #interval=, #iterations, #join, #maximum_iterations, #maximum_iterations=, #pause, #resume, #running?, #start, #status, #stop, #wait
Constructor Details
#initialize(config) ⇒ Collector
Create a new Collector from the given Configuration, optionally pre-loading state from an initial Scan.
config - The Configuration. The Collector uses the following from it:
  collection_queue - The Queue to read items from the Scanner on.
  notification_queue - The Queue to submit the Events to the Notifier on.
  stable - The number of times we see a file hasn't changed before emitting a stable event.
  sort_by - The method used to sort events during on_scan results.
  order_by - The method used to order events from a call to on_scan.
  pre_load_scan - A Scan to use to load our internal state from before. No events will be emitted for the FileStats in this scan.
# File 'lib/directory_watcher/collector.rb', line 23
def initialize( config )
  @stats = Hash.new
  @stable_counts = Hash.new
  @config = config
  on_scan( DirectoryWatcher::Scan.new( config.glob ), false ) if config.pre_load?
  self.interval = 0.01 # yes this is a fast loop
end
Instance Method Details
#collection_queue ⇒ Object
The queue from which to read items from the scanners. See Configuration.
# File 'lib/directory_watcher/collector.rb', line 51
def collection_queue
  @config.collection_queue
end
#dump_stats(io) ⇒ Object
Write the current stats to the given IO object as a YAML document.
io - The IO object to write the document to.
Returns nothing.
# File 'lib/directory_watcher/collector.rb', line 122
def dump_stats( io )
  YAML.dump(@stats, io)
end
#load_stats(io) ⇒ Object
Read the current stats from the given IO object. Any existing stats in the Collector will be overwritten.
io - The IO object from which to read the document.
Returns nothing.
# File 'lib/directory_watcher/collector.rb', line 132
def load_stats( io )
  @stats = YAML.load(io)
end
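The dump_stats / load_stats round trip can be sketched with the standard library alone. This is a minimal illustration, not the gem's code: the helper names and the sample data are hypothetical, and plain Hashes stand in for real FileStat objects because recent Psych versions restrict which classes YAML.load will deserialize by default.

```ruby
require 'yaml'
require 'stringio'

# Hypothetical stand-in for the Collector's @stats hash (path => plain data).
def sample_stats
  {
    'notes/a.txt' => { 'mtime' => 1_700_000_000, 'size' => 120 },
    'notes/b.txt' => { 'mtime' => 1_700_000_500, 'size' => 64 }
  }
end

# Mirrors dump_stats: write the stats to an IO as one YAML document.
def dump_stats_sketch(stats, io)
  YAML.dump(stats, io)
end

# Mirrors load_stats: read one YAML document back from an IO.
def load_stats_sketch(io)
  YAML.load(io)
end

# Dump to an in-memory IO, rewind it, and load the document back.
def round_trip(stats)
  io = StringIO.new
  dump_stats_sketch(stats, io)
  io.rewind
  load_stats_sketch(io)
end
```

With a real File, the same pattern would open the file for writing before dumping and for reading before loading.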
#notification_queue ⇒ Object
The queue to write Events for the Notifier. See Configuration.
# File 'lib/directory_watcher/collector.rb', line 57
def notification_queue
  @config.notification_queue
end
#on_scan(scan, emit_events = true) ⇒ Object
Given the scan, update the set of stats with the results from the Scan and emit events to the notification queue as appropriate.
scan - The Scan containing all the new FileStat items.
emit_events - Should events be emitted for the events in the scan (default: true).
There is one odd thing that happens here. Scanners that are EventableScanners use on_stat to emit removed events, while the standard threaded Scanner only uses Scans. So this method emits removed events only if the scanner that gave us the scan was the basic threaded Scanner.
TODO: Possibly fix this through another abstraction in the Scanners. No idea about what that would be yet.
Returns nothing.
# File 'lib/directory_watcher/collector.rb', line 78
def on_scan( scan, emit_events = true )
  seen_paths = Set.new
  logger.debug "Sorting by #{sort_by} #{order_by}"
  sorted_stats( scan.run ).each do |stat|
    on_stat(stat, emit_events)
    seen_paths << stat.path
  end
  emit_removed_events(seen_paths) if @config.scanner.nil?
end
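The seen_paths bookkeeping in on_scan suggests how removals could be detected: any path the collector already knows about that the latest scan did not report is a candidate for a removed event. The body of emit_removed_events is not shown on this page, so the following is only a sketch under that assumption, with hypothetical helper names.

```ruby
require 'set'

# Hypothetical helper: known paths that the latest scan did not report.
def removed_paths(known_paths, seen_paths)
  known_paths.reject { |path| seen_paths.include?(path) }
end

# Record every scanned path in a Set, then diff it against the known paths.
def scan_and_find_removed(known_paths, scanned_paths)
  seen_paths = Set.new
  scanned_paths.each { |path| seen_paths << path }
  removed_paths(known_paths, seen_paths)
end
```

A Set keeps the membership check cheap when a scan returns many paths.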
#on_stat(stat, emit_event = true) ⇒ Object
Process a single stat and emit an event if necessary.
stat - The new FileStat to process and see if an event should be emitted.
emit_event - Whether or not an event should be emitted.
Returns nothing.
# File 'lib/directory_watcher/collector.rb', line 95
def on_stat( stat, emit_event = true )
  orig_stat = update_stat( stat )
  logger.debug "Emitting event for on_stat #{stat}"
  emit_event_for( orig_stat, stat ) if emit_event
end
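on_stat first swaps the new stat into the collector's state and keeps the previous one, then decides on an event from the pair. Neither update_stat nor emit_event_for is shown on this page, so the sketch below is an assumption about their general shape: the helper names, the plain-Hash stats, and the :added / :modified labels are all hypothetical.

```ruby
# Hypothetical version of update_stat: store the new stat, return the old one.
def update_stat_sketch(stats, path, new_stat)
  orig_stat = stats[path]
  stats[path] = new_stat
  orig_stat
end

# Hypothetical classification of the (old, new) pair into an event label.
def event_type_for(orig_stat, new_stat)
  return :added if orig_stat.nil?
  return :modified if orig_stat != new_stat
  nil # unchanged: no event in this sketch
end

# Combine both steps, as on_stat does with emit_event = true.
def on_stat_sketch(stats, path, new_stat)
  orig_stat = update_stat_sketch(stats, path, new_stat)
  event_type_for(orig_stat, new_stat)
end
```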
#order_by ⇒ Object
How to order Scan results. See Configuration.
# File 'lib/directory_watcher/collector.rb', line 45
def order_by
  @config.order_by
end
#run ⇒ Object
Remove one item from the collection queue and process it.
This method is required by the Threaded API.
Returns nothing.
# File 'lib/directory_watcher/collector.rb', line 106
def run
  case thing = collection_queue.deq
  when ::DirectoryWatcher::Scan
    on_scan(thing)
  when ::DirectoryWatcher::FileStat
    on_stat(thing)
  else
    raise "Unknown item in the queue: #{thing}"
  end
end
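The dispatch in run can be tried in isolation with Ruby's built-in Queue. In this sketch ScanItem and StatItem are hypothetical stand-ins for DirectoryWatcher::Scan and DirectoryWatcher::FileStat, and the return values exist only to make the routing visible.

```ruby
# Hypothetical stand-ins for DirectoryWatcher::Scan and DirectoryWatcher::FileStat.
ScanItem = Struct.new(:paths)
StatItem = Struct.new(:path)

# Take one item off the queue and route it by class, as #run does.
# Queue#deq blocks until an item is available.
def process_one(queue)
  case thing = queue.deq
  when ScanItem then [:scan, thing.paths.size]
  when StatItem then [:stat, thing.path]
  else raise "Unknown item in the queue: #{thing}"
  end
end
```

Because each call handles exactly one item, the surrounding thread loop decides how often the queue is drained.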
#sort_by ⇒ Object
How to sort Scan results. See Configuration.
# File 'lib/directory_watcher/collector.rb', line 39
def sort_by
  @config.sort_by
end
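How sort_by and order_by might combine when ordering scan results can be sketched as follows. The sorted_stats method itself is not shown on this page, so this is an assumption: SketchStat is a hypothetical stand-in for FileStat, and the :ascending / :descending values and the keyword names field and order are illustrative.

```ruby
# Hypothetical stand-in for FileStat with a few sortable fields.
SketchStat = Struct.new(:path, :mtime, :size)

# Sort by the named field, then reverse the result for descending order.
def sorted_stats_sketch(stats, field: :path, order: :ascending)
  sorted = stats.sort_by { |stat| stat.public_send(field) }
  order == :descending ? sorted.reverse : sorted
end
```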
#stable_threshold ⇒ Object
The number of times we see a file hasn’t changed before emitting a stable event. See Configuration#stable.
# File 'lib/directory_watcher/collector.rb', line 33
def stable_threshold
  @config.stable
end
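One way to read the threshold: count consecutive unchanged sightings per path and fire once when the count reaches stable_threshold. The counting code is not shown on this page, so the class below is a hypothetical sketch of that idea, not the gem's implementation (which keeps its own @stable_counts hash).

```ruby
# Hypothetical counter illustrating a stable threshold; not the gem's code.
class StableCounter
  def initialize(threshold)
    @threshold = threshold
    @counts = Hash.new(0) # path => consecutive unchanged sightings
  end

  # Returns true only on the sighting where the count reaches the threshold.
  def observe(path, changed)
    if changed
      @counts[path] = 0 # any change restarts the count
      return false
    end
    @counts[path] += 1
    @counts[path] == @threshold
  end
end
```

In this sketch a path that stays unchanged past the threshold does not fire again until a change resets its count.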