Class: Datadog::Statsd::Forwarder

Inherits:
Object
  • Object
show all
Defined in:
lib/datadog/statsd/forwarder.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(host: nil, port: nil, socket_path: nil, buffer_max_payload_size: nil, buffer_max_pool_size: nil, buffer_overflowing_stategy: :drop, telemetry_flush_interval: nil, global_tags: [], logger: nil) ⇒ Forwarder

Returns a new instance of Forwarder.


9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/datadog/statsd/forwarder.rb', line 9

# Builds the forwarder: picks a transport, optionally sets up telemetry,
# creates the connection and message buffer, then creates and starts the sender.
#
# @param host [Object, nil] passed to UDPConnection when the transport is UDP
# @param port [Object, nil] passed to UDPConnection when the transport is UDP
# @param socket_path [Object, nil] when non-nil, the UDS transport is used and
#   this value is passed to UDSConnection; when nil, UDP is used
# @param buffer_max_payload_size [Integer, nil] maximum payload size handed to
#   the MessageBuffer; falls back to the transport-specific default constant
# @param buffer_max_pool_size [Integer, nil] pool size handed to the
#   MessageBuffer; falls back to DEFAULT_BUFFER_POOL_SIZE
# @param buffer_overflowing_stategy [Symbol] forwarded as-is to the
#   MessageBuffer (the spelling is part of the public keyword interface)
# @param telemetry_flush_interval [Object, nil] telemetry is only created when
#   this is truthy
# @param global_tags [Array] forwarded to Telemetry
# @param logger [Object, nil] forwarded to the connection
# @raise [ArgumentError] if the payload size is <= 0, or if telemetry is
#   enabled and reports that it would not fit in the payload size
def initialize(
  host: nil,
  port: nil,
  socket_path: nil,

  buffer_max_payload_size: nil,
  buffer_max_pool_size: nil,
  buffer_overflowing_stategy: :drop,

  telemetry_flush_interval: nil,
  global_tags: [],

  logger: nil
)
  # A socket path selects UDS; otherwise we talk UDP.
  @transport_type = if socket_path.nil?
                      :udp
                    else
                      :uds
                    end

  # Telemetry is opt-in: without a flush interval @telemetry stays unset (nil).
  @telemetry = Telemetry.new(
    telemetry_flush_interval,
    global_tags: global_tags,
    transport_type: transport_type
  ) if telemetry_flush_interval

  @connection =
    if transport_type == :udp
      UDPConnection.new(host, port, logger: logger, telemetry: telemetry)
    elsif transport_type == :uds
      UDSConnection.new(socket_path, logger: logger, telemetry: telemetry)
    end

  # Resolve the payload size lazily so the default constants are only
  # consulted when the caller did not provide a value.
  unless buffer_max_payload_size
    buffer_max_payload_size =
      transport_type == :udp ? UDP_DEFAULT_BUFFER_SIZE : UDS_DEFAULT_BUFFER_SIZE
  end

  raise ArgumentError, 'buffer_max_payload_size cannot be <= 0' if buffer_max_payload_size <= 0

  # With telemetry enabled, the payload must be large enough for it.
  if !telemetry.nil? && !telemetry.would_fit_in?(buffer_max_payload_size)
    raise ArgumentError, "buffer_max_payload_size is not high enough to use telemetry (tags=(#{global_tags.inspect}))"
  end

  pool_size = buffer_max_pool_size || DEFAULT_BUFFER_POOL_SIZE

  @buffer = MessageBuffer.new(
    @connection,
    max_payload_size: buffer_max_payload_size,
    max_pool_size: pool_size,
    overflowing_stategy: buffer_overflowing_stategy
  )

  @sender = Sender.new(buffer)
  @sender.start
end

Instance Attribute Details

#telemetry ⇒ Object (readonly)

Returns the value of attribute telemetry


6
7
8
# File 'lib/datadog/statsd/forwarder.rb', line 6

# Read-only accessor for the @telemetry instance variable.
#
# @return [Object, nil] the stored telemetry object; nil when @telemetry was
#   never assigned
def telemetry
  @telemetry
end

#transport_type ⇒ Object (readonly)

Returns the value of attribute transport_type


7
8
9
# File 'lib/datadog/statsd/forwarder.rb', line 7

# Read-only accessor for the @transport_type instance variable.
#
# @return [Symbol] the stored transport type; sibling methods compare it
#   against :udp and :uds
def transport_type
  @transport_type
end

Instance Method Details

#close ⇒ Object


94
95
96
97
# File 'lib/datadog/statsd/forwarder.rb', line 94

# Stops the sender, then closes the connection.
#
# The connection is closed even when stopping the sender raises, so a failing
# sender cannot leave the underlying connection open. In that case the
# sender's exception still propagates to the caller.
#
# @return [Object] whatever `connection.close` returns (unchanged from the
#   previous behaviour on the success path)
def close
  begin
    sender.stop
  ensure
    # Always release the connection, whether or not stop succeeded.
    closed = connection.close
  end

  closed
end

#flush(flush_telemetry: false, sync: false) ⇒ Object


70
71
72
73
74
# File 'lib/datadog/statsd/forwarder.rb', line 70

# Flushes the sender, optionally flushing telemetry first.
#
# @param flush_telemetry [Boolean] when truthy and telemetry is present,
#   `do_flush_telemetry` is called before the sender is flushed
# @param sync [Boolean] forwarded to `sender.flush` as its `sync:` keyword
# @return [Object] whatever `sender.flush` returns
def flush(flush_telemetry: false, sync: false)
  # Telemetry is only flushed when it exists and the caller asked for it.
  if telemetry && flush_telemetry
    do_flush_telemetry
  end

  sender.flush(sync: sync)
end

#host ⇒ Object


76
77
78
79
80
# File 'lib/datadog/statsd/forwarder.rb', line 76

# Host of the underlying connection; only meaningful for the UDP transport.
#
# @return [Object, nil] `connection.host` when the transport type is :udp,
#   nil otherwise
def host
  connection.host if transport_type == :udp
end

#port ⇒ Object


82
83
84
85
86
# File 'lib/datadog/statsd/forwarder.rb', line 82

# Port of the underlying connection; only meaningful for the UDP transport.
#
# @return [Object, nil] `connection.port` when the transport type is :udp,
#   nil otherwise
def port
  connection.port if transport_type == :udp
end

#send_message(message) ⇒ Object


60
61
62
63
64
# File 'lib/datadog/statsd/forwarder.rb', line 60

# Hands a message to the sender and then ticks telemetry.
#
# @param message [Object] passed unchanged to `sender.add`
# @return [Object] whatever `tick_telemetry` returns
def send_message(message)
  sender.add(message)

  # Runs after every message; tick_telemetry is defined outside this view —
  # presumably it decides whether a telemetry flush is due (confirm there).
  tick_telemetry
end

#socket_path ⇒ Object


88
89
90
91
92
# File 'lib/datadog/statsd/forwarder.rb', line 88

# Socket path of the underlying connection; only meaningful for the UDS
# transport.
#
# @return [Object, nil] `connection.socket_path` when the transport type is
#   :uds, nil otherwise
def socket_path
  connection.socket_path if transport_type == :uds
end

#sync_with_outbound_io ⇒ Object


66
67
68
# File 'lib/datadog/statsd/forwarder.rb', line 66

# Delegates to `sender.rendez_vous`.
#
# NOTE(review): judging by the name, this presumably blocks until the sender
# has caught up with pending outbound work — confirm against Sender#rendez_vous.
#
# @return [Object] whatever `sender.rendez_vous` returns
def sync_with_outbound_io
  sender.rendez_vous
end