Class: Aws::Embedded::Metrics::Sinks::Tcp
- Inherits:
-
Object
- Object
- Aws::Embedded::Metrics::Sinks::Tcp
- Defined in:
- lib/aws-embedded-metrics-customink/sinks/tcp.rb
Overview
Creates a sink that communicates with a CloudWatch Agent over a TCP connection.
See https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch_Embedded_Metric_Format_Generation_CloudWatch_Agent.html for configuration information.
Constant Summary collapse
- AWS_EMF_AGENT_ENDPOINT_ENV_VAR =
'AWS_EMF_AGENT_ENDPOINT'
Instance Attribute Summary collapse
-
#client_opts ⇒ Object
readonly
Returns the value of attribute client_opts.
-
#queue ⇒ Object
readonly
Returns the value of attribute queue.
Instance Method Summary collapse
- #accept(message) ⇒ Object
- #connection ⇒ Object
- #create_conn(host, port, opts) ⇒ Object
-
#initialize(conn_str: ENV.fetch(AWS_EMF_AGENT_ENDPOINT_ENV_VAR, nil), conn_timeout_secs: 10, write_timeout_secs: 10, logger: nil) ⇒ Tcp
constructor
Create a new TCP sink.
- #log_err(msg) ⇒ Object
- #log_warn(msg) ⇒ Object
- #send_message(message) ⇒ Object
Constructor Details
#initialize(conn_str: ENV.fetch(AWS_EMF_AGENT_ENDPOINT_ENV_VAR, nil), conn_timeout_secs: 10, write_timeout_secs: 10, logger: nil) ⇒ Tcp
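Create a new TCP sink. By default, the connection string is read from the AWS_EMF_AGENT_ENDPOINT
environment variable and is used to connect to a CloudWatch Agent.
The connection string must have the form tcp://<host>:<port>; otherwise a Sinks::Error is raised.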
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 |
# File 'lib/aws-embedded-metrics-customink/sinks/tcp.rb', line 29
#
# Builds a TCP sink from a connection string and timeout settings.
#
# @param conn_str [String, nil] agent endpoint in the form tcp://<host>:<port>;
#   defaults to the value of the AWS_EMF_AGENT_ENDPOINT environment variable
# @param conn_timeout_secs [Numeric] passed to the client as connect_timeout
# @param write_timeout_secs [Numeric] passed to the client as write_timeout
# @param logger [#warn, #error, nil] optional logger used by log_warn / log_err
# @raise [Sinks::Error] when conn_str is nil, or when the parsed URI has a
#   scheme other than 'tcp' or is missing a host or a port
def initialize(conn_str: ENV.fetch(AWS_EMF_AGENT_ENDPOINT_ENV_VAR, nil),
               conn_timeout_secs: 10,
               write_timeout_secs: 10,
               logger: nil)
  raise Sinks::Error, "Must specify a connection string or set environment variable #{AWS_EMF_AGENT_ENDPOINT_ENV_VAR}" if conn_str.nil?

  @logger = logger
  @cw_agent_uri = URI.parse(conn_str)

  # Accept only a tcp:// URI that carries both a host and a port.
  well_formed = @cw_agent_uri.scheme == 'tcp' && @cw_agent_uri.host && @cw_agent_uri.port
  unless well_formed
    raise Sinks::Error, "Expected connection string to be in format tcp://<host>:<port>, got '#{conn_str}'"
  end

  timeouts = { connect_timeout: conn_timeout_secs, write_timeout: write_timeout_secs }
  @client_opts = TCPClient::Configuration.create(
    buffered: true,
    keep_alive: true,
    reverse_lookup: true,
    **timeouts
  )

  # The connection is opened lazily on first use.
  @conn = nil
end
Instance Attribute Details
#client_opts ⇒ Object (readonly)
Returns the value of attribute client_opts.
18 19 20 |
# File 'lib/aws-embedded-metrics-customink/sinks/tcp.rb', line 18 def client_opts @client_opts end |
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
17 18 19 |
# File 'lib/aws-embedded-metrics-customink/sinks/tcp.rb', line 17 def queue @queue end |
Instance Method Details
#accept(message) ⇒ Object
86 87 88 |
# File 'lib/aws-embedded-metrics-customink/sinks/tcp.rb', line 86
#
# Serializes +message+ to JSON, appends a newline, and hands the resulting
# line to #send_message.
#
# @param message [Object] the payload to serialize with JSON.dump
# @return [Object] whatever #send_message returns
def accept(message)
  send_message("#{JSON.dump(message)}\n")
end
#connection ⇒ Object
65 66 67 68 |
# File 'lib/aws-embedded-metrics-customink/sinks/tcp.rb', line 65
#
# Returns the cached connection, opening a new one through #create_conn when
# there is no cached connection or the cached one reports closed.
#
# @return [Object] the connection stored in @conn
def connection
  return @conn unless @conn.nil? || @conn.closed?

  @conn = create_conn(@cw_agent_uri.host, @cw_agent_uri.port, @client_opts)
end
#create_conn(host, port, opts) ⇒ Object
61 62 63 |
# File 'lib/aws-embedded-metrics-customink/sinks/tcp.rb', line 61
#
# Opens a client connection to "<host>:<port>" with the given options.
#
# @param host [Object] interpolated into the address before the colon
# @param port [Object] interpolated into the address after the colon
# @param opts [Object] passed through unchanged to TCPClient.open
# @return [Object] whatever TCPClient.open returns
def create_conn(host, port, opts)
  address = "#{host}:#{port}"
  TCPClient.open(address, opts)
end
#log_err(msg) ⇒ Object
57 58 59 |
# File 'lib/aws-embedded-metrics-customink/sinks/tcp.rb', line 57
#
# Forwards +msg+ to the logger at error level; does nothing when no logger
# is set.
#
# @param msg [Object] the message handed to the logger's #error
# @return [Object, nil] the logger's return value, or nil when @logger is nil
def log_err(msg)
  return if @logger.nil?

  @logger.error(msg)
end
#log_warn(msg) ⇒ Object
53 54 55 |
# File 'lib/aws-embedded-metrics-customink/sinks/tcp.rb', line 53
#
# Forwards +msg+ to the logger at warn level; does nothing when no logger
# is set.
#
# @param msg [Object] the message handed to the logger's #warn
# @return [Object, nil] the logger's return value, or nil when @logger is nil
def log_warn(msg)
  return if @logger.nil?

  @logger.warn(msg)
end
#send_message(message) ⇒ Object
70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 |
# File 'lib/aws-embedded-metrics-customink/sinks/tcp.rb', line 70
#
# Writes +message+ to the agent connection obtained from #connection.
#
# When the attempt raises Errno::ECONNREFUSED, any open connection is closed,
# a warning is logged, and the attempt is repeated up to two more times.
# Any other StandardError is logged through #log_err and not re-raised.
#
# @param message [String] the data passed to the connection's #write
def send_message(message)
  retries = 2
  conn = nil
  begin
    conn = connection
    conn.write(message)
  rescue Errno::ECONNREFUSED
    # conn is still nil here when #connection itself raised.
    conn.close unless conn.nil? || conn.closed?
    log_warn("Could not connect to CloudWatch Agent at #{@cw_agent_uri.scheme}://#{@cw_agent_uri.host}:#{@cw_agent_uri.port}")
    retries -= 1
    retry if retries >= 0
  rescue StandardError => e
    log_err("#{e.class}: #{e.message}: #{e.backtrace.join("\n")}")
  end
end