Class: Datadog::DataStreams::Processor

Inherits: Core::Worker
  - Object
  - Core::Worker
  - Datadog::DataStreams::Processor

Includes: Core::Workers::Polling

Defined in: lib/datadog/data_streams/processor.rb
Overview

Processor for Data Streams Monitoring. This class is responsible for collecting and reporting pathway stats, periodically (every interval, 10 seconds by default) flushing them to the Datadog agent.
Constant Summary collapse

- PROPAGATION_KEY = 'dd-pathway-ctx-base64'
- DEFAULT_BUFFER_SIZE = 100_000
  Default buffer size for the lock-free event queue. Sized to handle high-throughput scenarios (e.g., 10k events/sec over a 10-second flush interval).
Constants included from Core::Workers::Polling
Core::Workers::Polling::DEFAULT_SHUTDOWN_TIMEOUT
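As a quick sanity check of the DEFAULT_BUFFER_SIZE sizing, the numbers quoted above multiply out exactly (a sketch; the constants come from the summary, not from running the library):

```ruby
# 10k events/sec sustained over the default 10-second flush interval
events_per_sec = 10_000
flush_interval_sec = 10
buffer_size = events_per_sec * flush_interval_sec
puts buffer_size # matches DEFAULT_BUFFER_SIZE (100_000)
```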
Instance Attribute Summary collapse
-
#bucket_size_ns ⇒ Object
readonly
Returns the value of attribute bucket_size_ns.
-
#buckets ⇒ Object
readonly
Returns the value of attribute buckets.
-
#pathway_context ⇒ Object
readonly
Returns the value of attribute pathway_context.
Attributes inherited from Core::Worker
Instance Method Summary collapse
-
#initialize(interval:, logger:, settings:, agent_settings:, agent_info:, buffer_size: DEFAULT_BUFFER_SIZE) ⇒ Processor
constructor
Initialize the Data Streams Monitoring processor.
-
#perform ⇒ Object
Called periodically by the worker to flush stats to the agent.
-
#set_consume_checkpoint(type:, source:, manual_checkpoint: true, tags: {}) {|key| ... } ⇒ String
Set a consume checkpoint.
-
#set_produce_checkpoint(type:, destination:, manual_checkpoint: true, tags: {}) {|key, value| ... } ⇒ String
Set a produce checkpoint.
-
#track_kafka_consume(topic, partition, offset, now) ⇒ Boolean
Track Kafka message consumption for consumer lag monitoring.
-
#track_kafka_produce(topic, partition, offset, now) ⇒ Boolean
Track Kafka produce offset for lag monitoring.
Methods included from Core::Workers::Polling
#enabled=, #enabled?, included, #stop
Constructor Details
#initialize(interval:, logger:, settings:, agent_settings:, agent_info:, buffer_size: DEFAULT_BUFFER_SIZE) ⇒ Processor
Initialize the Data Streams Monitoring processor.
  # File 'lib/datadog/data_streams/processor.rb', line 43

  def initialize(interval:, logger:, settings:, agent_settings:, agent_info:, buffer_size: DEFAULT_BUFFER_SIZE)
    raise UnsupportedError, 'DDSketch is not supported' unless Datadog::Core::DDSketch.supported?

    @settings = settings
    @agent_settings = agent_settings
    @agent_info = agent_info
    @logger = logger

    now = Core::Utils::Time.now
    @pathway_context = PathwayContext.new(
      hash_value: 0,
      pathway_start: now,
      current_edge_start: now
    )

    @bucket_size_ns = (interval * 1e9).to_i
    @buckets = {}
    @consumer_stats = []
    @stats_mutex = Mutex.new
    @event_buffer = Core::Buffer::CRuby.new(buffer_size)

    super()
    self.loop_base_interval = interval
    perform
  end
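The constructor converts the flush interval from seconds to nanoseconds for stats bucketing; for the default 10-second interval that works out as follows (a standalone sketch of the `(interval * 1e9).to_i` line above):

```ruby
interval = 10 # seconds, the default flush interval
bucket_size_ns = (interval * 1e9).to_i
puts bucket_size_ns # 10 seconds expressed in nanoseconds
```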
Instance Attribute Details
#bucket_size_ns ⇒ Object (readonly)
Returns the value of attribute bucket_size_ns.
  # File 'lib/datadog/data_streams/processor.rb', line 31

  def bucket_size_ns
    @bucket_size_ns
  end
#buckets ⇒ Object (readonly)
Returns the value of attribute buckets.
  # File 'lib/datadog/data_streams/processor.rb', line 31

  def buckets
    @buckets
  end
#pathway_context ⇒ Object (readonly)
Returns the value of attribute pathway_context.
  # File 'lib/datadog/data_streams/processor.rb', line 31

  def pathway_context
    @pathway_context
  end
Instance Method Details
#perform ⇒ Object
Called periodically by the worker to flush stats to the agent.
  # File 'lib/datadog/data_streams/processor.rb', line 152

  def perform
    process_events
    flush_stats
    true
  end
#set_consume_checkpoint(type:, source:, manual_checkpoint: true, tags: {}) {|key| ... } ⇒ String
Set a consume checkpoint.
  # File 'lib/datadog/data_streams/processor.rb', line 134

  def set_consume_checkpoint(type:, source:, manual_checkpoint: true, tags: {}, &block)
    if block
      pathway_ctx = yield(PROPAGATION_KEY)
      if pathway_ctx
        decoded_ctx = decode_pathway_b64(pathway_ctx)
        set_pathway_context(decoded_ctx)
      end
    end

    edge_tags = ["type:#{type}", "topic:#{source}", 'direction:in']
    edge_tags << 'manual_checkpoint:true' if manual_checkpoint
    edge_tags.concat(tags.map { |k, v| "#{k}:#{v}" }) unless tags.empty?

    span = Datadog::Tracing.active_span
    set_checkpoint(tags: edge_tags, span: span)
  end
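A minimal sketch of the extraction block's contract: set_consume_checkpoint yields PROPAGATION_KEY to the block, which should return the matching value from the incoming message's headers (or nil when the producer sent no context). The `headers` hash below is a stand-in for real Kafka message headers, not part of the library:

```ruby
PROPAGATION_KEY = 'dd-pathway-ctx-base64'

# Stand-in for a consumed message's headers (illustrative value only)
headers = { PROPAGATION_KEY => 'base64-encoded-pathway' }

# The block receives the propagation key and looks it up in the headers:
extract = ->(key) { headers[key] }
extracted = extract.call(PROPAGATION_KEY)
puts extracted
```

In real code the block body would be something like `message.headers[key]`, with `message` coming from your Kafka consumer library.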
#set_produce_checkpoint(type:, destination:, manual_checkpoint: true, tags: {}) {|key, value| ... } ⇒ String
Set a produce checkpoint.
  # File 'lib/datadog/data_streams/processor.rb', line 114

  def set_produce_checkpoint(type:, destination:, manual_checkpoint: true, tags: {}, &block)
    edge_tags = ["type:#{type}", "topic:#{destination}", 'direction:out']
    edge_tags << 'manual_checkpoint:true' if manual_checkpoint
    edge_tags.concat(tags.map { |k, v| "#{k}:#{v}" }) unless tags.empty?

    span = Datadog::Tracing.active_span
    pathway = set_checkpoint(tags: edge_tags, span: span)

    yield(PROPAGATION_KEY, pathway) if pathway && block
    pathway
  end
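On the produce side the block contract is the mirror image: set_produce_checkpoint yields PROPAGATION_KEY and the encoded pathway, and the block should attach them to the outgoing message's headers. A self-contained sketch, with a plain hash standing in for real Kafka headers:

```ruby
PROPAGATION_KEY = 'dd-pathway-ctx-base64'

# Stand-in for an outgoing message's headers
headers = {}

# The block receives the propagation key and the encoded pathway context:
inject = ->(key, value) { headers[key] = value }
inject.call(PROPAGATION_KEY, 'base64-encoded-pathway')
puts headers[PROPAGATION_KEY]
```

In real code the block body would set the header on the message your producer library is about to send.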
#track_kafka_consume(topic, partition, offset, now) ⇒ Boolean
Track Kafka message consumption for consumer lag monitoring.
  # File 'lib/datadog/data_streams/processor.rb', line 94

  def track_kafka_consume(topic, partition, offset, now)
    @event_buffer.push(
      {
        type: :kafka_consume,
        topic: topic,
        partition: partition,
        offset: offset,
        timestamp: now
      }
    )
    true
  end
#track_kafka_produce(topic, partition, offset, now) ⇒ Boolean
Track Kafka produce offset for lag monitoring.
  # File 'lib/datadog/data_streams/processor.rb', line 75

  def track_kafka_produce(topic, partition, offset, now)
    @event_buffer.push(
      {
        type: :kafka_produce,
        topic: topic,
        partition: partition,
        offset: offset,
        timestamp_ns: (now.to_f * 1e9).to_i
      }
    )
    true
  end
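The two tracking methods feed the lag computation: consumer lag for a partition is the latest produced offset minus the latest consumed offset. A hypothetical sketch of that arithmetic (the processor's actual aggregation is internal; the hashes and keys below are assumptions for illustration):

```ruby
# Latest offsets observed per [topic, partition] pair
latest_produced  = { ['orders', 0] => 1_042 }
latest_consumed  = { ['orders', 0] => 1_000 }

# Consumer lag: how far the consumer trails the producer on this partition
lag = latest_produced[['orders', 0]] - latest_consumed[['orders', 0]]
puts lag
```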