Class: LogStash::ShutdownWatcher
- Inherits: Object (hierarchy: Object, LogStash::ShutdownWatcher)
- Includes: Util::Loggable
- Defined in: lib/logstash/shutdown_watcher.rb
Constant Summary
- CHECK_EVERY = 1 # second
- REPORT_EVERY = 5 # checks
- ABORT_AFTER = 3 # stalled reports
Instance Attribute Summary
- #abort_threshold ⇒ Object (readonly): Returns the value of attribute abort_threshold.
- #cycle_period ⇒ Object (readonly): Returns the value of attribute cycle_period.
- #report_every ⇒ Object (readonly): Returns the value of attribute report_every.
Class Method Summary
- .start(pipeline, cycle_period = CHECK_EVERY, report_every = REPORT_EVERY, abort_threshold = ABORT_AFTER) ⇒ Object
- .unsafe_shutdown=(boolean) ⇒ Object
- .unsafe_shutdown? ⇒ Boolean
Instance Method Summary
- #attempts_count ⇒ Object
- #force_exit ⇒ Object
- #initialize(pipeline, cycle_period = CHECK_EVERY, report_every = REPORT_EVERY, abort_threshold = ABORT_AFTER) ⇒ ShutdownWatcher (constructor): Returns a new instance of ShutdownWatcher.
- #logger ⇒ Object
- #pipeline_report_snapshot ⇒ Object
- #shutdown_stalled? ⇒ Boolean: A pipeline shutdown is stalled if at least REPORT_EVERY reports have been created, the inflight event count is constant or increasing across them, there are worker threads running which aren’t blocked on SizedQueue pop/push, and the stalled thread list is constant in the previous REPORT_EVERY reports.
- #start ⇒ Object
- #stop! ⇒ Object
- #stopped? ⇒ Boolean
Methods included from Util::Loggable
Constructor Details
#initialize(pipeline, cycle_period = CHECK_EVERY, report_every = REPORT_EVERY, abort_threshold = ABORT_AFTER) ⇒ ShutdownWatcher
Returns a new instance of ShutdownWatcher.
# File 'lib/logstash/shutdown_watcher.rb', line 15
def initialize(pipeline, cycle_period=CHECK_EVERY, report_every=REPORT_EVERY, abort_threshold=ABORT_AFTER)
  @pipeline = pipeline
  @cycle_period = cycle_period
  @report_every = report_every
  @abort_threshold = abort_threshold
  @reports = []
  @attempts_count = Concurrent::AtomicFixnum.new(0)
  @running = Concurrent::AtomicBoolean.new(false)
end
Instance Attribute Details
#abort_threshold ⇒ Object (readonly)
Returns the value of attribute abort_threshold.
# File 'lib/logstash/shutdown_watcher.rb', line 13
def abort_threshold
  @abort_threshold
end
#cycle_period ⇒ Object (readonly)
Returns the value of attribute cycle_period.
# File 'lib/logstash/shutdown_watcher.rb', line 13
def cycle_period
  @cycle_period
end
#report_every ⇒ Object (readonly)
Returns the value of attribute report_every.
# File 'lib/logstash/shutdown_watcher.rb', line 13
def report_every
  @report_every
end
Class Method Details
.start(pipeline, cycle_period = CHECK_EVERY, report_every = REPORT_EVERY, abort_threshold = ABORT_AFTER) ⇒ Object
# File 'lib/logstash/shutdown_watcher.rb', line 33
def self.start(pipeline, cycle_period=CHECK_EVERY, report_every=REPORT_EVERY, abort_threshold=ABORT_AFTER)
  controller = self.new(pipeline, cycle_period, report_every, abort_threshold)
  Thread.new(controller) { |controller| controller.start }
end
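Note that .start returns the Thread running the watcher, not the watcher itself. The following is a minimal sketch of that pattern only; `FakeWatcher` and its `started` reader are hypothetical stand-ins and are not part of Logstash.

```ruby
# Hypothetical stand-in for the watcher: #start just records that it ran.
class FakeWatcher
  attr_reader :started

  # Same shape as ShutdownWatcher.start: build an instance, run it in a new thread.
  def self.start(*args)
    controller = new(*args)
    Thread.new(controller) { |c| c.start }
  end

  def initialize(name)
    @name = name
    @started = false
  end

  def start
    @started = true
    self # returned so the example can inspect the instance via Thread#value
  end
end

thread = FakeWatcher.start("main") # a Thread, not a FakeWatcher
watcher = thread.value             # joins the thread and returns the block's result
```

A caller that needs to wait for the watcher to finish would join the returned thread; in this sketch `Thread#value` does that and hands back the instance.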
.unsafe_shutdown=(boolean) ⇒ Object
# File 'lib/logstash/shutdown_watcher.rb', line 25
def self.unsafe_shutdown=(boolean)
  @unsafe_shutdown = boolean
end
.unsafe_shutdown? ⇒ Boolean
# File 'lib/logstash/shutdown_watcher.rb', line 29
def self.unsafe_shutdown?
  @unsafe_shutdown
end
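The flag is stored in a class-level instance variable, so the predicate returns nil (falsy) rather than false until the writer has been called. A small sketch of that behavior, using a hypothetical `Watcher` class rather than the real one:

```ruby
# Hypothetical class, not part of Logstash: shows the accessor pattern only.
class Watcher
  def self.unsafe_shutdown=(boolean)
    @unsafe_shutdown = boolean
  end

  def self.unsafe_shutdown?
    @unsafe_shutdown
  end
end

before = Watcher.unsafe_shutdown? # nil: the writer has not been called yet
Watcher.unsafe_shutdown = true
after = Watcher.unsafe_shutdown?  # true
```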
Instance Method Details
#attempts_count ⇒ Object
# File 'lib/logstash/shutdown_watcher.rb', line 42
def attempts_count
  @attempts_count.value
end
#force_exit ⇒ Object
# File 'lib/logstash/shutdown_watcher.rb', line 111
def force_exit
  exit(-1)
end
#logger ⇒ Object
# File 'lib/logstash/shutdown_watcher.rb', line 38
def logger
  self.class.logger
end
#pipeline_report_snapshot ⇒ Object
# File 'lib/logstash/shutdown_watcher.rb', line 87
def pipeline_report_snapshot
  @pipeline.reporter.snapshot
end
#shutdown_stalled? ⇒ Boolean
A pipeline shutdown is stalled if:
- at least REPORT_EVERY reports have been created
- the inflight event count is constant or increasing across those reports
- there are worker threads running which aren’t blocked on SizedQueue pop/push
- the stalled thread list is constant in the previous REPORT_EVERY reports
# File 'lib/logstash/shutdown_watcher.rb', line 96
def shutdown_stalled?
  return false unless @reports.size == @report_every
  #
  # is stalled if inflight count is either constant or increasing
  stalled_event_count = @reports.each_cons(2).all? do |prev_report, next_report|
    prev_report.inflight_count <= next_report.inflight_count
  end
  if stalled_event_count
    @reports.each_cons(2).all? do |prev_report, next_report|
      prev_report.stalling_threads == next_report.stalling_threads
    end
  else
    false
  end
end
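The check can be tried in isolation. The sketch below mirrors the decision logic above as a free function; `Report`, `stalled?`, and the shape of the `stalling_threads` value are assumptions made for illustration and do not reflect the real snapshot object.

```ruby
# Hypothetical stand-in for a report snapshot; only the two fields
# read by shutdown_stalled? are modelled.
Report = Struct.new(:inflight_count, :stalling_threads)

# Same decision logic as shutdown_stalled?, written as a free function.
def stalled?(reports, report_every)
  return false unless reports.size == report_every

  pairs = reports.each_cons(2)
  # The inflight count never decreases between consecutive reports.
  counts_not_falling = pairs.all? { |prev, nxt| prev.inflight_count <= nxt.inflight_count }
  # The same threads are reported as stalling in every report.
  same_threads = pairs.all? { |prev, nxt| prev.stalling_threads == nxt.stalling_threads }
  counts_not_falling && same_threads
end

busy     = { "worker0" => "sleep" } # assumed shape, for illustration only
stuck    = Array.new(5) { Report.new(10, busy) }
draining = [10, 8, 6, 4, 2].map { |n| Report.new(n, busy) }

stuck_result    = stalled?(stuck, 5)          # true: constant count, same threads
draining_result = stalled?(draining, 5)       # false: the count is falling
too_few_result  = stalled?(stuck.first(3), 5) # false: the report window is not full
```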
#start ⇒ Object
# File 'lib/logstash/shutdown_watcher.rb', line 54
def start
  sleep(@cycle_period)
  cycle_number = 0
  stalled_count = 0
  running!
  Stud.interval(@cycle_period) do
    @attempts_count.increment
    break if stopped?
    break unless @pipeline.thread.alive?
    @reports << pipeline_report_snapshot
    @reports.delete_at(0) if @reports.size > @report_every # expire old report
    if cycle_number == (@report_every - 1) # it's report time!
      logger.warn(@reports.last.to_s)
      if shutdown_stalled?
        logger.error("The shutdown process appears to be stalled due to busy or blocked plugins. Check the logs for more information.") if stalled_count == 0
        stalled_count += 1
        if self.class.unsafe_shutdown? && @abort_threshold == stalled_count
          logger.fatal("Forcefully quitting logstash..")
          force_exit()
          break
        end
      else
        stalled_count = 0
      end
    end
    cycle_number = (cycle_number + 1) % @report_every
  end
ensure
  stop!
end
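To see when reports and a forced exit would occur, the counters in #start can be simulated without a pipeline. This is a sketch under stated assumptions: `simulate` and `stalled_at` are hypothetical names, unsafe shutdown is assumed to be enabled, and sleeping, logging, and the real exit are left out, so results are counted in checks rather than seconds.

```ruby
# Simulates only the counters in #start. `stalled_at` is a hypothetical
# callable standing in for shutdown_stalled? at a given check number.
def simulate(checks, report_every: 5, abort_threshold: 3, stalled_at: ->(_check) { true })
  cycle_number = 0
  stalled_count = 0
  report_checks = []
  abort_check = nil

  (1..checks).each do |check|
    if cycle_number == report_every - 1 # report time
      report_checks << check
      if stalled_at.call(check)
        stalled_count += 1
        if stalled_count == abort_threshold # assumes unsafe shutdown is enabled
          abort_check = check
          break
        end
      else
        stalled_count = 0 # any non-stalled report resets the streak
      end
    end
    cycle_number = (cycle_number + 1) % report_every
  end

  { report_checks: report_checks, abort_check: abort_check }
end

always_stalled = simulate(20)
recovers_once  = simulate(30, stalled_at: ->(check) { check != 10 })
```

With the default constants, a continuously stalled shutdown reports at checks 5, 10, and 15 and aborts on the third report. If the report at check 10 is not stalled, the streak resets and the abort moves to check 25.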
#stop! ⇒ Object
# File 'lib/logstash/shutdown_watcher.rb', line 46
def stop!
  @running.make_false
end
#stopped? ⇒ Boolean
# File 'lib/logstash/shutdown_watcher.rb', line 50
def stopped?
  @running.false?
end