Class: Datadog::Tracing::Writer

Inherits:
Object
  • Object
show all
Defined in:
lib/datadog/tracing/writer.rb

Overview

Processor that sends traces and metadata to the agent DEV: Our goal is for Datadog::Tracing::Workers::TraceWriter to replace this class in the future

Defined Under Namespace

Classes: Events

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Writer

Returns a new instance of Writer.



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/datadog/tracing/writer.rb', line 20

def initialize(options = {})
  # writer and transport parameters
  @buff_size = options.fetch(:buffer_size, Workers::AsyncTransport::DEFAULT_BUFFER_MAX_SIZE)
  @flush_interval = options.fetch(:flush_interval, Workers::AsyncTransport::DEFAULT_FLUSH_INTERVAL)
  transport_options = options.fetch(:transport_options, {})

  transport_options[:agent_settings] = options[:agent_settings] if options.key?(:agent_settings)

  # transport and buffers
  @transport = options.fetch(:transport) do
    Transport::HTTP.default(**transport_options)
  end

  @shutdown_timeout = options.fetch(:shutdown_timeout, Workers::AsyncTransport::DEFAULT_SHUTDOWN_TIMEOUT)

  # handles the thread creation after an eventual fork
  @mutex_after_fork = Mutex.new
  @pid = nil

  @traces_flushed = 0

  # one worker for traces
  @worker = nil

  # Once stopped, this writer instance cannot be restarted.
  # This allow for graceful shutdown, while preventing
  # the host application from inadvertently start new
  # threads during shutdown.
  @stopped = false

  # Callback handler
  @events = Events.new
end

Instance Attribute Details

#eventsObject (readonly)

Returns the value of attribute events.



15
16
17
# File 'lib/datadog/tracing/writer.rb', line 15

def events
  @events
end

#transportObject (readonly)

Returns the value of attribute transport.



15
16
17
# File 'lib/datadog/tracing/writer.rb', line 15

def transport
  @transport
end

#workerObject (readonly)

Returns the value of attribute worker.



15
16
17
# File 'lib/datadog/tracing/writer.rb', line 15

def worker
  @worker
end

Instance Method Details

#startObject

Explicitly starts the Datadog::Tracing::Writer‘s internal worker.

The Datadog::Tracing::Writer is also automatically started when necessary during calls to #write.



57
58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/datadog/tracing/writer.rb', line 57

def start
  @mutex_after_fork.synchronize do
    return false if @stopped

    pid = Process.pid
    return if @worker && pid == @pid

    @pid = pid

    start_worker
    true
  end
end

#statsObject

stats returns a dictionary of stats about the writer.



127
128
129
130
131
132
# File 'lib/datadog/tracing/writer.rb', line 127

def stats
  {
    traces_flushed: @traces_flushed,
    transport: @transport.stats
  }
end

#stopObject

Gracefully shuts down this writer.

Once stopped methods calls won’t fail, but no internal work will be performed.

It is not possible to restart a stopped writer instance.



77
78
79
# File 'lib/datadog/tracing/writer.rb', line 77

def stop
  @mutex_after_fork.synchronize { stop_worker }
end

#write(trace) ⇒ Object

enqueue the trace for submission to the API



101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# File 'lib/datadog/tracing/writer.rb', line 101

def write(trace)
  # In multiprocess environments, the main process initializes the +Writer+ instance and if
  # the process forks (i.e. a web server like Unicorn or Puma with multiple workers) the new
  # processes will share the same +Writer+ until the first write (COW). Because of that,
  # each process owns a different copy of the +@buffer+ after each write and so the
  # +AsyncTransport+ will not send data to the trace agent.
  #
  # This check ensures that if a process doesn't own the current +Writer+, async workers
  # will be initialized again (but only once for each process).
  start if @worker.nil? || @pid != Process.pid

  # TODO: Remove this, and have the tracer pump traces directly to runtime metrics
  #       instead of working through the trace writer.
  # Associate trace with runtime metrics
  Runtime::Metrics.associate_trace(trace)

  worker_local = @worker

  if worker_local
    worker_local.enqueue_trace(trace)
  elsif !@stopped
    Datadog.logger.debug('Writer either failed to start or was stopped before #write could complete')
  end
end