Class: StatsD::Instrument::BatchedSink::DispatcherStats

Inherits:
Object
  • Object
show all
Defined in:
lib/statsd/instrument/batched_sink.rb

Instance Method Summary collapse

Constructor Details

#initialize(interval, type) ⇒ DispatcherStats

Returns a new instance of DispatcherStats.



93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# File 'lib/statsd/instrument/batched_sink.rb', line 93

# Builds a stats collector with zeroed counters, the metric names those
# counters are reported under, and a monotonic timestamp marking the start
# of the current reporting window.
#
# @param interval [Numeric] minimum elapsed time between flushes, compared
#   against differences of CLOCK_MONOTONIC readings in #maybe_flush!
# @param type [#to_s] interpolated into every metric name
def initialize(interval, type)
  # Guards every read-modify-write of the counters below.
  @mutex = Mutex.new

  # All metric names share one prefix that embeds the sink type.
  metric_prefix = "statsd_instrument.batched_#{type}_sink"
  @sync_sends_metric = "#{metric_prefix}.synchronous_sends"
  @batched_sends_metric = "#{metric_prefix}.batched_sends"
  @avg_buffer_length_metric = "#{metric_prefix}.avg_buffer_length"
  @avg_batched_packet_size_metric = "#{metric_prefix}.avg_batched_packet_size"
  @avg_batch_length_metric = "#{metric_prefix}.avg_batch_length"

  # How many times a statsd line had to be sent synchronously by the
  # batched udp sender because the buffer was full.
  @synchronous_sends = 0
  # How many batches of statsd lines were sent, whatever their size.
  @batched_sends = 0
  # Mean buffer length, sampled at the start of each batch.
  @avg_buffer_length = 0
  # Mean byte size, per batch, of the packet handed to the underlying
  # UDPSink.
  @avg_batched_packet_size = 0
  # Mean number of statsd lines in a batch.
  @avg_batch_length = 0

  @interval = interval
  @since = Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

Instance Method Details

#increment_batched_sends(buffer_len, packet_size, batch_len) ⇒ Object



150
151
152
153
154
155
156
157
# File 'lib/statsd/instrument/batched_sink.rb', line 150

# Records one batched send and folds its measurements into the running
# means kept for the current reporting window.
#
# Each mean is updated incrementally as `avg += (sample - avg) / n`, where
# n is the number of batches seen so far. The division is done in floating
# point: with Integer operands Ruby's `/` floors, so any delta smaller than
# n would be dropped (or rounded down to -1), pulling the averages low.
#
# @param buffer_len [Numeric] buffer length observed for this batch
# @param packet_size [Numeric] byte size of the packet sent for this batch
# @param batch_len [Numeric] number of statsd lines in this batch
# @return [Float] the updated average batch length
def increment_batched_sends(buffer_len, packet_size, batch_len)
  @mutex.synchronize do
    @batched_sends += 1
    @avg_buffer_length += (buffer_len - @avg_buffer_length).fdiv(@batched_sends)
    @avg_batched_packet_size += (packet_size - @avg_batched_packet_size).fdiv(@batched_sends)
    @avg_batch_length += (batch_len - @avg_batch_length).fdiv(@batched_sends)
  end
end

#increment_synchronous_sends ⇒ Object



146
147
148
# File 'lib/statsd/instrument/batched_sink.rb', line 146

# Counts one more synchronous send, updating the counter under the mutex.
#
# @return [Integer] the counter value after the increment
def increment_synchronous_sends
  @mutex.synchronize do
    @synchronous_sends += 1
  end
end

#maybe_flush!(force: false) ⇒ Object



122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
# File 'lib/statsd/instrument/batched_sink.rb', line 122

# Reports the collected statistics through StatsD and starts a new
# reporting window, provided the configured interval has elapsed since the
# previous flush.
#
# @param force [Boolean] when true, skip the elapsed-time check and flush
#   unconditionally
# @return [nil, Object] nil when the flush is skipped; otherwise whatever
#   the final StatsD.gauge call returns
def maybe_flush!(force: false)
  unless force
    elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - @since
    return if elapsed < @interval
  end

  # Snapshot and reset all counters in one critical section so no update
  # can land between the read and the reset. The StatsD calls are made
  # after the lock is released.
  sync_count, batch_count, buffer_mean, packet_mean, batch_mean = @mutex.synchronize do
    snapshot = [
      @synchronous_sends,
      @batched_sends,
      @avg_buffer_length,
      @avg_batched_packet_size,
      @avg_batch_length,
    ]

    @synchronous_sends = 0
    @batched_sends = 0
    @avg_buffer_length = 0
    @avg_batched_packet_size = 0
    @avg_batch_length = 0
    @since = Process.clock_gettime(Process::CLOCK_MONOTONIC)

    snapshot
  end

  StatsD.increment(@sync_sends_metric, sync_count)
  StatsD.increment(@batched_sends_metric, batch_count)
  StatsD.gauge(@avg_buffer_length_metric, buffer_mean)
  StatsD.gauge(@avg_batched_packet_size_metric, packet_mean)
  StatsD.gauge(@avg_batch_length_metric, batch_mean)
end