Class: Listen::Event::Processor
- Inherits:
-
Object
- Object
- Listen::Event::Processor
- Defined in:
- lib/listen/event/processor.rb
Defined Under Namespace
Classes: Stopped
Instance Attribute Summary collapse
-
#config ⇒ Object
readonly
private
Returns the value of attribute config.
Instance Method Summary collapse
- #_check_stopped ⇒ Object private
- #_deadline ⇒ Object private
-
#_flush_wakeup_reasons ⇒ Object
private
rubocop:enable Naming/MemoizedInstanceVariableName.
-
#_process_changes(event) ⇒ Object
private
for easier testing without sleep loop.
- #_remember_time_of_first_unprocessed_event ⇒ Object private
- #_reset_no_unprocessed_events ⇒ Object private
- #_sleep(seconds) ⇒ Object private
-
#_wait_until_events ⇒ Object
private
blocks until event is popped returns the event or
nil
when the event_queue is closed rubocop:disable Naming/MemoizedInstanceVariableName. - #_wait_until_events_calm_down ⇒ Object private
- #_wait_until_no_longer_paused ⇒ Object private
-
#initialize(config, reasons) ⇒ Processor
constructor
A new instance of Processor.
-
#loop_for(latency) ⇒ Object
TODO: implement this properly instead of checking the state at arbitrary points in time.
Constructor Details
#initialize(config, reasons) ⇒ Processor
Returns a new instance of Processor.
8 9 10 11 12 13 |
# File 'lib/listen/event/processor.rb', line 8 def initialize(config, reasons) @config = config @listener = config.listener @reasons = reasons _reset_no_unprocessed_events end |
Instance Attribute Details
#config ⇒ Object (readonly, private)
Returns the value of attribute config.
125 126 127 |
# File 'lib/listen/event/processor.rb', line 125 def config @config end |
Instance Method Details
#_check_stopped ⇒ Object (private)
55 56 57 58 59 60 |
# File 'lib/listen/event/processor.rb', line 55 def _check_stopped if @listener.stopped? _flush_wakeup_reasons raise Stopped end end |
#_deadline ⇒ Object (private)
82 83 84 |
# File 'lib/listen/event/processor.rb', line 82 def _deadline @_remember_time_of_first_unprocessed_event + @latency end |
#_flush_wakeup_reasons ⇒ Object (private)
rubocop:enable Naming/MemoizedInstanceVariableName
96 97 98 99 100 101 |
# File 'lib/listen/event/processor.rb', line 96 def _flush_wakeup_reasons until @reasons.empty? reason = @reasons.pop yield reason if block_given? end end |
#_process_changes(event) ⇒ Object (private)
for easier testing without sleep loop
104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 |
# File 'lib/listen/event/processor.rb', line 104 def _process_changes(event) _reset_no_unprocessed_events changes = [event] changes << config.event_queue.pop until config.event_queue.empty? return unless config.callable? hash = config.optimize_changes(changes) result = [hash[:modified], hash[:added], hash[:removed]] return if result.all?(&:empty?) block_start = MonotonicTime.now exception_note = " (exception)" ::Listen::Thread.rescue_and_log('_process_changes') do config.call(*result) exception_note = nil end Listen.logger.debug "Callback#{exception_note} took #{MonotonicTime.now - block_start} sec" end |
#_remember_time_of_first_unprocessed_event ⇒ Object (private)
74 75 76 |
# File 'lib/listen/event/processor.rb', line 74 def _remember_time_of_first_unprocessed_event @_remember_time_of_first_unprocessed_event ||= MonotonicTime.now end |
#_reset_no_unprocessed_events ⇒ Object (private)
78 79 80 |
# File 'lib/listen/event/processor.rb', line 78 def _reset_no_unprocessed_events @_remember_time_of_first_unprocessed_event = nil end |
#_sleep(seconds) ⇒ Object (private)
62 63 64 65 66 67 68 69 70 71 72 |
# File 'lib/listen/event/processor.rb', line 62 def _sleep(seconds) _check_stopped config.sleep(seconds) _check_stopped _flush_wakeup_reasons do |reason| if reason == :event && !@listener.paused? _remember_time_of_first_unprocessed_event end end end |
#_wait_until_events ⇒ Object (private)
blocks until event is popped
returns the event or nil
when the event_queue is closed
rubocop:disable Naming/MemoizedInstanceVariableName
89 90 91 92 93 |
# File 'lib/listen/event/processor.rb', line 89 def _wait_until_events config.event_queue.pop.tap do |_event| @_remember_time_of_first_unprocessed_event ||= MonotonicTime.now end end |
#_wait_until_events_calm_down ⇒ Object (private)
36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/listen/event/processor.rb', line 36 def _wait_until_events_calm_down loop do now = MonotonicTime.now # Assure there's at least latency between callbacks to allow # for accumulating changes diff = _deadline - now break if diff <= 0 # give events a bit of time to accumulate so they can be # compressed/optimized _sleep(diff) end end |
#_wait_until_no_longer_paused ⇒ Object (private)
51 52 53 |
# File 'lib/listen/event/processor.rb', line 51 def _wait_until_no_longer_paused @listener.wait_for_state(*(Listener.states.keys - [:paused])) end |
#loop_for(latency) ⇒ Object
TODO: implement this properly instead of checking the state at arbitrary points in time
17 18 19 20 21 22 23 24 25 26 27 28 29 |
# File 'lib/listen/event/processor.rb', line 17 def loop_for(latency) @latency = latency loop do event = _wait_until_events _check_stopped _wait_until_events_calm_down _wait_until_no_longer_paused _process_changes(event) end rescue Stopped Listen.logger.debug('Processing stopped') end |