Class: Datadog::DataStreams::Processor

Inherits:
Core::Worker
Includes:
Core::Workers::Polling
Defined in:
lib/datadog/data_streams/processor.rb

Overview

Processor for Data Streams Monitoring. This class is responsible for collecting and aggregating pathway stats, and periodically (every interval, 10 seconds by default) flushes them to the Datadog agent.

Constant Summary

PROPAGATION_KEY =
'dd-pathway-ctx-base64'
DEFAULT_BUFFER_SIZE =

Default buffer size for the lock-free event queue. Sized to handle high-throughput scenarios (e.g., 10k events/sec over a 10-second interval).

100_000

Constants included from Core::Workers::Polling

Core::Workers::Polling::DEFAULT_SHUTDOWN_TIMEOUT

Instance Attribute Summary

Attributes inherited from Core::Worker

#task

Instance Method Summary

Methods included from Core::Workers::Polling

#enabled=, #enabled?, included, #stop

Constructor Details

#initialize(interval:, logger:, settings:, agent_settings:, agent_info:, buffer_size: DEFAULT_BUFFER_SIZE) ⇒ Processor

Initialize the Data Streams Monitoring processor

Parameters:

  • interval (Numeric)

    Flush interval in seconds; also used as the stats bucket size

  • logger (Object)

    Logger used by the processor

  • settings (Object)

    Datadog configuration settings

  • agent_settings (Object)

    Resolved agent connection settings

  • agent_info (Object)

    Agent info used when flushing stats

  • buffer_size (Integer) (defaults to: DEFAULT_BUFFER_SIZE)

    Maximum number of events held in the event buffer

Raises:

  • (UnsupportedError)

    if DDSketch is not supported on this platform


# File 'lib/datadog/data_streams/processor.rb', line 43

def initialize(interval:, logger:, settings:, agent_settings:, agent_info:, buffer_size: DEFAULT_BUFFER_SIZE)
  raise UnsupportedError, 'DDSketch is not supported' unless Datadog::Core::DDSketch.supported?

  @settings = settings
  @agent_settings = agent_settings
  @agent_info = agent_info
  @logger = logger

  now = Core::Utils::Time.now
  @pathway_context = PathwayContext.new(
    hash_value: 0,
    pathway_start: now,
    current_edge_start: now
  )
  @bucket_size_ns = (interval * 1e9).to_i
  @buckets = {}
  @consumer_stats = []
  @stats_mutex = Mutex.new
  @event_buffer = Core::Buffer::CRuby.new(buffer_size)

  super()
  self.loop_base_interval = interval

  perform
end
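The interval-to-bucket conversion done by the constructor can be checked in plain Ruby. This is a standalone sketch; `interval` and `bucket_size_ns` are local stand-ins for the processor's internals:

```ruby
# The flush interval (seconds) becomes the stats bucket width in nanoseconds,
# mirroring `@bucket_size_ns = (interval * 1e9).to_i` above.
interval = 10
bucket_size_ns = (interval * 1e9).to_i

bucket_size_ns # => 10_000_000_000
```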

Instance Attribute Details

#bucket_size_ns ⇒ Object (readonly)

Returns the value of attribute bucket_size_ns.



# File 'lib/datadog/data_streams/processor.rb', line 31

def bucket_size_ns
  @bucket_size_ns
end

#buckets ⇒ Object (readonly)

Returns the value of attribute buckets.



# File 'lib/datadog/data_streams/processor.rb', line 31

def buckets
  @buckets
end

#pathway_context ⇒ Object (readonly)

Returns the value of attribute pathway_context.



# File 'lib/datadog/data_streams/processor.rb', line 31

def pathway_context
  @pathway_context
end

Instance Method Details

#performObject

Called periodically by the worker to flush stats to the agent



# File 'lib/datadog/data_streams/processor.rb', line 152

def perform
  process_events
  flush_stats
  true
end

#set_consume_checkpoint(type:, source:, manual_checkpoint: true, tags: {}) {|key| ... } ⇒ String

Set a consume checkpoint

Parameters:

  • type (String)

    The type of the checkpoint (e.g., 'kafka', 'kinesis', 'sns')

  • source (String)

    The source (e.g., topic, exchange, stream name)

  • manual_checkpoint (Boolean) (defaults to: true)

    Whether this checkpoint was manually set (default: true)

  • tags (Hash) (defaults to: {})

    Additional tags to include

Yields:

  • (key)

    Block to extract context from carrier

Returns:

  • (String)

    Base64 encoded pathway context



# File 'lib/datadog/data_streams/processor.rb', line 134

def set_consume_checkpoint(type:, source:, manual_checkpoint: true, tags: {}, &block)
  if block
    pathway_ctx = yield(PROPAGATION_KEY)
    if pathway_ctx
      decoded_ctx = decode_pathway_b64(pathway_ctx)
      set_pathway_context(decoded_ctx)
    end
  end

  checkpoint_tags = ["type:#{type}", "topic:#{source}", 'direction:in']
  checkpoint_tags << 'manual_checkpoint:true' if manual_checkpoint
  checkpoint_tags.concat(tags.map { |k, v| "#{k}:#{v}" }) unless tags.empty?

  span = Datadog::Tracing.active_span
  set_checkpoint(tags: checkpoint_tags, span: span)
end
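The tag-building convention used by set_consume_checkpoint can be exercised on its own. The helper below is hypothetical (not part of the gem); it simply mirrors the tag construction lines in the method body:

```ruby
# Hypothetical helper mirroring set_consume_checkpoint's tag construction:
# type/topic/direction tags, plus the manual-checkpoint flag and extras.
def build_consume_tags(type:, source:, manual_checkpoint: true, tags: {})
  checkpoint_tags = ["type:#{type}", "topic:#{source}", 'direction:in']
  checkpoint_tags << 'manual_checkpoint:true' if manual_checkpoint
  checkpoint_tags.concat(tags.map { |k, v| "#{k}:#{v}" }) unless tags.empty?
  checkpoint_tags
end

build_consume_tags(type: 'kafka', source: 'orders', tags: { group: 'billing' })
# => ["type:kafka", "topic:orders", "direction:in", "manual_checkpoint:true", "group:billing"]
```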

#set_produce_checkpoint(type:, destination:, manual_checkpoint: true, tags: {}) {|key, value| ... } ⇒ String

Set a produce checkpoint

Parameters:

  • type (String)

    The type of the checkpoint (e.g., 'kafka', 'kinesis', 'sns')

  • destination (String)

    The destination (e.g., topic, exchange, stream name)

  • manual_checkpoint (Boolean) (defaults to: true)

    Whether this checkpoint was manually set (default: true)

  • tags (Hash) (defaults to: {})

    Additional tags to include

Yields:

  • (key, value)

    Block to inject context into carrier

Returns:

  • (String)

    Base64 encoded pathway context



# File 'lib/datadog/data_streams/processor.rb', line 114

def set_produce_checkpoint(type:, destination:, manual_checkpoint: true, tags: {}, &block)
  checkpoint_tags = ["type:#{type}", "topic:#{destination}", 'direction:out']
  checkpoint_tags << 'manual_checkpoint:true' if manual_checkpoint
  checkpoint_tags.concat(tags.map { |k, v| "#{k}:#{v}" }) unless tags.empty?

  span = Datadog::Tracing.active_span
  pathway = set_checkpoint(tags: checkpoint_tags, span: span)

  yield(PROPAGATION_KEY, pathway) if pathway && block

  pathway
end
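The injection block of set_produce_checkpoint receives the propagation key and the encoded pathway, and writes them into the message carrier. A standalone sketch of that carrier shape, with a plain Hash standing in for Kafka headers and the block invoked directly with a placeholder value:

```ruby
PROPAGATION_KEY = 'dd-pathway-ctx-base64'

# A plain Hash stands in for message headers; in real use the processor
# invokes this block with the base64-encoded pathway context.
headers = {}
inject = ->(key, value) { headers[key] = value }

inject.call(PROPAGATION_KEY, 'ZmFrZS1wYXRod2F5')  # placeholder value, for illustration only
headers # => { 'dd-pathway-ctx-base64' => 'ZmFrZS1wYXRod2F5' }
```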

#track_kafka_consume(topic, partition, offset, now) ⇒ Boolean

Track Kafka message consumption for consumer lag monitoring

Parameters:

  • topic (String)

    The Kafka topic name

  • partition (Integer)

    The partition number

  • offset (Integer)

    The offset of the consumed message

  • now (Time)

    Timestamp

Returns:

  • (Boolean)

    true if tracking succeeded



# File 'lib/datadog/data_streams/processor.rb', line 94

def track_kafka_consume(topic, partition, offset, now)
  @event_buffer.push(
    {
      type: :kafka_consume,
      topic: topic,
      partition: partition,
      offset: offset,
      timestamp: now
    }
  )
  true
end
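The tracking methods push small event hashes onto the buffer, which #perform later drains. A minimal sketch of that push/drain pattern, using a thread-safe Queue as a stand-in for the gem's bounded Core::Buffer::CRuby (whose overflow behavior differs):

```ruby
# Thread-safe Queue standing in for the bounded event buffer.
buffer = Queue.new
buffer << { type: :kafka_consume, topic: 'orders', partition: 0, offset: 42, timestamp: Time.now }

# process_events-style drain: pop everything that has been buffered.
drained = []
drained << buffer.pop until buffer.empty?
drained.first[:type] # => :kafka_consume
```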

#track_kafka_produce(topic, partition, offset, now) ⇒ Boolean

Track Kafka produce offset for lag monitoring

Parameters:

  • topic (String)

    The Kafka topic name

  • partition (Integer)

    The partition number

  • offset (Integer)

    The offset of the produced message

  • now (Time)

    Timestamp

Returns:

  • (Boolean)

    true if tracking succeeded



# File 'lib/datadog/data_streams/processor.rb', line 75

def track_kafka_produce(topic, partition, offset, now)
  @event_buffer.push(
    {
      type: :kafka_produce,
      topic: topic,
      partition: partition,
      offset: offset,
      timestamp_ns: (now.to_f * 1e9).to_i
    }
  )
  true
end
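Note that the produce path stores its timestamp as integer nanoseconds via `(now.to_f * 1e9).to_i`. A standalone sketch of that conversion (the value here is chosen to be exactly representable as a Float; real epoch timestamps can lose sub-microsecond precision through `to_f`):

```ruby
# Convert a Time to integer nanoseconds, as the produce path does.
now = Time.at(100.5)
timestamp_ns = (now.to_f * 1e9).to_i

timestamp_ns # => 100_500_000_000
```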