Class: Fluent::Plugin::ForwardOutput::FailureDetector
- Inherits:
-
Object
- Object
- Fluent::Plugin::ForwardOutput::FailureDetector
- Defined in:
- lib/fluent/plugin/out_forward/failure_detector.rb
Constant Summary collapse
- PHI_FACTOR =
1.0 / Math.log(10.0)
- SAMPLE_SIZE =
1000
Instance Method Summary collapse
- #add(now) ⇒ Object
- #clear ⇒ Object
- #hard_timeout?(now) ⇒ Boolean
-
#initialize(heartbeat_interval, hard_timeout, init_last) ⇒ FailureDetector
constructor
A new instance of FailureDetector.
- #phi(now) ⇒ Object
- #sample_size ⇒ Object
Constructor Details
#initialize(heartbeat_interval, hard_timeout, init_last) ⇒ FailureDetector
Returns a new instance of FailureDetector.
25 26 27 28 29 30 31 32 33 |
# File 'lib/fluent/plugin/out_forward/failure_detector.rb', line 25 def initialize(heartbeat_interval, hard_timeout, init_last) @heartbeat_interval = heartbeat_interval @last = init_last @hard_timeout = hard_timeout # microsec @init_gap = (heartbeat_interval * 1e6).to_i @window = [@init_gap] end |
Instance Method Details
#add(now) ⇒ Object
39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/fluent/plugin/out_forward/failure_detector.rb', line 39 def add(now) if @window.empty? @window << @init_gap @last = now else gap = now - @last @window << (gap * 1e6).to_i @window.shift if @window.length > SAMPLE_SIZE @last = now end end |
#clear ⇒ Object
78 79 80 81 |
# File 'lib/fluent/plugin/out_forward/failure_detector.rb', line 78 def clear @window.clear @last = 0 end |
#hard_timeout?(now) ⇒ Boolean
35 36 37 |
# File 'lib/fluent/plugin/out_forward/failure_detector.rb', line 35 def hard_timeout?(now) now - @last > @hard_timeout end |
#phi(now) ⇒ Object
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 |
# File 'lib/fluent/plugin/out_forward/failure_detector.rb', line 51 def phi(now) size = @window.size return 0.0 if size == 0 # Calculate weighted moving average mean_usec = 0 fact = 0 @window.each_with_index {|gap,i| mean_usec += gap * (1+i) fact += (1+i) } mean_usec = mean_usec / fact # Normalize arrive intervals into 1sec mean = (mean_usec.to_f / 1e6) - @heartbeat_interval + 1 # Calculate phi of the phi accrual failure detector t = now - @last - @heartbeat_interval + 1 phi = PHI_FACTOR * t / mean return phi end |
#sample_size ⇒ Object
74 75 76 |
# File 'lib/fluent/plugin/out_forward/failure_detector.rb', line 74 def sample_size @window.size end |