Class: LogStash::ShutdownWatcher
- Inherits:
-
Object
- Object
- LogStash::ShutdownWatcher
- Includes:
- Util::Loggable
- Defined in:
- lib/logstash/shutdown_watcher.rb
Constant Summary collapse
- CHECK_EVERY =
second
1
- REPORT_EVERY =
checks
5
- ABORT_AFTER =
stalled reports
3
Instance Attribute Summary collapse
-
#abort_threshold ⇒ Object
readonly
Returns the value of attribute abort_threshold.
-
#cycle_period ⇒ Object
readonly
Returns the value of attribute cycle_period.
-
#report_every ⇒ Object
readonly
Returns the value of attribute report_every.
Class Method Summary collapse
- .start(pipeline, cycle_period = CHECK_EVERY, report_every = REPORT_EVERY, abort_threshold = ABORT_AFTER) ⇒ Object
- .unsafe_shutdown=(boolean) ⇒ Object
- .unsafe_shutdown? ⇒ Boolean
Instance Method Summary collapse
- #force_exit ⇒ Object
-
#initialize(pipeline, cycle_period = CHECK_EVERY, report_every = REPORT_EVERY, abort_threshold = ABORT_AFTER) ⇒ ShutdownWatcher
constructor
A new instance of ShutdownWatcher.
- #logger ⇒ Object
- #pipeline_report_snapshot ⇒ Object
-
#shutdown_stalled? ⇒ Boolean
A pipeline shutdown is stalled if * at least REPORT_EVERY reports have been created * the inflight event count is in monotonically increasing * there are worker threads running which aren’t blocked on SizedQueue pop/push * the stalled thread list is constant in the previous REPORT_EVERY reports.
- #start ⇒ Object
Methods included from Util::Loggable
Constructor Details
#initialize(pipeline, cycle_period = CHECK_EVERY, report_every = REPORT_EVERY, abort_threshold = ABORT_AFTER) ⇒ ShutdownWatcher
Returns a new instance of ShutdownWatcher.
13 14 15 16 17 18 19 |
# File 'lib/logstash/shutdown_watcher.rb', line 13 def initialize(pipeline, cycle_period=CHECK_EVERY, report_every=REPORT_EVERY, abort_threshold=ABORT_AFTER) @pipeline = pipeline @cycle_period = cycle_period @report_every = report_every @abort_threshold = abort_threshold @reports = [] end |
Instance Attribute Details
#abort_threshold ⇒ Object (readonly)
Returns the value of attribute abort_threshold.
11 12 13 |
# File 'lib/logstash/shutdown_watcher.rb', line 11 def abort_threshold @abort_threshold end |
#cycle_period ⇒ Object (readonly)
Returns the value of attribute cycle_period.
11 12 13 |
# File 'lib/logstash/shutdown_watcher.rb', line 11 def cycle_period @cycle_period end |
#report_every ⇒ Object (readonly)
Returns the value of attribute report_every.
11 12 13 |
# File 'lib/logstash/shutdown_watcher.rb', line 11 def report_every @report_every end |
Class Method Details
.start(pipeline, cycle_period = CHECK_EVERY, report_every = REPORT_EVERY, abort_threshold = ABORT_AFTER) ⇒ Object
29 30 31 32 |
# File 'lib/logstash/shutdown_watcher.rb', line 29 def self.start(pipeline, cycle_period=CHECK_EVERY, report_every=REPORT_EVERY, abort_threshold=ABORT_AFTER) controller = self.new(pipeline, cycle_period, report_every, abort_threshold) Thread.new(controller) { |controller| controller.start } end |
.unsafe_shutdown=(boolean) ⇒ Object
21 22 23 |
# File 'lib/logstash/shutdown_watcher.rb', line 21 def self.unsafe_shutdown=(boolean) @unsafe_shutdown = boolean end |
.unsafe_shutdown? ⇒ Boolean
25 26 27 |
# File 'lib/logstash/shutdown_watcher.rb', line 25 def self.unsafe_shutdown? @unsafe_shutdown end |
Instance Method Details
#force_exit ⇒ Object
90 91 92 |
# File 'lib/logstash/shutdown_watcher.rb', line 90 def force_exit exit(-1) end |
#logger ⇒ Object
34 35 36 |
# File 'lib/logstash/shutdown_watcher.rb', line 34 def logger self.class.logger end |
#pipeline_report_snapshot ⇒ Object
66 67 68 |
# File 'lib/logstash/shutdown_watcher.rb', line 66 def pipeline_report_snapshot @pipeline.reporter.snapshot end |
#shutdown_stalled? ⇒ Boolean
A pipeline shutdown is stalled if
-
at least REPORT_EVERY reports have been created
-
the inflight event count is in monotonically increasing
-
there are worker threads running which aren’t blocked on SizedQueue pop/push
-
the stalled thread list is constant in the previous REPORT_EVERY reports
75 76 77 78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/logstash/shutdown_watcher.rb', line 75 def shutdown_stalled? return false unless @reports.size == @report_every # # is stalled if inflight count is either constant or increasing stalled_event_count = @reports.each_cons(2).all? do |prev_report, next_report| prev_report.inflight_count <= next_report.inflight_count end if stalled_event_count @reports.each_cons(2).all? do |prev_report, next_report| prev_report.stalling_threads == next_report.stalling_threads end else false end end |
#start ⇒ Object
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 |
# File 'lib/logstash/shutdown_watcher.rb', line 38 def start sleep(@cycle_period) cycle_number = 0 stalled_count = 0 Stud.interval(@cycle_period) do break unless @pipeline.thread.alive? @reports << pipeline_report_snapshot @reports.delete_at(0) if @reports.size > @report_every # expire old report if cycle_number == (@report_every - 1) # it's report time! logger.warn(@reports.last.to_s) if shutdown_stalled? logger.error("The shutdown process appears to be stalled due to busy or blocked plugins. Check the logs for more information.") if stalled_count == 0 stalled_count += 1 if self.class.unsafe_shutdown? && @abort_threshold == stalled_count logger.fatal("Forcefully quitting logstash..") force_exit() break end else stalled_count = 0 end end cycle_number = (cycle_number + 1) % @report_every end end |