Class: NewRelic::Agent::InfiniteTracing::StreamingBuffer
- Inherits:
-
Object
- Object
- NewRelic::Agent::InfiniteTracing::StreamingBuffer
- Extended by:
- Forwardable
- Includes:
- Enumerable, Constants
- Defined in:
- lib/infinite_tracing/streaming_buffer.rb
Constant Summary collapse
- DEFAULT_QUEUE_SIZE =
10_000
- FLUSH_DELAY =
0.005
- MAX_FLUSH_WAIT =
three seconds
3
- MAX_BATCH_HOLD =
To ensure that two bits of info for the same transaction are recognized as belonging together, set a maximum time in seconds to elapse between batch submissions.
5
Constants included from Constants
Constants::GRPC_ERROR_NAME_METRIC, Constants::GRPC_OTHER_ERROR_METRIC, Constants::QUEUE_DUMPED_METRIC, Constants::RESPONSE_ERROR_METRIC, Constants::SPANS_SEEN_METRIC, Constants::SPANS_SENT_METRIC, Constants::SUPPORTABILITY_PREFIX
Instance Attribute Summary collapse
-
#queue ⇒ Object
readonly
Returns the value of attribute queue.
Instance Method Summary collapse
-
#<<(segment) ⇒ Object
Pushes the segment given onto the queue.
-
#batch_enumerator ⇒ Object
Returns the blocking enumerator that will pop items off the queue while any items are present.
-
#clear_queue ⇒ Object
Drops all segments from the queue and records a supportability metric for the event.
- #close_queue ⇒ Object
-
#enumerator ⇒ Object
Returns the blocking enumerator that will pop items off the queue while any items are present If
nil
is popped, the queue is closing. -
#flush_queue ⇒ Object
Waits for the queue to be fully consumed or for the waiting consumers to release.
-
#initialize(max_size = DEFAULT_QUEUE_SIZE) ⇒ StreamingBuffer
constructor
A new instance of StreamingBuffer.
-
#transfer(new_buffer) ⇒ Object
Dumps the contents of this streaming buffer onto the given buffer and closes the queue.
Constructor Details
#initialize(max_size = DEFAULT_QUEUE_SIZE) ⇒ StreamingBuffer
Returns a new instance of StreamingBuffer.
27 28 29 30 31 32 |
# File 'lib/infinite_tracing/streaming_buffer.rb', line 27 def initialize(max_size = DEFAULT_QUEUE_SIZE) @max_size = max_size @lock = Mutex.new @queue = Queue.new @batch = Array.new end |
Instance Attribute Details
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
25 26 27 |
# File 'lib/infinite_tracing/streaming_buffer.rb', line 25 def queue @queue end |
Instance Method Details
#<<(segment) ⇒ Object
Pushes the segment given onto the queue.
If the queue is at capacity, it is dumped and a supportability metric is recorded for the event.
When a restart signal is received, the queue is locked with a mutex, blocking the push until the queue has restarted.
51 52 53 54 55 56 57 |
# File 'lib/infinite_tracing/streaming_buffer.rb', line 51 def <<(segment) @lock.synchronize do clear_queue if @queue.size >= @max_size NewRelic::Agent.increment_metric(SPANS_SEEN_METRIC) @queue.push(segment) end end |
#batch_enumerator ⇒ Object
Returns the blocking enumerator that will pop items off the queue while any items are present
yielding is deferred until batch_size spans is reached.
If nil
is popped, the queue is closing. A final yield on non-empty batch is fired.
The segment is transformed into a serializable span here so processing is taking place within the gRPC call’s thread rather than in the main application thread.
123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 |
# File 'lib/infinite_tracing/streaming_buffer.rb', line 123 def batch_enumerator return enum_for(:batch_enumerator) unless block_given? last_time = Process.clock_gettime(Process::CLOCK_REALTIME) loop do if proc_or_segment = @queue.pop(false) NewRelic::Agent.increment_metric(SPANS_SENT_METRIC) @batch << transform(proc_or_segment) if batch_ready?(last_time) yield(SpanBatch.new(spans: @batch)) last_time = Process.clock_gettime(Process::CLOCK_REALTIME) @batch.clear end else yield(SpanBatch.new(spans: @batch)) unless @batch.empty? raise ClosedQueueError end end end |
#clear_queue ⇒ Object
Drops all segments from the queue and records a supportability metric for the event.
61 62 63 64 |
# File 'lib/infinite_tracing/streaming_buffer.rb', line 61 def clear_queue @queue.clear NewRelic::Agent.increment_metric(QUEUE_DUMPED_METRIC) end |
#close_queue ⇒ Object
84 85 86 |
# File 'lib/infinite_tracing/streaming_buffer.rb', line 84 def close_queue @lock.synchronize { @queue.close } end |
#enumerator ⇒ Object
Returns the blocking enumerator that will pop items off the queue while any items are present If nil
is popped, the queue is closing.
The segment is transformed into a serializable span here so processing is taking place within the gRPC call’s thread rather than in the main application thread.
96 97 98 99 100 101 102 103 104 105 106 107 108 |
# File 'lib/infinite_tracing/streaming_buffer.rb', line 96 def enumerator return enum_for(:enumerator) unless block_given? loop do if segment = @queue.pop(false) NewRelic::Agent.increment_metric(SPANS_SENT_METRIC) yield(transform(segment)) else raise ClosedQueueError end end end |
#flush_queue ⇒ Object
Waits for the queue to be fully consumed or for the waiting consumers to release.
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 |
# File 'lib/infinite_tracing/streaming_buffer.rb', line 68 def flush_queue @queue.num_waiting.times { @queue.push(nil) } close_queue # Logs if we're throwing away spans because nothing's # waiting to take them off the queue. if @queue.num_waiting == 0 && !@queue.empty? NewRelic::Agent.logger.warn("Discarding #{@queue.size} segments on Streaming Buffer") return end # Only wait a short while for queue to flush cutoff = Process.clock_gettime(Process::CLOCK_MONOTONIC) + MAX_FLUSH_WAIT until @queue.empty? || Process.clock_gettime(Process::CLOCK_MONOTONIC) >= cutoff do sleep(FLUSH_DELAY) end end |
#transfer(new_buffer) ⇒ Object
Dumps the contents of this streaming buffer onto the given buffer and closes the queue
36 37 38 39 40 41 |
# File 'lib/infinite_tracing/streaming_buffer.rb', line 36 def transfer(new_buffer) @lock.synchronize do until @queue.empty? do new_buffer.push(@queue.pop) end @queue.close end end |