Class: Aws::Embedded::Metrics::Sinks::Tcp

Inherits:
Object
Defined in:
lib/aws-embedded-metrics-customink/sinks/tcp.rb

Overview

Creates a sink that communicates with a CloudWatch agent over a TCP connection.

See docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch_Embedded_Metric_Format_Generation_CloudWatch_Agent.html for configuration information.

Constant Summary

AWS_EMF_AGENT_ENDPOINT_ENV_VAR = 'AWS_EMF_AGENT_ENDPOINT'

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize(conn_str: ENV.fetch(AWS_EMF_AGENT_ENDPOINT_ENV_VAR, nil), conn_timeout_secs: 10, write_timeout_secs: 10, logger: nil) ⇒ Tcp

Creates a new TCP sink. By default, the connection string is read from the AWS_EMF_AGENT_ENDPOINT environment variable. Raises Sinks::Error if no connection string is available or if it is not in the form tcp://<host>:<port>.

Parameters:

  • conn_str (String) (defaults to: ENV.fetch(AWS_EMF_AGENT_ENDPOINT_ENV_VAR, nil))

    A connection string, formatted like ‘tcp://127.0.0.1:25888’.

  • conn_timeout_secs (Numeric) (defaults to: 10)

    The number of seconds before timing out the connection to the agent.

  • write_timeout_secs (Numeric) (defaults to: 10)

    The number of seconds to wait before timing out a write.

  • logger (Logger) (defaults to: nil)

    A standard Ruby logger that receives warnings and errors from the sink. In a Rails application, Rails.logger is a reasonable choice.



# File 'lib/aws-embedded-metrics-customink/sinks/tcp.rb', line 29

def initialize(conn_str: ENV.fetch(AWS_EMF_AGENT_ENDPOINT_ENV_VAR, nil),
               conn_timeout_secs: 10,
               write_timeout_secs: 10,
               logger: nil)
  if conn_str.nil?
    raise Sinks::Error, "Must specify a connection string or set environment variable #{AWS_EMF_AGENT_ENDPOINT_ENV_VAR}"
  end

  @logger = logger
  @cw_agent_uri = URI.parse(conn_str)
  if @cw_agent_uri.scheme != 'tcp' || !@cw_agent_uri.host || !@cw_agent_uri.port
    raise Sinks::Error, "Expected connection string to be in format tcp://<host>:<port>, got '#{conn_str}'"
  end

  @client_opts = TCPClient::Configuration.create(
    buffered: true,
    keep_alive: true,
    reverse_lookup: true,
    connect_timeout: conn_timeout_secs,
    write_timeout: write_timeout_secs
  )
  @conn = nil
end
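
The connection-string check in the constructor can be tried on its own with nothing but the standard library. The following is a minimal sketch, not part of the gem: `parse_agent_endpoint` is a hypothetical helper name, and it raises `ArgumentError` where the sink raises `Sinks::Error`.

```ruby
require 'uri'

# Hypothetical helper that mirrors the constructor's validation.
# It is not part of the gem and uses ArgumentError instead of Sinks::Error.
def parse_agent_endpoint(conn_str)
  raise ArgumentError, 'connection string is required' if conn_str.nil?

  uri = URI.parse(conn_str)
  # A generic URI has no default port, so a missing port shows up as nil.
  if uri.scheme != 'tcp' || !uri.host || !uri.port
    raise ArgumentError, "expected tcp://<host>:<port>, got '#{conn_str}'"
  end

  [uri.host, uri.port]
end

host, port = parse_agent_endpoint('tcp://127.0.0.1:25888')
puts "#{host}:#{port}"
```

Because the scheme, host, and port are all required, a string such as `tcp://127.0.0.1` (no port) or `http://127.0.0.1:25888` (wrong scheme) is rejected before any connection is attempted.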

Instance Attribute Details

#client_opts ⇒ Object (readonly)

Returns the value of attribute client_opts.



# File 'lib/aws-embedded-metrics-customink/sinks/tcp.rb', line 18

def client_opts
  @client_opts
end

#queue ⇒ Object (readonly)

Returns the value of attribute queue.



# File 'lib/aws-embedded-metrics-customink/sinks/tcp.rb', line 17

def queue
  @queue
end

Instance Method Details

#accept(message) ⇒ Object



# File 'lib/aws-embedded-metrics-customink/sinks/tcp.rb', line 86

def accept(message)
  send_message("#{JSON.dump(message)}\n")
end
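
As the source above shows, `accept` serializes each message with `JSON.dump` and appends a newline, so the stream sent to the agent is newline-delimited JSON. The sketch below illustrates that framing only. The payload is an illustrative EMF-style hash with made-up namespace and metric names, and a `StringIO` stands in for the TCP connection so it runs without an agent.

```ruby
require 'json'
require 'stringio'

# Illustrative EMF-style payload; the namespace, dimension, and metric
# names are invented for this example.
message = {
  '_aws' => {
    'Timestamp' => 1_700_000_000_000,
    'CloudWatchMetrics' => [
      { 'Namespace' => 'ExampleApp',
        'Dimensions' => [['Service']],
        'Metrics' => [{ 'Name' => 'Latency', 'Unit' => 'Milliseconds' }] }
    ]
  },
  'Service' => 'checkout',
  'Latency' => 42
}

# StringIO stands in for the connection object (an assumption for the demo).
wire = StringIO.new
2.times { wire.write("#{JSON.dump(message)}\n") }

# Each line on the wire is one complete JSON document.
frames = wire.string.lines
frames.each { |frame| puts JSON.parse(frame)['Latency'] }
```

`JSON.dump` emits compact output with no embedded newlines, which is what lets a reader split the stream on `"\n"` to recover individual messages.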

#connection ⇒ Object



# File 'lib/aws-embedded-metrics-customink/sinks/tcp.rb', line 65

def connection
  @conn = create_conn(@cw_agent_uri.host, @cw_agent_uri.port, @client_opts) if @conn.nil? || @conn.closed?
  @conn
end

#create_conn(host, port, opts) ⇒ Object



# File 'lib/aws-embedded-metrics-customink/sinks/tcp.rb', line 61

def create_conn(host, port, opts)
  TCPClient.open("#{host}:#{port}", opts)
end

#log_err(msg) ⇒ Object



# File 'lib/aws-embedded-metrics-customink/sinks/tcp.rb', line 57

def log_err(msg)
  @logger&.error(msg)
end

#log_warn(msg) ⇒ Object



# File 'lib/aws-embedded-metrics-customink/sinks/tcp.rb', line 53

def log_warn(msg)
  @logger&.warn(msg)
end

#send_message(message) ⇒ Object



# File 'lib/aws-embedded-metrics-customink/sinks/tcp.rb', line 70

def send_message(message)
  retries = 2
  conn = nil
  begin
    conn = connection
    conn.write(message)
  rescue Errno::ECONNREFUSED
    conn.close unless conn.nil? || conn.closed?
    log_warn("Could not connect to CloudWatch Agent at #{@cw_agent_uri.scheme}://#{@cw_agent_uri.host}:#{@cw_agent_uri.port}")
    retries -= 1
    retry if retries >= 0
  rescue StandardError => e
    log_err("#{e.class}: #{e.message}: #{e.backtrace.join("\n")}")
  end
end
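
Reading the source above, `send_message` starts with `retries = 2` and retries while the counter is still `>= 0` after decrementing, which works out to three write attempts in total when the agent keeps refusing connections. After that the message is dropped and no exception reaches the caller. The sketch below reproduces only that retry shape. `RefusingConn` and `write_with_retries` are hypothetical names, and logging and reconnection are left out.

```ruby
# Hypothetical stand-in for a connection to an agent that is not listening.
class RefusingConn
  attr_reader :attempts

  def initialize
    @attempts = 0
  end

  def write(_message)
    @attempts += 1
    raise Errno::ECONNREFUSED
  end
end

# Same retry shape as send_message: one initial attempt plus `retries` more.
# Returns true on success, false once the attempts are exhausted.
def write_with_retries(conn, message, retries: 2)
  remaining = retries
  begin
    conn.write(message)
    true
  rescue Errno::ECONNREFUSED
    remaining -= 1
    retry if remaining >= 0
    false
  end
end

conn = RefusingConn.new
delivered = write_with_retries(conn, "{}\n")
puts "delivered=#{delivered} attempts=#{conn.attempts}"
```

Since failures are only logged, a caller that needs to know whether a metric was delivered would have to pass a `logger` to the sink and watch for the warning.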