Class: Datadog::CI::TestOptimisation::Coverage::Writer
- Inherits:
-
Object
- Object
- Datadog::CI::TestOptimisation::Coverage::Writer
- Includes:
- Core::Workers::Polling, Core::Workers::Queue
- Defined in:
- lib/datadog/ci/test_optimisation/coverage/writer.rb
Constant Summary collapse
- DEFAULT_BUFFER_MAX_SIZE =
10_000
- DEFAULT_SHUTDOWN_TIMEOUT =
60
- DEFAULT_INTERVAL =
3
Instance Attribute Summary collapse
-
#transport ⇒ Object
readonly
Returns the value of attribute transport.
Instance Method Summary collapse
- #after_fork ⇒ Object
- #async? ⇒ Boolean
- #buffer_klass ⇒ Object
- #dequeue ⇒ Object
- #enqueue(event) ⇒ Object
-
#initialize(transport:, options: {}) ⇒ Writer
constructor
A new instance of Writer.
- #perform(*events) ⇒ Object
- #stop(force_stop = false, timeout = @shutdown_timeout) ⇒ Object
- #work_pending? ⇒ Boolean
- #write(event) ⇒ Object
Constructor Details
#initialize(transport:, options: {}) ⇒ Writer
Returns a new instance of Writer.
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
# File 'lib/datadog/ci/test_optimisation/coverage/writer.rb', line 27 def initialize(transport:, options: {}) @transport = transport # Workers::Polling settings self.enabled = .fetch(:enabled, true) # Workers::Async::Thread settings self.fork_policy = Core::Workers::Async::Thread::FORK_POLICY_RESTART # Workers::IntervalLoop settings self.loop_base_interval = [:interval] || DEFAULT_INTERVAL self.loop_back_off_ratio = [:back_off_ratio] if .key?(:back_off_ratio) self.loop_back_off_max = [:back_off_max] if .key?(:back_off_max) @buffer_size = .fetch(:buffer_size, DEFAULT_BUFFER_MAX_SIZE) self.buffer = buffer_klass.new(@buffer_size) @shutdown_timeout = .fetch(:shutdown_timeout, DEFAULT_SHUTDOWN_TIMEOUT) @stopped = false end |
Instance Attribute Details
#transport ⇒ Object (readonly)
Returns the value of attribute transport.
20 21 22 |
# File 'lib/datadog/ci/test_optimisation/coverage/writer.rb', line 20 def transport @transport end |
Instance Method Details
#after_fork ⇒ Object
98 99 100 101 102 103 |
# File 'lib/datadog/ci/test_optimisation/coverage/writer.rb', line 98 def after_fork # In multiprocess environments, forks will share the same buffer until its written to. # A.K.A. copy-on-write. We don't want forks to write events generated from another process. # Instead, we reset it after the fork. (Make sure any enqueue operations happen after this.) self.buffer = buffer_klass.new(@buffer_size) end |
#async? ⇒ Boolean
94 95 96 |
# File 'lib/datadog/ci/test_optimisation/coverage/writer.rb', line 94 def async? true end |
#buffer_klass ⇒ Object
105 106 107 108 109 110 111 |
# File 'lib/datadog/ci/test_optimisation/coverage/writer.rb', line 105 def buffer_klass if Core::Environment::Ext::RUBY_ENGINE == "ruby" Core::Buffer::CRuby else Core::Buffer::ThreadSafe end end |
#dequeue ⇒ Object
86 87 88 |
# File 'lib/datadog/ci/test_optimisation/coverage/writer.rb', line 86 def dequeue buffer.pop end |
#enqueue(event) ⇒ Object
82 83 84 |
# File 'lib/datadog/ci/test_optimisation/coverage/writer.rb', line 82 def enqueue(event) buffer.push(event) end |
#perform(*events) ⇒ Object
60 61 62 63 64 65 66 67 68 69 70 71 72 |
# File 'lib/datadog/ci/test_optimisation/coverage/writer.rb', line 60 def perform(*events) responses = transport.send_events(events) if responses.find(&:server_error?) loop_back_off! Datadog.logger.warn { "Encountered server error while sending coverage events" } end nil rescue => e Datadog.logger.warn { "Error while sending coverage events: #{e}" } loop_back_off! end |
#stop(force_stop = false, timeout = @shutdown_timeout) ⇒ Object
74 75 76 77 78 79 80 |
# File 'lib/datadog/ci/test_optimisation/coverage/writer.rb', line 74 def stop(force_stop = false, timeout = @shutdown_timeout) @stopped = true buffer.close if running? super end |
#work_pending? ⇒ Boolean
90 91 92 |
# File 'lib/datadog/ci/test_optimisation/coverage/writer.rb', line 90 def work_pending? !buffer.empty? end |
#write(event) ⇒ Object
50 51 52 53 54 55 56 57 58 |
# File 'lib/datadog/ci/test_optimisation/coverage/writer.rb', line 50 def write(event) return if @stopped # Start worker thread. If the process has forked, it will trigger #after_fork to # reconfigure the worker accordingly. perform enqueue(event) end |