Class: Datadog::Writer
Inherits: Object
Defined in: lib/ddtrace/writer.rb
Overview
Processor that sends traces and metadata to the agent.
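A minimal lifecycle sketch, assuming default settings and a locally running trace agent; spans is only a placeholder for an array of finished spans produced by the tracer:

require 'ddtrace'

spans = []                                 # placeholder: finished spans from the tracer
writer = Datadog::Writer.new               # default buffer, flush interval and transport
writer.start                               # spawn the async worker
writer.write(spans) unless spans.empty?    # enqueue a trace for the next flush
puts writer.stats[:traces_flushed]         # traces flushed so far
writer.stop                                # shut the worker down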
Instance Attribute Summary
- #priority_sampler ⇒ Object (readonly): Returns the value of attribute priority_sampler.
- #runtime_metrics ⇒ Object (readonly): Returns the value of attribute runtime_metrics.
- #transport ⇒ Object (readonly): Returns the value of attribute transport.
- #worker ⇒ Object (readonly): Returns the value of attribute worker.
Instance Method Summary
- #initialize(options = {}) ⇒ Writer (constructor): A new instance of Writer.
- #send_runtime_metrics ⇒ Object
- #send_spans(traces, transport) ⇒ Object: Flushes spans to the trace agent; handles spans only.
- #start ⇒ Object: Spawns a worker for spans; they share the same transport, which is thread-safe.
- #stats ⇒ Object: Returns a dictionary of stats about the writer.
- #stop ⇒ Object: Stops the worker for spans.
- #write(trace, services = nil) ⇒ Object: Enqueues the trace for submission to the API.
Constructor Details
#initialize(options = {}) ⇒ Writer
Returns a new instance of Writer.
# File 'lib/ddtrace/writer.rb', line 19

def initialize(options = {})
  # writer and transport parameters
  @buff_size = options.fetch(:buffer_size, Workers::AsyncTransport::DEFAULT_BUFFER_MAX_SIZE)
  @flush_interval = options.fetch(:flush_interval, Workers::AsyncTransport::DEFAULT_FLUSH_INTERVAL)

  transport_options = options.fetch(:transport_options, {})

  # priority sampling
  if options[:priority_sampler]
    @priority_sampler = options[:priority_sampler]
    transport_options[:api_version] ||= Transport::HTTP::API::V4
  end

  # transport and buffers
  @transport = options.fetch(:transport) do
    Transport::HTTP.default(transport_options)
  end

  # Runtime metrics
  @runtime_metrics = options.fetch(:runtime_metrics) do
    Runtime::Metrics.new
  end

  # handles the thread creation after an eventual fork
  @mutex_after_fork = Mutex.new
  @pid = nil

  @traces_flushed = 0

  # one worker for traces
  @worker = nil
end
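The option keys recognized by the constructor are the ones fetched above: :buffer_size, :flush_interval, :transport_options, :priority_sampler, :transport and :runtime_metrics. A small sketch with assumed, arbitrary values for the buffering options:

# Values here are illustrative assumptions, not recommended settings.
writer = Datadog::Writer.new(
  buffer_size: 1_000,  # overrides Workers::AsyncTransport::DEFAULT_BUFFER_MAX_SIZE
  flush_interval: 2    # seconds between flushes, instead of DEFAULT_FLUSH_INTERVAL
)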
Instance Attribute Details
#priority_sampler ⇒ Object (readonly)
Returns the value of attribute priority_sampler.
# File 'lib/ddtrace/writer.rb', line 13

def priority_sampler
  @priority_sampler
end
#runtime_metrics ⇒ Object (readonly)
Returns the value of attribute runtime_metrics.
# File 'lib/ddtrace/writer.rb', line 13

def runtime_metrics
  @runtime_metrics
end
#transport ⇒ Object (readonly)
Returns the value of attribute transport.
# File 'lib/ddtrace/writer.rb', line 13

def transport
  @transport
end
#worker ⇒ Object (readonly)
Returns the value of attribute worker.
# File 'lib/ddtrace/writer.rb', line 13

def worker
  @worker
end
Instance Method Details
#send_runtime_metrics ⇒ Object
# File 'lib/ddtrace/writer.rb', line 97

def send_runtime_metrics
  return unless Datadog.configuration.runtime_metrics_enabled

  runtime_metrics.flush
end
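Flushing is gated on the runtime_metrics_enabled setting that the method reads directly. A sketch, assuming the flag is writable through Datadog.configuration in the same way it is readable here:

Datadog.configuration.runtime_metrics_enabled = true  # assumption: setting is writable

writer = Datadog::Writer.new
writer.send_runtime_metrics  # now delegates to runtime_metrics.flush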
#send_spans(traces, transport) ⇒ Object
Flushes spans to the trace agent; handles spans only.
# File 'lib/ddtrace/writer.rb', line 75

def send_spans(traces, transport)
  return true if traces.empty?

  # Inject hostname if configured to do so
  inject_hostname!(traces) if Datadog.configuration.report_hostname

  # Send traces and get a response.
  response = transport.send_traces(traces)

  unless response.internal_error?
    @traces_flushed += traces.length unless response.server_error?

    # Update priority sampler
    unless priority_sampler.nil? || response.service_rates.nil?
      priority_sampler.update(response.service_rates)
    end
  end

  # Return if server error occurred.
  !response.server_error?
end
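send_spans is normally invoked by the async worker through the on_trace handler set up in #start, but the boolean it returns can be inspected when calling it directly. A sketch; spans is a placeholder for finished spans:

writer = Datadog::Writer.new
spans  = []  # placeholder: finished spans from the tracer

ok = writer.send_spans(spans, writer.transport)
ok  # => true when there was nothing to send or no server error occurred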
#start ⇒ Object
Spawns a worker for spans; they share the same transport, which is thread-safe.
# File 'lib/ddtrace/writer.rb', line 52

def start
  @pid = Process.pid
  @trace_handler = ->(items, transport) { send_spans(items, transport) }
  @runtime_metrics_handler = -> { send_runtime_metrics }
  @worker = Datadog::Workers::AsyncTransport.new(
    transport: @transport,
    buffer_size: @buff_size,
    on_trace: @trace_handler,
    on_runtime_metrics: @runtime_metrics_handler,
    interval: @flush_interval
  )

  @worker.start()
end
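start wires both handlers into a single Workers::AsyncTransport instance, so traces and runtime metrics share one flush interval and one transport. The effect is visible through the read-only worker attribute:

writer = Datadog::Writer.new
writer.worker  # => nil, no worker until started
writer.start
writer.worker  # => a Datadog::Workers::AsyncTransport bound to writer.transport
writer.stop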
#stats ⇒ Object
Returns a dictionary of stats about the writer.
# File 'lib/ddtrace/writer.rb', line 137

def stats
  {
    traces_flushed: @traces_flushed,
    transport: @transport.stats
  }
end
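The two keys shown above are the only ones returned; a brief usage sketch:

s = writer.stats
s[:traces_flushed]  # cumulative count incremented in #send_spans
s[:transport]       # whatever the underlying transport reports via @transport.stats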
#stop ⇒ Object
Stops the worker for spans.
# File 'lib/ddtrace/writer.rb', line 68

def stop
  return if worker.nil?
  @worker.stop
  @worker = nil
end
#write(trace, services = nil) ⇒ Object
Enqueues the trace for submission to the API.
# File 'lib/ddtrace/writer.rb', line 104

def write(trace, services = nil)
  unless services.nil?
    Datadog::Patcher.do_once('Writer#write') do
      Datadog::Tracer.log.warn(%(
        write: Writing services has been deprecated and no longer need to be provided.
        write(traces, services) can be updated to write(traces)
      ))
    end
  end

  # In multiprocess environments, the main process initializes the +Writer+ instance and if
  # the process forks (i.e. a web server like Unicorn or Puma with multiple workers) the new
  # processes will share the same +Writer+ until the first write (COW). Because of that,
  # each process owns a different copy of the +@buffer+ after each write and so the
  # +AsyncTransport+ will not send data to the trace agent.
  #
  # This check ensures that if a process doesn't own the current +Writer+, async workers
  # will be initialized again (but only once for each process).
  pid = Process.pid
  if pid != @pid # avoid using Mutex when pids are equal
    @mutex_after_fork.synchronize do
      # we should start threads because the worker doesn't own this
      start if pid != @pid
    end
  end

  # Associate root span with runtime metrics
  runtime_metrics.associate_with_span(trace.first) unless trace.empty?

  @worker.enqueue_trace(trace)
end
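The PID check above means a forked child process lazily rebuilds its own worker on its first write, as the inline comment describes. A sketch of that behaviour; the empty array is only a placeholder trace:

writer = Datadog::Writer.new
writer.start  # worker belongs to the parent process

fork do
  # Process.pid differs here, so the first #write re-runs #start
  # and this child gets its own async worker before enqueuing.
  writer.write([])  # placeholder: a real trace would come from the tracer
end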