Class: Riemann::Tools::RiemannClientWrapper
- Inherits:
-
Object
- Object
- Riemann::Tools::RiemannClientWrapper
- Defined in:
- lib/riemann/tools/riemann_client_wrapper.rb
Constant Summary collapse
- BACKOFF_TMIN =
Minimum delay between reconnection attempts
0.5
- BACKOFF_TMAX =
Maximum delay between reconnection attempts
30.0
- BACKOFF_FACTOR =
2
Instance Attribute Summary collapse
-
#options ⇒ Object
readonly
Returns the value of attribute options.
Instance Method Summary collapse
- #<<(event) ⇒ Object
- #client ⇒ Object
- #drain ⇒ Object
-
#initialize(options) ⇒ RiemannClientWrapper
constructor
A new instance of RiemannClientWrapper.
Constructor Details
#initialize(options) ⇒ RiemannClientWrapper
Returns a new instance of RiemannClientWrapper.
16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
# File 'lib/riemann/tools/riemann_client_wrapper.rb', line 16
#
# Stores the options, creates the event queue and starts the background
# worker thread that sends queued events in bulk.
#
# @param options [Hash] settings later read by #client
#   (presumably symbol keys such as :host, :port, :timeout, :tcp, :tls)
def initialize(options)
  @options = options
  @queue = Queue.new
  @max_bulk_size = 1000
  @draining = false

  @worker = Thread.new do
    # Exceptions that escape the rescue below must not go unnoticed.
    Thread.current.abort_on_exception = true

    backoff_delay = BACKOFF_TMIN

    loop do
      events = []

      # Block until one event is available, then take whatever else is
      # already queued, up to @max_bulk_size events per batch.
      events << @queue.pop
      events << @queue.pop while !@queue.empty? && events.size < @max_bulk_size

      client.bulk_send(events)
      # A successful send resets the reconnection delay.
      backoff_delay = BACKOFF_TMIN
    rescue StandardError => e
      sleep(backoff_delay)

      # The current batch and everything queued during the sleep are dropped.
      dropped_count = events.size + @queue.size
      @queue.clear
      warn "Dropped #{dropped_count} event#{'s' if dropped_count > 1} due to #{e}"

      # Grow the delay for the next failure, capped at BACKOFF_TMAX.
      backoff_delay *= BACKOFF_FACTOR
      backoff_delay = BACKOFF_TMAX if backoff_delay > BACKOFF_TMAX
    end
  end

  # Give the worker a chance to flush the queue before the process exits.
  at_exit { drain }
end
Instance Attribute Details
#options ⇒ Object (readonly)
Returns the value of attribute options.
10 11 12 |
# File 'lib/riemann/tools/riemann_client_wrapper.rb', line 10
#
# Returns the value of attribute options.
#
# @return [Object] the options given to the constructor
def options
  @options
end
Instance Method Details
#<<(event) ⇒ Object
71 72 73 74 75 |
# File 'lib/riemann/tools/riemann_client_wrapper.rb', line 71
#
# Queues an event for the background worker to send.
#
# @param event [Object] the event to enqueue
# @return [Object] the result of pushing the event onto the internal queue
# @raise [RuntimeError] if #drain has already been called
def <<(event)
  raise 'Cannot queue events when draining' if @draining

  @queue << event
end
#client ⇒ Object
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 |
# File 'lib/riemann/tools/riemann_client_wrapper.rb', line 50
#
# Builds the Riemann client from the options on first use and memoizes it.
#
# @return [Object] the result of +Riemann::Client#tcp+ when the :tcp or :tls
#   option is set, otherwise the +Riemann::Client+ instance itself
def client
  @client ||= begin
    r = Riemann::Client.new(
      host: options[:host],
      port: options[:port],
      timeout: options[:timeout],
      ssl: options[:tls],
      key_file: options[:tls_key],
      cert_file: options[:tls_cert],
      ca_file: options[:tls_ca_cert],
      ssl_verify: options[:tls_verify],
    )

    # TLS is carried over TCP, so either option selects the TCP transport.
    if options[:tcp] || options[:tls]
      r.tcp
    else
      r
    end
  end
end
#drain ⇒ Object
77 78 79 80 |
# File 'lib/riemann/tools/riemann_client_wrapper.rb', line 77
#
# Stops accepting new events and waits for the queue to be emptied.
#
# Polls once per second and returns as soon as the queue is empty or the
# worker thread reports +stop?+ (dead or sleeping).
#
# @return [nil]
def drain
  # From now on #<< refuses new events.
  @draining = true

  sleep(1) until @queue.empty? || @worker.stop?
end