Class: StatsD::Instrument::BatchedSink::Dispatcher

Inherits:
Object
  • Object
show all
Defined in:
lib/statsd/instrument/batched_sink.rb

Instance Method Summary collapse

Constructor Details

#initialize(sink, buffer_capacity, thread_priority, max_packet_size, statistics_interval) ⇒ Dispatcher

Returns a new instance of Dispatcher.



161
162
163
164
165
166
167
168
169
170
171
172
173
174
# File 'lib/statsd/instrument/batched_sink.rb', line 161

# Sets up the dispatcher state and starts the background dispatcher thread.
#
# @param sink [Object] destination for packets; must respond to `<<`, and to
#   `connection.type` when statistics are enabled.
# @param buffer_capacity [Integer] capacity handed to `Buffer.new`.
# @param thread_priority [Object] stored as-is; it is not applied in this method.
# @param max_packet_size [Integer] upper bound, in bytes, used when batching datagrams.
# @param statistics_interval [Numeric] statistics are collected only when this is > 0.
def initialize(sink, buffer_capacity, thread_priority, max_packet_size, statistics_interval)
  @sink = sink
  @interrupted = false
  @thread_priority = thread_priority
  @max_packet_size = max_packet_size
  @buffer_capacity = buffer_capacity
  @buffer = Buffer.new(buffer_capacity)
  @pid = Process.pid

  # Always define @statistics so that `@statistics&.…` call sites never read
  # an unassigned instance variable when statistics are disabled.
  @statistics = nil
  if statistics_interval > 0
    @statistics = DispatcherStats.new(statistics_interval, @sink.connection.type)
  end

  # Start the worker last: it runs `dispatch` against this object, so every
  # instance variable must already be in place. This also guarantees that no
  # thread is left running if any of the setup above raises.
  @dispatcher_thread = Thread.new { dispatch }
end

Instance Method Details

#<<(datagram) ⇒ Object



176
177
178
179
180
181
182
183
184
185
186
# File 'lib/statsd/instrument/batched_sink.rb', line 176

# Hands a datagram to the dispatcher.
#
# The datagram is pushed onto the buffer without blocking. If the
# `thread_healthcheck` fails, or the buffer does not accept the datagram,
# it is written straight to the sink instead and counted as a synchronous send.
#
# @param datagram [String] the datagram to emit.
# @return [self] so that calls can be chained.
def <<(datagram)
  # `&&` short-circuits: nothing is pushed when the health check fails.
  queued = thread_healthcheck && @buffer.push_nonblock(datagram)

  unless queued
    @sink << datagram
    @statistics&.increment_synchronous_sends
  end

  self
end

#flush(blocking:) ⇒ Object



197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
# File 'lib/statsd/instrument/batched_sink.rb', line 197

# Drains the buffer into the sink, packing several datagrams into one
# NEWLINE-separated packet as long as the packet stays within
# `@max_packet_size` bytes.
#
# A first datagram that is larger than `@max_packet_size` on its own is still
# sent, alone in its packet. A datagram that does not fit in the current
# packet is carried over and becomes the first datagram of the next one.
#
# @param blocking [Boolean] when true, the first datagram of each packet is
#   obtained with `@buffer.pop` (a nil result ends the flush — the queue was
#   closed); when false, `@buffer.pop_nonblock` is used and the flush ends as
#   soon as the buffer has nothing to offer.
# @return [nil]
def flush(blocking:)
  packet = "".b
  next_datagram = nil
  until @buffer.closed? && @buffer.empty? && next_datagram.nil?
    # Reuse the datagram carried over from the previous packet, if any;
    # otherwise take one from the buffer.
    next_datagram ||= blocking ? @buffer.pop : @buffer.pop_nonblock
    break if next_datagram.nil?

    # Buffer depth when this packet was started; the datagram just popped counts too.
    buffer_len = @buffer.length + 1
    batch_len = 1

    packet << next_datagram
    next_datagram = nil
    if packet.bytesize <= @max_packet_size
      while (next_datagram = @buffer.pop_nonblock)
        # One byte is reserved for the separator. `>=` lets a datagram fill the
        # packet to exactly @max_packet_size, matching the `<=` check above.
        break unless @max_packet_size - packet.bytesize - 1 >= next_datagram.bytesize

        packet << NEWLINE << next_datagram
        batch_len += 1
      end
    end

    packet_size = packet.bytesize
    @sink << packet
    # The same string is reused for every packet to avoid reallocating it.
    packet.clear

    @statistics&.increment_batched_sends(buffer_len, packet_size, batch_len)
    @statistics&.maybe_flush!
  end
end

#shutdown(wait = 2) ⇒ Object



188
189
190
191
192
193
194
195
# File 'lib/statsd/instrument/batched_sink.rb', line 188

# Stops the dispatcher and sends whatever is still buffered.
#
# Marks the dispatcher as interrupted, closes the buffer, gives a still-alive
# dispatcher thread up to `wait` seconds to finish, and then performs a final
# non-blocking flush from the calling thread.
#
# @param wait [Numeric] timeout passed to `Thread#join` for the dispatcher thread.
# @return [Object] whatever the final `flush(blocking: false)` returns.
def shutdown(wait = 2)
  @interrupted = true
  @buffer.close

  # Only wait when there is a thread and it is still running.
  @dispatcher_thread.join(wait) if @dispatcher_thread&.alive?

  flush(blocking: false)
end