Class: Listen::Event::Processor

Inherits:
Object
  • Object
show all
Defined in:
lib/listen/event/processor.rb

Defined Under Namespace

Classes: Stopped

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config, reasons) ⇒ Processor

Returns a new instance of Processor.



8
9
10
11
12
13
# File 'lib/listen/event/processor.rb', line 8

def initialize(config, reasons)
  @config = config
  @listener = config.listener
  @reasons = reasons
  _reset_no_unprocessed_events
end

Instance Attribute Details

#configObject (readonly, private)

Returns the value of attribute config.



125
126
127
# File 'lib/listen/event/processor.rb', line 125

def config
  @config
end

Instance Method Details

#_check_stoppedObject (private)



55
56
57
58
59
60
# File 'lib/listen/event/processor.rb', line 55

def _check_stopped
  if @listener.stopped?
    _flush_wakeup_reasons
    raise Stopped
  end
end

#_deadlineObject (private)



82
83
84
# File 'lib/listen/event/processor.rb', line 82

def _deadline
  @_remember_time_of_first_unprocessed_event + @latency
end

#_flush_wakeup_reasonsObject (private)

rubocop:enable Naming/MemoizedInstanceVariableName



96
97
98
99
100
101
# File 'lib/listen/event/processor.rb', line 96

def _flush_wakeup_reasons
  until @reasons.empty?
    reason = @reasons.pop
    yield reason if block_given?
  end
end

#_process_changes(event) ⇒ Object (private)

for easier testing without sleep loop



104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# File 'lib/listen/event/processor.rb', line 104

def _process_changes(event)
  _reset_no_unprocessed_events

  changes = [event]
  changes << config.event_queue.pop until config.event_queue.empty?

  return unless config.callable?

  hash = config.optimize_changes(changes)
  result = [hash[:modified], hash[:added], hash[:removed]]
  return if result.all?(&:empty?)

  block_start = MonotonicTime.now
  exception_note = " (exception)"
  ::Listen::Thread.rescue_and_log('_process_changes') do
    config.call(*result)
    exception_note = nil
  end
  Listen.logger.debug "Callback#{exception_note} took #{MonotonicTime.now - block_start} sec"
end

#_remember_time_of_first_unprocessed_eventObject (private)



74
75
76
# File 'lib/listen/event/processor.rb', line 74

def _remember_time_of_first_unprocessed_event
  @_remember_time_of_first_unprocessed_event ||= MonotonicTime.now
end

#_reset_no_unprocessed_eventsObject (private)



78
79
80
# File 'lib/listen/event/processor.rb', line 78

def _reset_no_unprocessed_events
  @_remember_time_of_first_unprocessed_event = nil
end

#_sleep(seconds) ⇒ Object (private)



62
63
64
65
66
67
68
69
70
71
72
# File 'lib/listen/event/processor.rb', line 62

def _sleep(seconds)
  _check_stopped
  config.sleep(seconds)
  _check_stopped

  _flush_wakeup_reasons do |reason|
    if reason == :event && !@listener.paused?
      _remember_time_of_first_unprocessed_event
    end
  end
end

#_wait_until_eventsObject (private)

blocks until event is popped returns the event or nil when the event_queue is closed rubocop:disable Naming/MemoizedInstanceVariableName



89
90
91
92
93
# File 'lib/listen/event/processor.rb', line 89

def _wait_until_events
  config.event_queue.pop.tap do |_event|
    @_remember_time_of_first_unprocessed_event ||= MonotonicTime.now
  end
end

#_wait_until_events_calm_downObject (private)



36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/listen/event/processor.rb', line 36

def _wait_until_events_calm_down
  loop do
    now = MonotonicTime.now

    # Assure there's at least latency between callbacks to allow
    # for accumulating changes
    diff = _deadline - now
    break if diff <= 0

    # give events a bit of time to accumulate so they can be
    # compressed/optimized
    _sleep(diff)
  end
end

#_wait_until_no_longer_pausedObject (private)



51
52
53
# File 'lib/listen/event/processor.rb', line 51

def _wait_until_no_longer_paused
  @listener.wait_for_state(*(Listener.states.keys - [:paused]))
end

#loop_for(latency) ⇒ Object

TODO: implement this properly instead of checking the state at arbitrary points in time



17
18
19
20
21
22
23
24
25
26
27
28
29
# File 'lib/listen/event/processor.rb', line 17

def loop_for(latency)
  @latency = latency

  loop do
    event = _wait_until_events
    _check_stopped
    _wait_until_events_calm_down
    _wait_until_no_longer_paused
    _process_changes(event)
  end
rescue Stopped
  Listen.logger.debug('Processing stopped')
end