Class: StatsD::Instrument::BatchedSink::Dispatcher

Inherits:
Object
  • Object
show all
Defined in:
lib/statsd/instrument/batched_sink.rb

Instance Method Summary collapse

Constructor Details

#initialize(sink, buffer_capacity, thread_priority, max_packet_size, statistics_interval) ⇒ Dispatcher

Returns a new instance of Dispatcher.



156
157
158
159
160
161
162
163
164
165
166
167
168
169
# File 'lib/statsd/instrument/batched_sink.rb', line 156

# Stores the dispatcher configuration, allocates the datagram buffer and
# starts the background thread that runs +dispatch+.
#
# @param sink [Object] destination that receives datagrams/packets via +<<+;
#   its +connection.type+ is read when statistics are enabled.
# @param buffer_capacity [Integer] capacity handed to +Buffer.new+.
# @param thread_priority [Object] stored as-is in +@thread_priority+ (not
#   applied here).
# @param max_packet_size [Integer] stored in +@max_packet_size+.
# @param statistics_interval [Numeric] statistics are only set up when this
#   is greater than zero.
def initialize(sink, buffer_capacity, thread_priority, max_packet_size, statistics_interval)
  @interrupted = false
  @sink = sink
  @max_packet_size = max_packet_size
  @thread_priority = thread_priority
  @buffer_capacity = buffer_capacity
  @buffer = Buffer.new(buffer_capacity)

  # The thread is started before @pid and @statistics are assigned.
  @dispatcher_thread = Thread.new { dispatch }
  @pid = Process.pid

  # Statistics are opt-in: leave @statistics unset for a non-positive interval.
  return unless statistics_interval > 0

  connection_type = @sink.connection.type
  @statistics = DispatcherStats.new(statistics_interval, connection_type)
end

Instance Method Details

#<<(datagram) ⇒ Object



171
172
173
174
175
176
177
178
179
180
181
# File 'lib/statsd/instrument/batched_sink.rb', line 171

# Hands a datagram to the dispatcher.
#
# The datagram is pushed onto the buffer without blocking. If
# +thread_healthcheck+ reports failure, or the non-blocking push is
# rejected, the datagram is written straight to the sink instead and the
# synchronous-send counter is bumped (when statistics are enabled).
#
# @param datagram [Object] the datagram to enqueue or send.
# @return [self] so that calls can be chained.
def <<(datagram)
  # push_nonblock is only attempted when the healthcheck passes.
  enqueued = thread_healthcheck && @buffer.push_nonblock(datagram)

  unless enqueued
    # The buffer is full or the thread can't be respawned,
    # so fall back to a synchronous send.
    @sink << datagram
    @statistics&.increment_synchronous_sends
  end

  self
end

#flush(blocking:) ⇒ Object



192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
# File 'lib/statsd/instrument/batched_sink.rb', line 192

# Drains the buffer, packing datagrams into NEWLINE-separated packets and
# writing each packet to the sink.
#
# A datagram is appended to the current packet only while
# +@max_packet_size - packet.bytesize - 1+ is strictly greater than the
# datagram's size; the first datagram that does not fit is carried over and
# opens the next packet. A first datagram larger than +@max_packet_size+ is
# sent alone.
#
# @param blocking [Boolean] when truthy the first datagram of each packet is
#   taken with +@buffer.pop+, otherwise with +@buffer.pop_nonblock+. In both
#   modes a +nil+ result ends the flush.
# @return [nil]
def flush(blocking:)
  payload = "".b
  carried = nil # datagram popped during batching that did not fit

  until @buffer.closed? && @buffer.empty? && carried.nil?
    # Reuse the carried-over datagram if there is one; otherwise pop.
    # nil means the queue was closed (blocking) or is empty (non-blocking).
    carried ||= blocking ? @buffer.pop : @buffer.pop_nonblock
    break if carried.nil?

    queued_count = @buffer.length + 1
    datagram_count = 1

    payload << carried
    carried = nil

    if payload.bytesize <= @max_packet_size
      while (carried = @buffer.pop_nonblock)
        remaining = @max_packet_size - payload.bytesize - 1
        # Leaving here keeps `carried` set for the next outer iteration.
        break unless remaining > carried.bytesize

        payload << NEWLINE << carried
        datagram_count += 1
      end
    end

    # Capture the size before clearing: the same String is reused per packet.
    sent_bytes = payload.bytesize
    @sink << payload
    payload.clear

    @statistics&.increment_batched_sends(queued_count, sent_bytes, datagram_count)
    @statistics&.maybe_flush!
  end
end

#shutdown(wait = 2) ⇒ Object



183
184
185
186
187
188
189
190
# File 'lib/statsd/instrument/batched_sink.rb', line 183

# Stops the dispatcher: marks it interrupted, closes the buffer, waits for a
# live dispatcher thread, then flushes without blocking.
#
# @param wait [Numeric] limit passed to +join+ on the dispatcher thread.
# @return [Object] whatever the final non-blocking +flush+ returns.
def shutdown(wait = 2)
  @interrupted = true
  @buffer.close

  worker = @dispatcher_thread
  worker.join(wait) if worker&.alive?

  # Runs whether or not the join above finished within the limit.
  flush(blocking: false)
end