Class: Riemann::Tools::RiemannClientWrapper

Inherits:
Object
  • Object
show all
Defined in:
lib/riemann/tools/riemann_client_wrapper.rb

Constant Summary collapse

BACKOFF_TMIN =

Minimum delay between reconnection attempts

0.5
BACKOFF_TMAX =

Maximum delay between reconnection attempts

30.0
BACKOFF_FACTOR =
2

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options) ⇒ RiemannClientWrapper

Returns a new instance of RiemannClientWrapper.



16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/riemann/tools/riemann_client_wrapper.rb', line 16

# Stores the options and starts the background worker that ships events.
#
# Events pushed with #<< are buffered in an in-memory Queue. The worker
# blocks until one event is available, gathers further queued events up to
# @max_bulk_size (1000) and hands the whole batch to client.bulk_send.
#
# When an iteration raises a StandardError the worker sleeps for the current
# backoff delay, then discards the failed batch together with everything
# still queued and emits a warning with the number of dropped events. The
# delay starts at BACKOFF_TMIN, is multiplied by BACKOFF_FACTOR after each
# failure (capped at BACKOFF_TMAX) and returns to BACKOFF_TMIN after a
# successful send.
#
# An at_exit hook calls #drain when the process terminates.
#
# @param options [Object] settings kept in @options; #client looks up its
#   connection parameters in it by key
def initialize(options)
  @options = options
  @draining = false
  @max_bulk_size = 1000
  @queue = Queue.new

  @worker = Thread.new do
    # An exception that escapes the rescue below should bring the process down.
    Thread.current.abort_on_exception = true
    delay = BACKOFF_TMIN

    loop do
      begin
        batch = []

        # Block for the first event, then take whatever else is already queued.
        batch << @queue.pop
        until @queue.empty? || batch.size >= @max_bulk_size
          batch << @queue.pop
        end

        client.bulk_send(batch)
        delay = BACKOFF_TMIN
      rescue StandardError => error
        sleep(delay)

        # Events that arrived during the sleep are counted and dropped as well.
        lost = batch.size + @queue.size
        @queue.clear
        warn "Dropped #{lost} event#{'s' if lost > 1} due to #{error}"

        delay = [delay * BACKOFF_FACTOR, BACKOFF_TMAX].min
      end
    end
  end

  at_exit { drain }
end

Instance Attribute Details

#options ⇒ Object (readonly)

Returns the value of attribute options.



10
11
12
# File 'lib/riemann/tools/riemann_client_wrapper.rb', line 10

# Reader for @options, the object that was handed to the constructor.
def options
  @options
end

Instance Method Details

#<<(event) ⇒ Object



71
72
73
74
75
# File 'lib/riemann/tools/riemann_client_wrapper.rb', line 71

# Appends an event to the internal queue.
#
# @param event [Object] the event to enqueue
# @return [Queue] the internal queue
# @raise [RuntimeError] if #drain has already been called
def <<(event)
  if @draining
    raise 'Cannot queue events when draining'
  end

  @queue.push(event)
end

#client ⇒ Object



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/riemann/tools/riemann_client_wrapper.rb', line 50

# Lazily builds the Riemann client and memoizes it in @client.
#
# Connection settings are read from #options. When the :tcp or :tls option
# is truthy the value of the client's #tcp call is kept; otherwise the
# client object itself is kept.
#
# @return [Object] the memoized client (or its TCP transport)
def client
  return @client if @client

  settings = {
    host: options[:host],
    port: options[:port],
    timeout: options[:timeout],
    ssl: options[:tls],
    key_file: options[:tls_key],
    cert_file: options[:tls_cert],
    ca_file: options[:tls_ca_cert],
    ssl_verify: options[:tls_verify],
  }
  riemann = Riemann::Client.new(**settings)

  wants_tcp = options[:tcp] || options[:tls]
  @client = wants_tcp ? riemann.tcp : riemann
end

#drain ⇒ Object



77
78
79
80
# File 'lib/riemann/tools/riemann_client_wrapper.rb', line 77

# Stops accepting new events and waits for the queue to empty.
#
# Sets @draining (which makes #<< raise), then polls once per second until
# the queue is empty or Thread#stop? reports true for the worker thread.
#
# @return [nil]
def drain
  @draining = true

  loop do
    break if @queue.empty? || @worker.stop?

    sleep(1)
  end
end