Class: StatsD::Instrument::BatchedSink::DispatcherStats

Inherits:
Object
  • Object
show all
Defined in:
lib/statsd/instrument/batched_sink.rb

Instance Method Summary collapse

Constructor Details

#initialize(interval, type) ⇒ DispatcherStats

Returns a new instance of DispatcherStats.



88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# File 'lib/statsd/instrument/batched_sink.rb', line 88

# Builds a stats holder with zeroed counters and running averages, the
# metric names they are reported under, and the timing state used to decide
# when the next flush is due.
#
# @param interval [Numeric] minimum monotonic-clock time between flushes, in
#   the unit returned by Process.clock_gettime (float seconds by default)
# @param type [#to_s] sink type, interpolated into every metric name
def initialize(interval, type)
  @interval = interval
  @mutex = Mutex.new

  # Counters.
  # Statsd lines that had to be sent synchronously by the batched udp
  # sender because the buffer was full.
  @synchronous_sends = 0
  # Batches of statsd lines sent, regardless of their size.
  @batched_sends = 0

  # Running averages.
  # Buffer length, sampled at the start of each batch.
  @avg_buffer_length = 0
  # Byte size per batch of the packet handed to the underlying UDPSink.
  @avg_batched_packet_size = 0
  # Number of statsd lines per batch.
  @avg_batch_length = 0

  # All metric names share one prefix that embeds the sink type.
  prefix = "statsd_instrument.batched_#{type}_sink"
  @sync_sends_metric = "#{prefix}.synchronous_sends"
  @batched_sends_metric = "#{prefix}.batched_sends"
  @avg_buffer_length_metric = "#{prefix}.avg_buffer_length"
  @avg_batched_packet_size_metric = "#{prefix}.avg_batched_packet_size"
  @avg_batch_length_metric = "#{prefix}.avg_batch_length"

  # Start of the current reporting window (monotonic clock).
  @since = Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

Instance Method Details

#increment_batched_sends(buffer_len, packet_size, batch_len) ⇒ Object



145
146
147
148
149
150
151
152
# File 'lib/statsd/instrument/batched_sink.rb', line 145

# Records that one batch was sent and folds its measurements into the
# running averages of the current reporting window.
#
# Each average is updated incrementally as
# `avg += (sample - avg) / count`, so no per-batch history is kept.
#
# @param buffer_len [Numeric] buffer length measured for this batch
# @param packet_size [Numeric] byte size of the packet sent for this batch
# @param batch_len [Numeric] number of statsd lines in this batch
# @return [Float] the updated average batch length
def increment_batched_sends(buffer_len, packet_size, batch_len)
  @mutex.synchronize do
    @batched_sends += 1
    # The averages start out as Integer 0, so plain `/` would be floor
    # division: fractional parts would be dropped and negative corrections
    # rounded away from zero, making the mean drift. fdiv always divides in
    # floating point.
    @avg_buffer_length += (buffer_len - @avg_buffer_length).fdiv(@batched_sends)
    @avg_batched_packet_size += (packet_size - @avg_batched_packet_size).fdiv(@batched_sends)
    @avg_batch_length += (batch_len - @avg_batch_length).fdiv(@batched_sends)
  end
end

#increment_synchronous_sends ⇒ Object



141
142
143
# File 'lib/statsd/instrument/batched_sink.rb', line 141

# Counts one statsd line that had to be sent synchronously.
#
# The counter is bumped while holding the stats mutex.
#
# @return [Integer] the counter value after the increment
def increment_synchronous_sends
  @mutex.synchronize do
    @synchronous_sends += 1
  end
end

#maybe_flush!(force: false) ⇒ Object



117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# File 'lib/statsd/instrument/batched_sink.rb', line 117

# Reports the accumulated stats through StatsD and starts a new reporting
# window, provided the interval has elapsed or the flush is forced.
#
# The counters and averages are captured and reset to 0 under the mutex;
# the StatsD calls happen afterwards, outside the lock.
#
# @param force [Boolean] when truthy, flush without checking the interval
# @return [nil, Object] nil when the flush is skipped, otherwise the return
#   value of the final StatsD.gauge call
def maybe_flush!(force: false)
  unless force
    elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - @since
    return if elapsed < @interval
  end

  snapshot = @mutex.synchronize do
    captured = [
      @synchronous_sends,
      @batched_sends,
      @avg_buffer_length,
      @avg_batched_packet_size,
      @avg_batch_length,
    ]

    @synchronous_sends = 0
    @batched_sends = 0
    @avg_buffer_length = 0
    @avg_batched_packet_size = 0
    @avg_batch_length = 0
    @since = Process.clock_gettime(Process::CLOCK_MONOTONIC)

    captured
  end

  sync_count, batch_count, buffer_avg, packet_avg, batch_avg = snapshot

  StatsD.increment(@sync_sends_metric, sync_count)
  StatsD.increment(@batched_sends_metric, batch_count)
  StatsD.gauge(@avg_buffer_length_metric, buffer_avg)
  StatsD.gauge(@avg_batched_packet_size_metric, packet_avg)
  StatsD.gauge(@avg_batch_length_metric, batch_avg)
end