Class: Datadog::Statsd::SingleThreadSender

Inherits:
Object
Defined in:
lib/datadog/statsd/single_thread_sender.rb

Overview

The SingleThreadSender is a sender that synchronously buffers messages in a `MessageBuffer`. It compares the current process ID (`Process.pid`) against the PID it last recorded to detect a recent fork, and resets the `MessageBuffer` if one occurred.
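
The fork check described above can be sketched in isolation. This is a minimal illustration, not the library's code: the class `ForkAwareBuffer` and its `reset_if_forked` helper are hypothetical names, and the sketch assumes the check amounts to comparing `Process.pid` with a stored value.

```ruby
# Hypothetical illustration of PID-based fork detection.
# ForkAwareBuffer is not part of the library.
class ForkAwareBuffer
  attr_reader :items

  def initialize
    @items = []
    # Remember which process created this object.
    @owner_pid = Process.pid
  end

  def add(item)
    reset_if_forked
    @items << item
  end

  private

  # After a fork, the child inherits a copy of the parent's pending items.
  # Those belong to the parent, so the child discards them and records
  # its own PID.
  def reset_if_forked
    return if Process.pid == @owner_pid

    @items.clear
    @owner_pid = Process.pid
  end
end
```

Because the comparison runs on every `add`, a child process discards inherited items the first time it adds a message, without needing an explicit post-fork hook.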

Constructor Details

#initialize(message_buffer, logger: nil, flush_interval: nil, queue_size: 1) ⇒ SingleThreadSender

Returns a new instance of SingleThreadSender.



# File 'lib/datadog/statsd/single_thread_sender.rb', line 10

def initialize(message_buffer, logger: nil, flush_interval: nil, queue_size: 1)
  @message_buffer = message_buffer
  @logger = logger
  @mx = Mutex.new
  @message_queue_size = queue_size
  @message_queue = []
  @flush_timer = if flush_interval
    Datadog::Statsd::Timer.new(flush_interval) { flush }
  else
    nil
  end
  # store the pid for which this sender has been created
  update_fork_pid
end
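
The optional flush timer can be pictured with the sketch below. `IntervalTimer` is a hypothetical stand-in, not the library's `Datadog::Statsd::Timer`; it assumes only the surface visible on this page: a constructor taking an interval and a block, plus `start`, `stop`, and `stop?`.

```ruby
# Hypothetical periodic timer; not the library's Datadog::Statsd::Timer.
class IntervalTimer
  def initialize(interval, &callback)
    @interval = interval
    @callback = callback
    @stop = true
    @thread = nil
  end

  # Start a background thread that runs the callback every @interval seconds.
  def start
    return unless stop?

    @stop = false
    @thread = Thread.new do
      until @stop
        sleep @interval
        @callback.call unless @stop
      end
    end
  end

  # Ask the thread to finish and wait for it (up to one interval).
  def stop
    @stop = true
    @thread&.join
    @thread = nil
  end

  # True when no background thread is running.
  def stop?
    @thread.nil? || !@thread.alive?
  end
end
```

With a timer of this shape, passing `flush_interval` makes the sender flush periodically even when too few messages arrive to fill the queue.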

Instance Method Details

#add(message) ⇒ Object



# File 'lib/datadog/statsd/single_thread_sender.rb', line 25

def add(message)
  @mx.synchronize {
    # we have just forked, meaning we have messages in the buffer that we should
    # not send, they belong to the parent process, let's clear the buffer.
    if forked?
      @message_buffer.reset
      @message_queue.clear
      @flush_timer.start if @flush_timer && @flush_timer.stop?
      update_fork_pid
    end

    @message_queue << message
    if @message_queue.size >= @message_queue_size
      drain_message_queue
    end
  }
end
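
The queue-size threshold in `#add` can be illustrated on its own. In this sketch, `ThresholdQueue` and its `drain` helper are hypothetical; the private `drain_message_queue` is not shown on this page, so the sketch assumes it moves every queued message, in order, into the underlying buffer (here a plain array called `sink`).

```ruby
# Hypothetical illustration of threshold-based draining under a mutex.
class ThresholdQueue
  def initialize(sink, queue_size: 1)
    @sink = sink
    @queue_size = queue_size
    @queue = []
    @mx = Mutex.new
  end

  # Queue the message; once the queue reaches the threshold, hand
  # everything over to the sink in arrival order.
  def add(message)
    @mx.synchronize do
      @queue << message
      drain if @queue.size >= @queue_size
    end
  end

  # Force out whatever is queued, regardless of the threshold.
  def flush
    @mx.synchronize { drain }
  end

  private

  def drain
    @sink << @queue.shift until @queue.empty?
  end
end
```

With the default `queue_size: 1`, every `add` drains immediately, so the queue only batches messages when a larger size is configured.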

#flush ⇒ Object



# File 'lib/datadog/statsd/single_thread_sender.rb', line 43

def flush(*)
  @mx.synchronize {
    drain_message_queue
    @message_buffer.flush()
  }
end

#rendez_vous ⇒ Object

No-op, kept for compatibility with `Sender`.



# File 'lib/datadog/statsd/single_thread_sender.rb', line 59

def rendez_vous()
end

#start ⇒ Object



# File 'lib/datadog/statsd/single_thread_sender.rb', line 50

def start()
  @flush_timer.start if @flush_timer
end

#stop ⇒ Object



# File 'lib/datadog/statsd/single_thread_sender.rb', line 54

def stop()
  @flush_timer.stop if @flush_timer
end