Class: Datadog::Core::Telemetry::Worker

Inherits:
Object
  • Object
show all
Includes:
Workers::Polling, Workers::Queue
Defined in:
lib/datadog/core/telemetry/worker.rb

Overview

Accumulates events and sends them to the API at a regular interval, including the heartbeat event.

Constant Summary collapse

DEFAULT_BUFFER_MAX_SIZE =
1000
APP_STARTED_EVENT_RETRIES =
10
TELEMETRY_STARTED_ONCE =
Utils::OnlyOnceSuccessful.new(APP_STARTED_EVENT_RETRIES)

Constants included from Workers::Polling

Workers::Polling::DEFAULT_SHUTDOWN_TIMEOUT

Instance Attribute Summary

Attributes included from Workers::Queue

#buffer

Instance Method Summary collapse

Methods included from Workers::Polling

#enabled=, #enabled?, included

Methods included from Workers::Queue

included

Constructor Details

#initialize(heartbeat_interval_seconds:, emitter:, dependency_collection:, enabled: true, shutdown_timeout: Workers::Polling::DEFAULT_SHUTDOWN_TIMEOUT, buffer_size: DEFAULT_BUFFER_MAX_SIZE) ⇒ Worker

Returns a new instance of Worker.



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/datadog/core/telemetry/worker.rb', line 22

# Builds a telemetry worker and configures the polling/queue behavior it
# gets from the included worker modules.
#
# @param heartbeat_interval_seconds [Numeric] used as the loop base interval
#   (presumably in seconds, going by the name — confirm in Workers::IntervalLoop)
# @param emitter [Object] stored as @emitter; not used by the constructor itself
# @param dependency_collection [Object] stored as @dependency_collection
# @param enabled [Boolean] forwarded to the `enabled=` setter
# @param shutdown_timeout [Object] default timeout used by {#stop}
# @param buffer_size [Integer] size handed to the buffer class constructor
def initialize(
  heartbeat_interval_seconds:,
  emitter:,
  dependency_collection:,
  enabled: true,
  shutdown_timeout: Workers::Polling::DEFAULT_SHUTDOWN_TIMEOUT,
  buffer_size: DEFAULT_BUFFER_MAX_SIZE
)
  # Collaborators kept for later use by the worker.
  @emitter = emitter
  @dependency_collection = dependency_collection

  # Workers::Polling: whether this worker is allowed to run at all.
  self.enabled = enabled

  # Workers::IntervalLoop: the heartbeat interval drives the loop, and the
  # fork policy is set to FORK_POLICY_STOP.
  self.loop_base_interval = heartbeat_interval_seconds
  self.fork_policy = Core::Workers::Async::Thread::FORK_POLICY_STOP

  @buffer_size = buffer_size
  @shutdown_timeout = shutdown_timeout

  # Workers::Queue: the event buffer, sized from buffer_size.
  self.buffer = buffer_klass.new(buffer_size)
end

Instance Method Details

#enqueue(event) ⇒ Object



58
59
60
61
62
# File 'lib/datadog/core/telemetry/worker.rb', line 58

# Pushes an event onto the worker's buffer.
#
# Nothing is buffered when the worker is not enabled or when `forked?`
# returns true; `forked?` is not consulted if the worker is disabled.
#
# @param event [Object] the event to buffer
# @return [Object, nil] whatever `buffer.push` returns, or nil when skipped
def enqueue(event)
  return unless enabled?
  return if forked?

  buffer.push(event)
end

#failed_to_start? ⇒ Boolean

Returns:

  • (Boolean)


68
69
70
# File 'lib/datadog/core/telemetry/worker.rb', line 68

# Reports whether TELEMETRY_STARTED_ONCE is in its failed state.
#
# NOTE(review): presumably this means the retry budget was exhausted without
# a successful send — confirm against Utils::OnlyOnceSuccessful.
#
# @return [Boolean]
def failed_to_start?
  started_once = TELEMETRY_STARTED_ONCE
  started_once.failed?
end

#sent_started_event? ⇒ Boolean

Returns:

  • (Boolean)


64
65
66
# File 'lib/datadog/core/telemetry/worker.rb', line 64

# Reports whether TELEMETRY_STARTED_ONCE has recorded a success.
#
# NOTE(review): presumably a success means the app-started event was
# delivered — confirm against Utils::OnlyOnceSuccessful.
#
# @return [Boolean]
def sent_started_event?
  started_once = TELEMETRY_STARTED_ONCE
  started_once.success?
end

#start ⇒ Object



45
46
47
48
49
50
# File 'lib/datadog/core/telemetry/worker.rb', line 45

# Starts the async worker by calling `perform`.
#
# Does nothing when the worker is not enabled or when `forked?` returns
# true; `forked?` is not consulted if the worker is disabled.
#
# @return [Object, nil] whatever `perform` returns, or nil when skipped
def start
  perform if enabled? && !forked?
end

#stop(force_stop = false, timeout = @shutdown_timeout) ⇒ Object



52
53
54
55
56
# File 'lib/datadog/core/telemetry/worker.rb', line 52

# Stops the worker, closing the buffer first if the worker is running.
#
# Both arguments are passed unchanged to the inherited `stop`.
#
# @param force_stop [Boolean] forwarded to the parent implementation
# @param timeout [Object] forwarded to the parent implementation; defaults
#   to the shutdown timeout given to the constructor
# @return [Object] whatever the parent `stop` returns
def stop(force_stop = false, timeout = @shutdown_timeout)
  if running?
    # Close the buffer before handing over to the parent stop.
    buffer.close
  end

  super(force_stop, timeout)
end