Class: Datadog::Writer

Inherits:
Object
  • Object
show all
Defined in:
lib/ddtrace/writer.rb

Overview

Processor that sends traces and metadata to the agent

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Writer

Returns a new instance of Writer.



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/ddtrace/writer.rb', line 19

# Builds a writer. No worker is created here; @worker stays nil until #start runs.
#
# Recognized keys of +options+ (all optional):
# * +:buffer_size+ - kept in @buff_size; defaults to
#   Workers::AsyncTransport::DEFAULT_BUFFER_MAX_SIZE.
# * +:flush_interval+ - kept in @flush_interval; defaults to
#   Workers::AsyncTransport::DEFAULT_FLUSH_INTERVAL.
# * +:transport_options+ - hash handed to Transport::HTTP.default when no
#   +:transport+ is supplied. The caller's hash is never modified.
# * +:priority_sampler+ - when truthy, exposed through #priority_sampler and
#   the transport API version defaults to Transport::HTTP::API::V4 unless
#   +:api_version+ is already set in the transport options.
# * +:transport+ - ready-made transport; skips Transport::HTTP.default.
# * +:runtime_metrics+ - ready-made metrics object; otherwise a new
#   Runtime::Metrics is created.
def initialize(options = {})
  # writer and transport parameters
  @buff_size = options.fetch(:buffer_size, Workers::AsyncTransport::DEFAULT_BUFFER_MAX_SIZE)
  @flush_interval = options.fetch(:flush_interval, Workers::AsyncTransport::DEFAULT_FLUSH_INTERVAL)
  transport_options = options.fetch(:transport_options, {})

  # priority sampling
  @priority_sampler = nil
  if options[:priority_sampler]
    @priority_sampler = options[:priority_sampler]

    # Add the default API version to a copy, so the hash owned by the caller
    # is left untouched (and a frozen hash does not raise).
    transport_options = transport_options.dup
    transport_options[:api_version] ||= Transport::HTTP::API::V4
  end

  # transport and buffers
  @transport = options.fetch(:transport) do
    Transport::HTTP.default(transport_options)
  end

  # Runtime metrics
  @runtime_metrics = options.fetch(:runtime_metrics) do
    Runtime::Metrics.new
  end

  # handles the thread creation after an eventual fork
  @mutex_after_fork = Mutex.new
  @pid = nil

  @traces_flushed = 0

  # one worker for traces
  @worker = nil
end

Instance Attribute Details

#priority_samplerObject (readonly)

Returns the value of attribute priority_sampler.



13
14
15
# File 'lib/ddtrace/writer.rb', line 13

# Reader for the priority sampler: the object given to the constructor as
# +options[:priority_sampler]+, or nil when none was given.
def priority_sampler
  @priority_sampler
end

#runtime_metricsObject (readonly)

Returns the value of attribute runtime_metrics.



13
14
15
# File 'lib/ddtrace/writer.rb', line 13

# Reader for the runtime metrics object: either the one given to the
# constructor as +options[:runtime_metrics]+ or the Runtime::Metrics instance
# the constructor created.
def runtime_metrics
  @runtime_metrics
end

#transportObject (readonly)

Returns the value of attribute transport.



13
14
15
# File 'lib/ddtrace/writer.rb', line 13

# Reader for the transport: either the one given to the constructor as
# +options[:transport]+ or the result of Transport::HTTP.default.
def transport
  @transport
end

#workerObject (readonly)

Returns the value of attribute worker.



13
14
15
# File 'lib/ddtrace/writer.rb', line 13

# Reader for the current worker. It is nil until #start creates one and is
# reset to nil by #stop.
def worker
  @worker
end

Instance Method Details

#send_runtime_metricsObject



97
98
99
100
101
# File 'lib/ddtrace/writer.rb', line 97

# Flushes the runtime metrics, but only when runtime metrics are enabled in
# the Datadog configuration. Returns nil when they are disabled, otherwise
# whatever +runtime_metrics.flush+ returns.
def send_runtime_metrics
  runtime_metrics.flush if Datadog.configuration.runtime_metrics_enabled
end

#send_spans(traces, transport) ⇒ Object

flush spans to the trace-agent, handles spans only



75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/ddtrace/writer.rb', line 75

# Sends +traces+ through +transport+ and records the outcome.
#
# Returns true straight away for an empty batch. Otherwise returns true
# unless the response reports a server error.
#
# Unless the response reports an internal error:
# * @traces_flushed grows by the batch size when there was no server error;
# * the priority sampler, if any, is updated with the service rates carried
#   by the response, if any.
def send_spans(traces, transport)
  return true if traces.empty?

  # Inject hostname if configured to do so
  inject_hostname!(traces) if Datadog.configuration.report_hostname

  # Send traces and get a response.
  response = transport.send_traces(traces)

  unless response.internal_error?
    delivered = !response.server_error?
    @traces_flushed += traces.length if delivered

    # Update priority sampler; rates are only looked up when a sampler exists.
    sampler = priority_sampler
    rates = response.service_rates unless sampler.nil?
    sampler.update(rates) unless rates.nil?
  end

  # Report whether the batch went through without a server error.
  !response.server_error?
end

#startObject

Spawns the worker that sends spans; it uses the writer's transport, which is thread-safe.



52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/ddtrace/writer.rb', line 52

# Records the current process id, builds the trace and runtime-metrics
# callbacks, creates a Datadog::Workers::AsyncTransport configured with the
# writer's transport, buffer size and flush interval, and starts it.
# Returns whatever the worker's #start returns.
def start
  @pid = Process.pid

  # Callbacks handed to the worker; they call back into this writer.
  @trace_handler = lambda { |items, transport| send_spans(items, transport) }
  @runtime_metrics_handler = lambda { send_runtime_metrics }

  @worker = Datadog::Workers::AsyncTransport.new(
    transport: @transport,
    buffer_size: @buff_size,
    on_trace: @trace_handler,
    on_runtime_metrics: @runtime_metrics_handler,
    interval: @flush_interval
  )
  @worker.start
end

#statsObject

Returns a hash of statistics about the writer.



137
138
139
140
141
142
# File 'lib/ddtrace/writer.rb', line 137

# Returns a hash with two entries: +:traces_flushed+, the writer's flushed
# trace counter, and +:transport+, whatever the transport's #stats returns.
def stats
  transport_stats = @transport.stats

  { traces_flushed: @traces_flushed, transport: transport_stats }
end

#stopObject

stops worker for spans.



68
69
70
71
72
# File 'lib/ddtrace/writer.rb', line 68

# Stops the current worker, if there is one, and forgets it.
# Always returns nil.
def stop
  unless worker.nil?
    @worker.stop
    @worker = nil
  end
end

#write(trace, services = nil) ⇒ Object

enqueue the trace for submission to the API



104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# File 'lib/ddtrace/writer.rb', line 104

# Enqueues +trace+ on the worker for submission.
#
# +services+ is deprecated: passing anything but nil only logs a warning
# (through Datadog::Patcher.do_once) and is otherwise ignored.
#
# When the current process id differs from the one recorded by #start, the
# worker is (re)started first. When no worker is available after that check
# (for example because #stop was called), the trace is not enqueued and a
# debug message is logged instead of raising NoMethodError on nil.
def write(trace, services = nil)
  unless services.nil?
    Datadog::Patcher.do_once('Writer#write') do
      Datadog::Tracer.log.warn(%(
        write: Writing services has been deprecated and no longer need to be provided.
        write(traces, services) can be updated to write(traces)
      ))
    end
  end

  # In multiprocess environments, the main process initializes the +Writer+ instance and if
  # the process forks (i.e. a web server like Unicorn or Puma with multiple workers) the new
  # processes will share the same +Writer+ until the first write (COW). Because of that,
  # each process owns a different copy of the +@buffer+ after each write and so the
  # +AsyncTransport+ will not send data to the trace agent.
  #
  # This check ensures that if a process doesn't own the current +Writer+, async workers
  # will be initialized again (but only once for each process).
  pid = Process.pid
  if pid != @pid # avoid using Mutex when pids are equal
    @mutex_after_fork.synchronize do
      # we should start threads because the worker doesn't own this
      start if pid != @pid
    end
  end

  # Associate root span with runtime metrics
  runtime_metrics.associate_with_span(trace.first) unless trace.empty?

  # Read @worker once: #stop sets it to nil without resetting @pid, so the
  # pid check above does not restart a stopped writer in the same process.
  active_worker = @worker
  if active_worker
    active_worker.enqueue_trace(trace)
  else
    Datadog::Tracer.log.debug('Writer#write: no worker is running (writer stopped?); trace was not enqueued')
  end
end