Class: Franz::Agg
- Inherits:
-
Object
- Object
- Franz::Agg
- Defined in:
- lib/franz/agg.rb
Overview
Agg mostly aggregates Tail events by applying the multiline filter, but it also applies the “host” and “type” fields. Basically, it does all the post- processing after we’ve retreived a line from a file.
Constant Summary collapse
- @@host =
We’ll apply the hostname to all events
Socket.gethostname
Instance Attribute Summary collapse
-
#seqs ⇒ Object
readonly
Returns the value of attribute seqs.
Instance Method Summary collapse
-
#initialize(opts = {}) ⇒ Agg
constructor
Start a new Agg thread in the background.
-
#state ⇒ Object
Return the internal “seqs” state.
-
#stop ⇒ Hash
Stop the Agg thread.
Constructor Details
#initialize(opts = {}) ⇒ Agg
Start a new Agg thread in the background.
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 |
# File 'lib/franz/agg.rb', line 29 def initialize opts={} @ic = opts[:input_config] || raise('No input_config specified') @tail_events = opts[:tail_events] || [] @agg_events = opts[:agg_events] || [] @buffer_limit = opts[:buffer_limit] || 50 @flush_interval = opts[:flush_interval] || 10 @seqs = opts[:seqs] || Hash.new @logger = opts[:logger] || Logger.new(STDOUT) @lock = Hash.new { |h,k| h[k] = Mutex.new } @buffer = Franz::Sash.new @stop = false @statz = opts[:statz] || Franz::Stats.new @statz.create :num_lines, 0 @t1 = Thread.new do until @stop flush sleep flush_interval end flush true end @t2 = Thread.new do capture until @stop end log.debug \ event: 'agg started', tail_events: @tail_events, agg_events: @agg_events end |
Instance Attribute Details
#seqs ⇒ Object (readonly)
Returns the value of attribute seqs.
18 19 20 |
# File 'lib/franz/agg.rb', line 18 def seqs @seqs end |
Instance Method Details
#state ⇒ Object
Return the internal “seqs” state
78 79 80 |
# File 'lib/franz/agg.rb', line 78 def state return @seqs.dup end |
#stop ⇒ Hash
Stop the Agg thread. Effectively only once.
68 69 70 71 72 73 74 75 |
# File 'lib/franz/agg.rb', line 68 def stop return state if @stop @stop = true @t2.kill @t1.join log.debug event: 'agg stopped' return state end |