Class: Datadog::Tracing::Workers::AsyncTraceWriter
- Inherits: TraceWriter
  - Object
    - Core::Worker
      - TraceWriter
        - Datadog::Tracing::Workers::AsyncTraceWriter
- Includes: Core::Workers::Polling, Core::Workers::Queue
- Defined in: lib/datadog/tracing/workers/trace_writer.rb
Overview
Writes traces to the transport asynchronously, using a worker thread and a buffer.
Constant Summary
- DEFAULT_BUFFER_MAX_SIZE = 1000
- FORK_POLICY_ASYNC = :async
- FORK_POLICY_SYNC = :sync
Constants included from Core::Workers::Polling
Core::Workers::Polling::DEFAULT_SHUTDOWN_TIMEOUT
Instance Attribute Summary
- #async ⇒ Object (writeonly)
  Sets the attribute async.
Attributes included from Core::Workers::Queue
Attributes inherited from TraceWriter
#agent_settings, #logger, #transport
Attributes inherited from Core::Worker
Instance Method Summary
- #after_fork ⇒ Object
- #async? ⇒ Boolean
- #dequeue ⇒ Object
- #enqueue(trace) ⇒ Object
- #fork_policy=(policy) ⇒ Object
- #initialize(options = {}) ⇒ AsyncTraceWriter (constructor)
  A new instance of AsyncTraceWriter.
- #perform(traces) ⇒ Object
  NOTE: #perform is wrapped by other modules: Polling --> Async --> IntervalLoop --> AsyncTraceWriter --> TraceWriter.
- #stop(force_stop = false, timeout = @shutdown_timeout) ⇒ Object
- #work_pending? ⇒ Boolean
  Are there more traces to be processed next?
- #write(trace) ⇒ Object
  WARNING: This method breaks the Liskov Substitution Principle – TraceWriter#write is spec’d to return the result from the writer, whereas this method returns something else when running in async mode.
Methods included from Core::Workers::Polling
#enabled=, #enabled?, included
Methods included from Core::Workers::Queue
Methods inherited from TraceWriter
#flush_completed, #flush_traces, #process_traces, #write_traces
Constructor Details
#initialize(options = {}) ⇒ AsyncTraceWriter
Returns a new instance of AsyncTraceWriter.
# File 'lib/datadog/tracing/workers/trace_writer.rb', line 95

def initialize(options = {})
  # Workers::TraceWriter settings
  super

  # Workers::Polling settings
  self.enabled = options.fetch(:enabled, true)

  # Workers::Async::Thread settings
  @async = true
  self.fork_policy = options.fetch(:fork_policy, FORK_POLICY_ASYNC)

  # Workers::IntervalLoop settings
  self.loop_base_interval = options[:interval] if options.key?(:interval)
  self.loop_back_off_ratio = options[:back_off_ratio] if options.key?(:back_off_ratio)
  self.loop_back_off_max = options[:back_off_max] if options.key?(:back_off_max)

  # Workers::Queue settings
  @buffer_size = options.fetch(:buffer_size, DEFAULT_BUFFER_MAX_SIZE)
  self.buffer = TraceBuffer.new(@buffer_size)

  @shutdown_timeout = options.fetch(:shutdown_timeout, Core::Workers::Polling::DEFAULT_SHUTDOWN_TIMEOUT)
end
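The constructor reads its options in two ways: `fetch` with a default for settings that must always have a value, and a `key?` guard for settings that should keep their inherited default when the caller omits them. A minimal sketch of that pattern follows. `SketchWorker` and its default interval of 1 are hypothetical and are not part of the library.

```ruby
# Hypothetical stand-in; not part of the datadog gem.
class SketchWorker
  DEFAULT_BUFFER_MAX_SIZE = 1000

  attr_reader :buffer_size, :interval

  def initialize(options = {})
    # Assumed pre-existing default, kept unless :interval is passed.
    @interval = 1
    # Always set: falls back to the constant when the key is absent.
    @buffer_size = options.fetch(:buffer_size, DEFAULT_BUFFER_MAX_SIZE)
    # Only overridden when the caller supplies the key.
    @interval = options[:interval] if options.key?(:interval)
  end
end

defaults = SketchWorker.new
custom = SketchWorker.new({ buffer_size: 50, interval: 5 })
```

With the `key?` guard, omitting `:interval` leaves the earlier default in place instead of overwriting it with `nil`.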
Instance Attribute Details
#async=(value) ⇒ Object (writeonly)
Sets the attribute async
# File 'lib/datadog/tracing/workers/trace_writer.rb', line 92

def async=(value)
  @async = value
end
Instance Method Details
#after_fork ⇒ Object
# File 'lib/datadog/tracing/workers/trace_writer.rb', line 178

def after_fork
  # In multiprocess environments, forks will share the same buffer until it's written to.
  # A.K.A. copy-on-write. We don't want forks to write traces generated from another process.
  # Instead, we reset it after the fork. (Make sure any enqueue operations happen after this.)
  self.buffer = TraceBuffer.new(@buffer_size)

  # Switch to synchronous mode if configured to do so.
  # In some cases synchronous writing is preferred because the fork will be short lived.
  @async = false if @writer_fork_policy == FORK_POLICY_SYNC
end
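The effect of #after_fork can be illustrated without forking a real process. The sketch below is an assumption-laden model: `SketchForkAwareWriter` is hypothetical, a plain Array stands in for the trace buffer, and the fork is simulated by calling `after_fork` directly.

```ruby
# Hypothetical stand-in for the writer; the fork is simulated, not real.
class SketchForkAwareWriter
  FORK_POLICY_ASYNC = :async
  FORK_POLICY_SYNC = :sync

  attr_reader :buffer

  def initialize(fork_policy)
    @fork_policy = fork_policy
    @async = true
    @buffer = []
  end

  def async?
    @async == true
  end

  def enqueue(trace)
    @buffer.push(trace)
  end

  def after_fork
    # Drop traces inherited from the parent process.
    @buffer = []
    # Fall back to synchronous writes when the policy asks for it.
    @async = false if @fork_policy == FORK_POLICY_SYNC
  end
end

sync_writer = SketchForkAwareWriter.new(:sync)
sync_writer.enqueue(:parent_trace)
sync_writer.after_fork

async_writer = SketchForkAwareWriter.new(:async)
async_writer.enqueue(:parent_trace)
async_writer.after_fork
```

In both cases the buffer is emptied. Only the writer configured with the sync policy leaves async mode.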
#async? ⇒ Boolean
# File 'lib/datadog/tracing/workers/trace_writer.rb', line 154

def async?
  @async == true
end
#dequeue ⇒ Object
# File 'lib/datadog/tracing/workers/trace_writer.rb', line 140

def dequeue
  # Wrap results in Array because they are
  # splatted as args against TraceWriter#perform.
  [buffer.pop]
end
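The comment in #dequeue explains why the result is wrapped in an Array. The following sketch shows the idea, assuming the buffer hands back a batch (an Array of traces) and that the worker loop splats the return value of `dequeue` into `perform`. `SketchQueueWorker` and `run_once` are hypothetical names.

```ruby
# Hypothetical worker loop; only dequeue and perform mirror names in the source.
class SketchQueueWorker
  attr_reader :performed

  def initialize(batches)
    @batches = batches
    @performed = []
  end

  # Returns the argument list for #perform: a one-element Array holding the batch.
  def dequeue
    [@batches.shift]
  end

  def perform(traces)
    @performed << traces
  end

  def run_once
    # Splatting the wrapper passes the whole batch as a single argument.
    perform(*dequeue)
  end
end

worker = SketchQueueWorker.new([[:trace_a, :trace_b]])
worker.run_once
```

Without the wrapper, splatting a two-trace batch would pass two arguments to a one-argument `perform` and raise an ArgumentError.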
#enqueue(trace) ⇒ Object
# File 'lib/datadog/tracing/workers/trace_writer.rb', line 136

def enqueue(trace)
  buffer.push(trace)
end
#fork_policy=(policy) ⇒ Object
# File 'lib/datadog/tracing/workers/trace_writer.rb', line 158

def fork_policy=(policy)
  # Translate to Workers::Async::Thread policy
  thread_fork_policy = case policy
                       when Core::Workers::Async::Thread::FORK_POLICY_STOP
                         policy
                       when FORK_POLICY_SYNC
                         # Stop the async thread because the writer
                         # will bypass and run synchronously.
                         Core::Workers::Async::Thread::FORK_POLICY_STOP
                       else
                         Core::Workers::Async::Thread::FORK_POLICY_RESTART
                       end

  # Update thread fork policy
  super(thread_fork_policy)

  # Update local policy
  @writer_fork_policy = policy
end
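The translation performed by #fork_policy= can be read as a small lookup from writer policy to thread policy. The sketch below isolates that mapping as a standalone function. The `:stop` and `:restart` values are assumed placeholders for the `Core::Workers::Async::Thread` constants, and `thread_policy_for` is a hypothetical name.

```ruby
# Placeholder values; the real constants live in the library.
THREAD_STOP = :stop       # stands in for Core::Workers::Async::Thread::FORK_POLICY_STOP
THREAD_RESTART = :restart # stands in for Core::Workers::Async::Thread::FORK_POLICY_RESTART
WRITER_SYNC = :sync       # FORK_POLICY_SYNC
WRITER_ASYNC = :async     # FORK_POLICY_ASYNC

# Maps a writer-level fork policy to the policy applied to the worker thread.
def thread_policy_for(policy)
  case policy
  when THREAD_STOP then policy        # already a thread policy; pass through
  when WRITER_SYNC then THREAD_STOP   # sync writes bypass the thread, so stop it
  else THREAD_RESTART                 # async (or anything else): restart the thread
  end
end
```

Under these assumptions, `thread_policy_for(WRITER_SYNC)` yields the stop policy and `thread_policy_for(WRITER_ASYNC)` yields the restart policy.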
#perform(traces) ⇒ Object
NOTE: #perform is wrapped by other modules:
Polling --> Async --> IntervalLoop --> AsyncTraceWriter --> TraceWriter
WARNING: This method breaks the Liskov Substitution Principle – TraceWriter#perform is spec’d to return the result from the writer, whereas this method always returns nil.
# File 'lib/datadog/tracing/workers/trace_writer.rb', line 123

def perform(traces)
  super.tap do |responses|
    loop_back_off! if responses.find(&:server_error?)
  end

  nil
end
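The back-off behaviour in #perform can be sketched in isolation. This is a model under stated assumptions, not the library's implementation: `SketchResponse` and `SketchBackOffLoop` are hypothetical, and the back-off rule (multiply the wait time by a ratio, capped at a maximum) is an assumption about what `loop_back_off!` does.

```ruby
# Hypothetical response type; only server_error? mirrors the source.
SketchResponse = Struct.new(:status) do
  def server_error?
    status >= 500
  end
end

class SketchBackOffLoop
  attr_reader :wait_time

  def initialize(base: 1.0, ratio: 2.0, max: 8.0)
    @wait_time = base
    @ratio = ratio
    @max = max
  end

  # Assumed back-off rule: multiply the wait, capped at a maximum.
  def loop_back_off!
    @wait_time = [@wait_time * @ratio, @max].min
  end

  # Backs off if any response is a server error; always returns nil.
  def perform(responses)
    loop_back_off! if responses.find(&:server_error?)
    nil
  end
end

loop_worker = SketchBackOffLoop.new
ok_result = loop_worker.perform([SketchResponse.new(200)])
wait_after_ok = loop_worker.wait_time
loop_worker.perform([SketchResponse.new(200), SketchResponse.new(503)])
wait_after_error = loop_worker.wait_time
```

A single server error among the responses is enough to lengthen the wait, and the caller never sees the responses because the method returns nil.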
#stop(force_stop = false, timeout = @shutdown_timeout) ⇒ Object
# File 'lib/datadog/tracing/workers/trace_writer.rb', line 131

def stop(force_stop = false, timeout = @shutdown_timeout)
  buffer.close if running?
  super
end
#work_pending? ⇒ Boolean
Are there more traces to be processed next?
# File 'lib/datadog/tracing/workers/trace_writer.rb', line 147

def work_pending?
  # This is the same implementation as in Queue, but it was
  # overwritten by IntervalLoop on its way to this worker class.
  # See the comments in those two methods for more info.
  !buffer.empty?
end
#write(trace) ⇒ Object
WARNING: This method breaks the Liskov Substitution Principle – TraceWriter#write is spec’d to return the result from the writer, whereas this method returns something else when running in async mode.
# File 'lib/datadog/tracing/workers/trace_writer.rb', line 191

def write(trace)
  # Start worker thread. If the process has forked, it will trigger #after_fork to
  # reconfigure the worker accordingly.
  # NOTE: It's important we do this before queuing or it will drop the current trace,
  # because #after_fork resets the buffer.
  perform

  # Queue the trace if running asynchronously, otherwise short-circuit and write it directly.
  async? ? enqueue(trace) : write_traces([trace])
end
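The Liskov warning on #write comes down to the two branches returning different things. The sketch below models that with hypothetical stand-ins: `SketchWriter` uses a plain Array as the buffer and a `:transport_result` symbol in place of a real transport response.

```ruby
# Hypothetical writer; the transport is replaced by an Array of sent batches.
class SketchWriter
  attr_reader :buffer, :sent

  def initialize(async:)
    @async = async
    @buffer = []
    @sent = []
  end

  # Stand-in for the synchronous write path.
  def write_traces(traces)
    @sent << traces
    :transport_result
  end

  def write(trace)
    # Async: returns whatever the buffer push returns. Sync: returns the write result.
    @async ? @buffer.push(trace) : write_traces([trace])
  end
end

async_result = SketchWriter.new(async: true).write(:trace)
sync_result = SketchWriter.new(async: false).write(:trace)
```

In this model the async call returns the buffer itself while the sync call returns the write result, so callers should not rely on the return value of `write` unless they know which mode is active.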