Class: Fluent::Plugin::ForwardOutput::FailureDetector

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/plugin/out_forward/failure_detector.rb

Constant Summary collapse

PHI_FACTOR =
1.0 / Math.log(10.0)
SAMPLE_SIZE =
1000

Instance Method Summary collapse

Constructor Details

#initialize(heartbeat_interval, hard_timeout, init_last) ⇒ FailureDetector

Returns a new instance of FailureDetector.



25
26
27
28
29
30
31
32
33
# File 'lib/fluent/plugin/out_forward/failure_detector.rb', line 25

# Builds a detector whose sample window is pre-seeded with one nominal gap.
#
# heartbeat_interval - expected spacing between heartbeats; scaled by 1e6
#                      below to obtain the microsecond seed gap.
# hard_timeout       - threshold that #hard_timeout? compares against
#                      `now - @last`.
# init_last          - initial value of the last-heartbeat timestamp.
def initialize(heartbeat_interval, hard_timeout, init_last)
  @heartbeat_interval = heartbeat_interval
  @hard_timeout = hard_timeout
  @last = init_last

  # Gaps are stored as whole microseconds; the fractional part is truncated.
  interval_usec = heartbeat_interval * 1e6
  @init_gap = interval_usec.to_i
  @window = [@init_gap]
end

Instance Method Details

#add(now) ⇒ Object



39
40
41
42
43
44
45
46
47
48
49
# File 'lib/fluent/plugin/out_forward/failure_detector.rb', line 39

# Records a heartbeat observed at +now+.
#
# When the window is empty the seed gap (@init_gap) is stored instead of a
# measured one. Otherwise the gap since the previous heartbeat is appended as
# truncated integer microseconds, and the oldest sample is dropped once the
# window grows beyond SAMPLE_SIZE entries.
#
# Returns +now+, which also becomes the new last-heartbeat timestamp.
def add(now)
  if @window.empty?
    @window.push(@init_gap)
  else
    elapsed_usec = ((now - @last) * 1e6).to_i
    @window.push(elapsed_usec)
    @window.shift if @window.size > SAMPLE_SIZE
  end

  @last = now
end

#clear ⇒ Object



78
79
80
81
# File 'lib/fluent/plugin/out_forward/failure_detector.rb', line 78

# Resets the detector: empties the gap window in place and sets the
# last-heartbeat timestamp to 0.
#
# While the window is empty, #phi returns 0.0, and the next #add call
# re-seeds the window with @init_gap rather than a measured gap.
def clear
  @window.clear
  @last = 0
end

#hard_timeout?(now) ⇒ Boolean

Returns:

  • (Boolean)


35
36
37
# File 'lib/fluent/plugin/out_forward/failure_detector.rb', line 35

# Tells whether the time since the last recorded heartbeat strictly exceeds
# the configured hard timeout.
#
# now - current timestamp, in the same unit as the value passed to #add.
#
# Returns true when `now - @last` is greater than @hard_timeout.
def hard_timeout?(now)
  silence = now - @last
  silence > @hard_timeout
end

#phi(now) ⇒ Object



51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/fluent/plugin/out_forward/failure_detector.rb', line 51

# Computes the suspicion level (phi) for the observed silence at +now+.
#
# The recorded gaps are averaged with linearly increasing weights (the i-th
# sample, counting from 1, has weight i), so recent gaps count more. The
# average is taken with integer division on the microsecond samples.
#
# now - current timestamp, in the same unit as the value passed to #add.
#
# Returns 0.0 when no samples are recorded; otherwise
# PHI_FACTOR * elapsed / mean, where both terms have @heartbeat_interval
# subtracted and 1 added.
def phi(now)
  return 0.0 if @window.empty?

  # Linearly weighted moving average of the gaps, in microseconds.
  weighted_total = 0
  weight_sum = 0
  @window.each.with_index(1) do |gap_usec, weight|
    weighted_total += gap_usec * weight
    weight_sum += weight
  end
  average_usec = weighted_total / weight_sum

  # Shift the mean gap so that a gap equal to the heartbeat interval maps to 1.
  normalized_mean = (average_usec.to_f / 1e6) - @heartbeat_interval + 1

  # Apply the same shift to the time elapsed since the last heartbeat.
  elapsed = now - @last - @heartbeat_interval + 1

  PHI_FACTOR * elapsed / normalized_mean
end

#sample_size ⇒ Object



74
75
76
# File 'lib/fluent/plugin/out_forward/failure_detector.rb', line 74

# Returns the number of gap samples currently held in the window.
def sample_size
  @window.length
end