Class: NewRelic::Agent::InfiniteTracing::StreamingBuffer

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Includes:
Enumerable, Constants
Defined in:
lib/infinite_tracing/streaming_buffer.rb

Constant Summary collapse

DEFAULT_QUEUE_SIZE =
10_000
FLUSH_DELAY =
0.005
MAX_FLUSH_WAIT =

three seconds

3
MAX_BATCH_HOLD =

To ensure that two bits of info for the same transaction are recognized as belonging together, set a maximum time in seconds to elapse between batch submissions.

5

Constants included from Constants

Constants::GRPC_ERROR_NAME_METRIC, Constants::GRPC_OTHER_ERROR_METRIC, Constants::QUEUE_DUMPED_METRIC, Constants::RESPONSE_ERROR_METRIC, Constants::SPANS_SEEN_METRIC, Constants::SPANS_SENT_METRIC, Constants::SUPPORTABILITY_PREFIX

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(max_size = DEFAULT_QUEUE_SIZE) ⇒ StreamingBuffer

Returns a new instance of StreamingBuffer.



27
28
29
30
31
32
# File 'lib/infinite_tracing/streaming_buffer.rb', line 27

def initialize(max_size = DEFAULT_QUEUE_SIZE)
  @max_size = max_size
  @lock = Mutex.new
  @queue = Queue.new
  @batch = Array.new
end

Instance Attribute Details

#queueObject (readonly)

Returns the value of attribute queue.



25
26
27
# File 'lib/infinite_tracing/streaming_buffer.rb', line 25

def queue
  @queue
end

Instance Method Details

#<<(segment) ⇒ Object

Pushes the segment given onto the queue.

If the queue is at capacity, it is dumped and a supportability metric is recorded for the event.

When a restart signal is received, the queue is locked with a mutex, blocking the push until the queue has restarted.



51
52
53
54
55
56
57
# File 'lib/infinite_tracing/streaming_buffer.rb', line 51

def <<(segment)
  @lock.synchronize do
    clear_queue if @queue.size >= @max_size
    NewRelic::Agent.increment_metric(SPANS_SEEN_METRIC)
    @queue.push(segment)
  end
end

#batch_enumeratorObject

Returns the blocking enumerator that will pop items off the queue while any items are present

yielding is deferred until batch_size spans is reached.

If nil is popped, the queue is closing. A final yield on non-empty batch is fired.

The segment is transformed into a serializable span here so processing is taking place within the gRPC call’s thread rather than in the main application thread.



123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# File 'lib/infinite_tracing/streaming_buffer.rb', line 123

def batch_enumerator
  return enum_for(:batch_enumerator) unless block_given?

  last_time = Process.clock_gettime(Process::CLOCK_REALTIME)
  loop do
    if proc_or_segment = @queue.pop(false)
      NewRelic::Agent.increment_metric(SPANS_SENT_METRIC)
      @batch << transform(proc_or_segment)
      if batch_ready?(last_time)
        yield(SpanBatch.new(spans: @batch))
        last_time = Process.clock_gettime(Process::CLOCK_REALTIME)
        @batch.clear
      end

    else
      yield(SpanBatch.new(spans: @batch)) unless @batch.empty?
      raise ClosedQueueError
    end
  end
end

#clear_queueObject

Drops all segments from the queue and records a supportability metric for the event.



61
62
63
64
# File 'lib/infinite_tracing/streaming_buffer.rb', line 61

def clear_queue
  @queue.clear
  NewRelic::Agent.increment_metric(QUEUE_DUMPED_METRIC)
end

#close_queueObject



84
85
86
# File 'lib/infinite_tracing/streaming_buffer.rb', line 84

def close_queue
  @lock.synchronize { @queue.close }
end

#enumeratorObject

Returns the blocking enumerator that will pop items off the queue while any items are present If nil is popped, the queue is closing.

The segment is transformed into a serializable span here so processing is taking place within the gRPC call’s thread rather than in the main application thread.



96
97
98
99
100
101
102
103
104
105
106
107
108
# File 'lib/infinite_tracing/streaming_buffer.rb', line 96

def enumerator
  return enum_for(:enumerator) unless block_given?

  loop do
    if segment = @queue.pop(false)
      NewRelic::Agent.increment_metric(SPANS_SENT_METRIC)
      yield(transform(segment))

    else
      raise ClosedQueueError
    end
  end
end

#flush_queueObject

Waits for the queue to be fully consumed or for the waiting consumers to release.



68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/infinite_tracing/streaming_buffer.rb', line 68

def flush_queue
  @queue.num_waiting.times { @queue.push(nil) }
  close_queue

  # Logs if we're throwing away spans because nothing's
  # waiting to take them off the queue.
  if @queue.num_waiting == 0 && !@queue.empty?
    NewRelic::Agent.logger.warn("Discarding #{@queue.size} segments on Streaming Buffer")
    return
  end

  # Only wait a short while for queue to flush
  cutoff = Process.clock_gettime(Process::CLOCK_MONOTONIC) + MAX_FLUSH_WAIT
  until @queue.empty? || Process.clock_gettime(Process::CLOCK_MONOTONIC) >= cutoff do sleep(FLUSH_DELAY) end
end

#transfer(new_buffer) ⇒ Object

Dumps the contents of this streaming buffer onto the given buffer and closes the queue



36
37
38
39
40
41
# File 'lib/infinite_tracing/streaming_buffer.rb', line 36

def transfer(new_buffer)
  @lock.synchronize do
    until @queue.empty? do new_buffer.push(@queue.pop) end
    @queue.close
  end
end