Class: StatsD::Instrument::Sink

Inherits:
Object
  • Object
show all
Defined in:
lib/statsd/instrument/sink.rb

Constant Summary collapse

# Callable used as an object finalizer (see ObjectSpace.define_finalizer).
# Given an object id, it visits every live thread, looks up the store kept
# under that thread's "StatsD::UDPSink" key and, when such a store exists,
# removes the entry for the id and closes the removed value if there was one.
FINALIZER =
->(sink_id) do
  Thread.list.each do |thr|
    store = thr["StatsD::UDPSink"]
    # Threads that never populated a store have nothing to clean up.
    next unless store

    # delete returns the removed value (or nil), hence the safe navigation.
    store.delete(sink_id)&.close
  end
end

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(connection = nil) ⇒ Sink

Returns a new instance of Sink.



28
29
30
31
# File 'lib/statsd/instrument/sink.rb', line 28

# Creates a sink, optionally seeded with a connection object.
#
# @param connection [Object, nil] value stored in @connection; nil by default
def initialize(connection = nil)
  @connection = connection
  # Register FINALIZER with ObjectSpace for this instance.
  ObjectSpace.define_finalizer(self, FINALIZER)
end

Class Method Details

.for_addr(addr) ⇒ Object



7
8
9
10
11
12
13
14
15
16
17
# File 'lib/statsd/instrument/sink.rb', line 7

# Builds a sink for the given address string.
#
# An address containing ":" is treated as "host:port" and wrapped in a
# UdpConnection; any other address is handed as-is to UdsConnection.
#
# @param addr [String] either "host:port" or an address without a colon
# @return [Object] a new instance created via `new(connection)`
# @raise [ArgumentError] when the part after the first ":" is not a valid Integer
def for_addr(addr)
  connection =
    if addr.include?(":")
      # Split on the first colon only; everything after it must parse as the port.
      host, port_as_string = addr.split(":", 2)
      UdpConnection.new(host, Integer(port_as_string))
    else
      UdsConnection.new(addr)
    end

  new(connection)
end

Instance Method Details

#<<(datagram) ⇒ Object



37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/statsd/instrument/sink.rb', line 37

def <<(datagram)
  retried = false
  begin
    connection.send_datagram(datagram)
  rescue SocketError, IOError, SystemCallError => error
    StatsD.logger.debug do
      "[#{self.class.name}] [#{connection.class.name}] " \
        "Resetting connection because of #{error.class}: #{error.message}"
    end
    invalidate_connection
    if retried
      StatsD.logger.warn do
        "[#{self.class.name}] [#{connection.class.name}] " \
          "Events were dropped (after retrying) because of #{error.class}: #{error.message}. " \
          "Message size: #{datagram.bytesize} bytes."
      end
    else
      retried = true
      retry
    end
  end
  self
end

#connection ⇒ Object



69
70
71
# File 'lib/statsd/instrument/sink.rb', line 69

# Returns the connection registered for this object in `thread_store`,
# falling back to (and storing) @connection when the store has no truthy
# entry under this object's id.
def connection
  store = thread_store
  store[object_id] ||= @connection
end

#connection_type ⇒ Object



65
66
67
# File 'lib/statsd/instrument/sink.rb', line 65

# Reports the class name of the current connection.
#
# @return [String, nil] result of `connection.class.name`
def connection_type
  klass = connection.class
  klass.name
end

#flush(blocking: false) ⇒ Object



61
62
63
# File 'lib/statsd/instrument/sink.rb', line 61

# Intentionally does nothing: the `blocking` keyword is accepted but ignored
# by this implementation.
#
# @param blocking [Boolean] unused here
# @return [nil]
def flush(blocking: false)
  nil
end

#host ⇒ Object



73
74
75
# File 'lib/statsd/instrument/sink.rb', line 73

# Delegates to the current connection.
#
# @return [Object] whatever `connection.host` returns
def host
  active = connection
  active.host
end

#port ⇒ Object



77
78
79
# File 'lib/statsd/instrument/sink.rb', line 77

# Delegates to the current connection.
#
# @return [Object] whatever `connection.port` returns
def port
  active = connection
  active.port
end

#sample?(sample_rate) ⇒ Boolean

Returns:

  • (Boolean)


33
34
35
# File 'lib/statsd/instrument/sink.rb', line 33

# Decides whether an event with the given sample rate should be emitted.
#
# A rate equal to 1.0 is always sampled without drawing a random number;
# otherwise a value from `rand` is compared against the rate.
#
# @param sample_rate [Numeric]
# @return [Boolean]
def sample?(sample_rate)
  return true if sample_rate == 1.0

  rand < sample_rate
end