Class: Datadog::Core::Telemetry::Worker

Inherits:
Object
  • Object
show all
Includes:
Workers::Polling, Workers::Queue
Defined in:
lib/datadog/core/telemetry/worker.rb

Overview

Accumulates events and sends them to the API at a regular interval, including the heartbeat event.

Constant Summary collapse

DEFAULT_BUFFER_MAX_SIZE =
1000
APP_STARTED_EVENT_RETRIES =
10
TELEMETRY_STARTED_ONCE =
Utils::OnlyOnceSuccessful.new(APP_STARTED_EVENT_RETRIES)

Constants included from Workers::Polling

Workers::Polling::DEFAULT_SHUTDOWN_TIMEOUT

Instance Attribute Summary

Attributes included from Workers::Queue

#buffer

Instance Method Summary collapse

Methods included from Workers::Polling

#enabled=, #enabled?, included

Methods included from Workers::Queue

included

Constructor Details

#initialize(heartbeat_interval_seconds:, metrics_aggregation_interval_seconds:, emitter:, metrics_manager:, dependency_collection:, enabled: true, shutdown_timeout: Workers::Polling::DEFAULT_SHUTDOWN_TIMEOUT, buffer_size: DEFAULT_BUFFER_MAX_SIZE) ⇒ Worker

Returns a new instance of Worker.



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/datadog/core/telemetry/worker.rb', line 22

# Builds a telemetry worker and configures the polling loop and event buffer.
#
# @param heartbeat_interval_seconds numeric interval between heartbeats
# @param metrics_aggregation_interval_seconds numeric interval used as the
#   base interval of the worker loop
# @param emitter stored as-is for later use by the worker
# @param metrics_manager stored as-is for later use by the worker
# @param dependency_collection stored as-is for later use by the worker
# @param enabled forwarded to the Workers::Polling +enabled=+ setter
# @param shutdown_timeout default timeout used by {#stop}
# @param buffer_size size handed to the buffer class constructor
def initialize(
  heartbeat_interval_seconds:,
  metrics_aggregation_interval_seconds:,
  emitter:,
  metrics_manager:,
  dependency_collection:,
  enabled: true,
  shutdown_timeout: Workers::Polling::DEFAULT_SHUTDOWN_TIMEOUT,
  buffer_size: DEFAULT_BUFFER_MAX_SIZE
)
  # Collaborators
  @emitter = emitter
  @metrics_manager = metrics_manager
  @dependency_collection = dependency_collection

  # The loop runs once per metrics aggregation interval, so the heartbeat
  # cadence is tracked as a whole number of loop ticks.
  heartbeat_ratio = heartbeat_interval_seconds / metrics_aggregation_interval_seconds
  @ticks_per_heartbeat = heartbeat_ratio.to_i
  @current_ticks = 0

  # Workers::Polling configuration
  self.enabled = enabled

  # Workers::IntervalLoop configuration
  self.loop_base_interval = metrics_aggregation_interval_seconds
  self.fork_policy = Core::Workers::Async::Thread::FORK_POLICY_STOP

  # Shutdown and queue configuration
  @shutdown_timeout = shutdown_timeout
  @buffer_size = buffer_size
  self.buffer = buffer_klass.new(buffer_size)
end

Instance Method Details

#enqueue(event) ⇒ Object



64
65
66
67
68
# File 'lib/datadog/core/telemetry/worker.rb', line 64

# Adds an event to the worker's buffer.
#
# Nothing is pushed (and nil is returned) when the worker is disabled or
# when +forked?+ reports true.
#
# @param event the event to push onto the buffer
# @return the result of +buffer.push+, or nil when the event is skipped
def enqueue(event)
  buffer.push(event) if enabled? && !forked?
end

#failed_to_start? ⇒ Boolean

Returns:

  • (Boolean)


74
75
76
# File 'lib/datadog/core/telemetry/worker.rb', line 74

# Reports the +failed?+ state of the TELEMETRY_STARTED_ONCE guard.
#
# The guard is a constant, so its state is not specific to this instance.
# NOTE(review): presumably true once the started event has used up its
# APP_STARTED_EVENT_RETRIES attempts — confirm in Utils::OnlyOnceSuccessful.
#
# @return [Boolean]
def failed_to_start?
  TELEMETRY_STARTED_ONCE.failed?
end

#sent_started_event? ⇒ Boolean

Returns:

  • (Boolean)


70
71
72
# File 'lib/datadog/core/telemetry/worker.rb', line 70

# Reports the +success?+ state of the TELEMETRY_STARTED_ONCE guard.
#
# The guard is a constant, so its state is not specific to this instance.
# NOTE(review): presumably true once the started event has been sent
# successfully — confirm in Utils::OnlyOnceSuccessful.
#
# @return [Boolean]
def sent_started_event?
  TELEMETRY_STARTED_ONCE.success?
end

#start ⇒ Object



51
52
53
54
55
56
# File 'lib/datadog/core/telemetry/worker.rb', line 51

# Starts the asynchronous worker by calling +perform+.
#
# Does nothing (and returns nil) when the worker is disabled or when
# +forked?+ reports true.
#
# @return the result of +perform+, or nil when the worker is not started
def start
  perform if enabled? && !forked?
end

#stop(force_stop = false, timeout = @shutdown_timeout) ⇒ Object



58
59
60
61
62
# File 'lib/datadog/core/telemetry/worker.rb', line 58

# Stops the worker.
#
# The buffer is closed first, but only while the worker is running; the
# call is then handed to the inherited +stop+ with the same arguments.
#
# @param force_stop forwarded unchanged to the inherited +stop+
# @param timeout forwarded unchanged to the inherited +stop+; defaults to
#   the shutdown timeout given at construction
# @return the result of the inherited +stop+
def stop(force_stop = false, timeout = @shutdown_timeout)
  if running?
    buffer.close
  end

  # Bare super forwards force_stop and timeout as received.
  super
end