Class: LogStash::ShutdownWatcher

Inherits:
Object
  • Object
show all
Includes:
Util::Loggable
Defined in:
lib/logstash/shutdown_watcher.rb

Constant Summary collapse

CHECK_EVERY =

second

1
REPORT_EVERY =

checks

5
ABORT_AFTER =

stalled reports

3

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Util::Loggable

included, #slow_logger

Constructor Details

#initialize(pipeline, cycle_period = CHECK_EVERY, report_every = REPORT_EVERY, abort_threshold = ABORT_AFTER) ⇒ ShutdownWatcher

Returns a new instance of ShutdownWatcher.



15
16
17
18
19
20
21
22
23
# File 'lib/logstash/shutdown_watcher.rb', line 15

# Sets up a watcher for +pipeline+ without starting it.
#
# @param pipeline the pipeline whose shutdown is being observed
# @param cycle_period how long #start sleeps before looping; also handed to Stud.interval
# @param report_every number of cycles between logged reports (and snapshots retained)
# @param abort_threshold count of consecutive stalled reports at which #start forces an exit
def initialize(pipeline, cycle_period=CHECK_EVERY, report_every=REPORT_EVERY, abort_threshold=ABORT_AFTER)
  # Configuration supplied by the caller.
  @pipeline, @cycle_period = pipeline, cycle_period
  @report_every, @abort_threshold = report_every, abort_threshold

  # Mutable watcher state: retained snapshots, loop iteration counter, run flag.
  @reports = []
  @attempts_count = Concurrent::AtomicFixnum.new(0)
  @running = Concurrent::AtomicBoolean.new(false)
end

Instance Attribute Details

#abort_thresholdObject (readonly)

Returns the value of attribute abort_threshold.



13
14
15
# File 'lib/logstash/shutdown_watcher.rb', line 13

# Returns the abort threshold this watcher was constructed with. #start compares
# it to the count of consecutive stalled reports before calling #force_exit.
def abort_threshold
  @abort_threshold
end

#cycle_periodObject (readonly)

Returns the value of attribute cycle_period.



13
14
15
# File 'lib/logstash/shutdown_watcher.rb', line 13

# Returns the cycle period this watcher was constructed with. #start sleeps for
# this long before entering its loop and passes the same value to Stud.interval.
def cycle_period
  @cycle_period
end

#report_everyObject (readonly)

Returns the value of attribute report_every.



13
14
15
# File 'lib/logstash/shutdown_watcher.rb', line 13

# Returns the report interval this watcher was constructed with: the number of
# cycles between logged reports, and the number of snapshots #start retains.
def report_every
  @report_every
end

Class Method Details

.start(pipeline, cycle_period = CHECK_EVERY, report_every = REPORT_EVERY, abort_threshold = ABORT_AFTER) ⇒ Object



33
34
35
36
# File 'lib/logstash/shutdown_watcher.rb', line 33

# Builds a watcher for +pipeline+ and runs its #start loop on a new thread.
#
# @param pipeline the pipeline to watch
# @param cycle_period forwarded unchanged to .new
# @param report_every forwarded unchanged to .new
# @param abort_threshold forwarded unchanged to .new
# @return [Thread] the thread executing the watcher's #start
def self.start(pipeline, cycle_period=CHECK_EVERY, report_every=REPORT_EVERY, abort_threshold=ABORT_AFTER)
  controller = new(pipeline, cycle_period, report_every, abort_threshold)
  # The block parameter has its own name so it does not shadow the local above.
  Thread.new(controller) { |watcher| watcher.start }
end

.unsafe_shutdown=(boolean) ⇒ Object



25
26
27
# File 'lib/logstash/shutdown_watcher.rb', line 25

# Stores the class-level unsafe-shutdown flag. An instance's #start only calls
# #force_exit when this flag is truthy.
def self.unsafe_shutdown=(boolean)
  @unsafe_shutdown = boolean
end

.unsafe_shutdown?Boolean

Returns:

  • (Boolean)


29
30
31
# File 'lib/logstash/shutdown_watcher.rb', line 29

# Returns the class-level unsafe-shutdown flag exactly as last assigned through
# .unsafe_shutdown= (nil if it was never assigned).
def self.unsafe_shutdown?
  @unsafe_shutdown
end

Instance Method Details

#attempts_countObject



42
43
44
# File 'lib/logstash/shutdown_watcher.rb', line 42

# Returns the current value of the attempts counter, which #start increments
# once at the top of every loop iteration.
def attempts_count
  @attempts_count.value
end

#force_exitObject



111
112
113
# File 'lib/logstash/shutdown_watcher.rb', line 111

# Requests process exit with status -1. Called by #start when unsafe shutdown
# is enabled and the stalled-report count reaches the abort threshold.
def force_exit
  exit(-1)
end

#loggerObject



38
39
40
# File 'lib/logstash/shutdown_watcher.rb', line 38

# Returns the logger exposed by this object's class, so instance methods can
# log through the class-level logger.
def logger
  self.class.logger
end

#pipeline_report_snapshotObject



87
88
89
# File 'lib/logstash/shutdown_watcher.rb', line 87

# Returns the result of calling +snapshot+ on the watched pipeline's reporter.
# #start appends each returned value to the retained reports.
def pipeline_report_snapshot
  @pipeline.reporter.snapshot
end

#shutdown_stalled?Boolean

A pipeline shutdown is stalled if

  • at least REPORT_EVERY reports have been created

  • the inflight event count is constant or monotonically increasing

  • there are worker threads running which aren’t blocked on SizedQueue pop/push

  • the stalled thread list is constant in the previous REPORT_EVERY reports

Returns:

  • (Boolean)


96
97
98
99
100
101
102
103
104
105
106
107
108
109
# File 'lib/logstash/shutdown_watcher.rb', line 96

# Decides whether the pipeline shutdown looks stuck, judging by the retained reports.
#
# True only when exactly @report_every reports are held, the inflight count
# never drops from one report to the next, and every report lists the same
# stalling threads.
#
# @return [Boolean]
def shutdown_stalled?
  return false if @reports.size != @report_every

  report_pairs = @reports.each_cons(2)

  # A shrinking inflight count means events are still draining.
  not_draining = report_pairs.all? { |older, newer| older.inflight_count <= newer.inflight_count }
  return false unless not_draining

  report_pairs.all? { |older, newer| older.stalling_threads == newer.stalling_threads }
end

#startObject



54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/logstash/shutdown_watcher.rb', line 54

# Runs the watch loop on the calling thread until the watcher is stopped, the
# pipeline thread is no longer alive, or a forced exit is triggered.
#
# Each cycle records a pipeline report snapshot, keeping at most @report_every
# of them. Every @report_every cycles the newest snapshot is logged and the
# stall check runs. When unsafe shutdown is enabled and the stall check has
# succeeded @abort_threshold times in a row, #force_exit is called.
def start
  # Wait one cycle period before the first check.
  sleep(@cycle_period)
  cycle_number = 0
  stalled_count = 0
  running!
  # NOTE(review): Stud.interval presumably yields once per @cycle_period seconds — confirm against Stud.
  Stud.interval(@cycle_period) do
    # Counted before the exit checks, so a cycle that breaks out is still counted.
    @attempts_count.increment
    break if stopped?
    break unless @pipeline.thread.alive?
    @reports << pipeline_report_snapshot
    @reports.delete_at(0) if @reports.size > @report_every # expire old report
    if cycle_number == (@report_every - 1) # it's report time!
      logger.warn(@reports.last.to_s)

      if shutdown_stalled?
        # The error is logged only on the first report of a run of stalled reports.
        logger.error("The shutdown process appears to be stalled due to busy or blocked plugins. Check the logs for more information.") if stalled_count == 0
        stalled_count += 1

        if self.class.unsafe_shutdown? && @abort_threshold == stalled_count
          logger.fatal("Forcefully quitting logstash..")
          force_exit()
          # Reached only if force_exit returns instead of terminating.
          break
        end
      else
        # Any non-stalled report resets the run, so the threshold counts consecutive stalls.
        stalled_count = 0
      end
    end
    # Wraps back to 0 after each report cycle.
    cycle_number = (cycle_number + 1) % @report_every
  end
ensure
  # Always clear the running flag on the way out, including when an exception is raised.
  stop!
end

#stop!Object



46
47
48
# File 'lib/logstash/shutdown_watcher.rb', line 46

# Clears the running flag. #start checks this flag each cycle and leaves its
# loop once it is false.
def stop!
  @running.make_false
end

#stopped?Boolean

Returns:

  • (Boolean)


50
51
52
# File 'lib/logstash/shutdown_watcher.rb', line 50

# True when the running flag is false, which is its initial state and its
# state after #stop!.
def stopped?
  @running.false?
end