Class: Solanum
- Inherits:
-
Object
- Object
- Solanum
- Defined in:
- lib/solanum.rb,
lib/solanum/config.rb,
lib/solanum/source.rb,
lib/solanum/schedule.rb,
lib/solanum/output/print.rb,
lib/solanum/output/riemann.rb
Overview
Schedule management class.
Defined Under Namespace
Modules: Config Classes: Output, Schedule, Source
Instance Attribute Summary collapse
-
#defaults ⇒ Object
readonly
Returns the value of attribute defaults.
-
#outputs ⇒ Object
readonly
Returns the value of attribute outputs.
-
#sources ⇒ Object
readonly
Returns the value of attribute sources.
Class Method Summary collapse
-
.merge_attrs(a, b) ⇒ Object
Merge two event attribute maps together, concatenating tags.
Instance Method Summary collapse
-
#collect_events!(source) ⇒ Object
Run collection from the given source in a new thread.
-
#initialize(config_paths) ⇒ Solanum
constructor
Loads the given configuration file(s) and initializes the system.
-
#record!(events) ⇒ Object
Report a batch of events to all reporters.
-
#reschedule!(source) ⇒ Object
Reschedule the given source for later running.
-
#run! ⇒ Object
Runs the collection loop.
Constructor Details
#initialize(config_paths) ⇒ Solanum
Loads the given configuration file(s) and initializes the system.
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 |
# File 'lib/solanum.rb', line 38 def initialize(config_paths) @defaults = {tags: []} @sources = [] @outputs = [] # Load and merge files. config_paths.each do |path| conf = Config.load_file(path) # merge defaults, update tags @defaults = Solanum.merge_attrs(@defaults, conf[:defaults]) # sources and outputs are additive @sources.concat(conf[:sources]) @outputs.concat(conf[:outputs]) end # Add default print output. if @outputs.empty? require 'solanum/output/print' @outputs << Solanum::Output::Print.new() end @defaults.freeze @outputs.freeze @sources.freeze @schedule = Solanum::Schedule.new @sources.each_with_index do |source, i| @schedule.insert!(source.next_run, i) end end |
Instance Attribute Details
#defaults ⇒ Object (readonly)
Returns the value of attribute defaults.
8 9 10 |
# File 'lib/solanum.rb', line 8 def defaults @defaults end |
#outputs ⇒ Object (readonly)
Returns the value of attribute outputs.
8 9 10 |
# File 'lib/solanum.rb', line 8 def outputs @outputs end |
#sources ⇒ Object (readonly)
Returns the value of attribute sources.
8 9 10 |
# File 'lib/solanum.rb', line 8 def sources @sources end |
Class Method Details
.merge_attrs(a, b) ⇒ Object
Merge two event attribute maps together, concatenating tags.
11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
# File 'lib/solanum.rb', line 11 def self.merge_attrs(a, b) stringify = lambda do |x| o = {} x.keys.each do |k| o[k.to_s] = x[k] end o end if a.nil? stringify[b] elsif b.nil? stringify[a] else a = stringify[a] b = stringify[b] = a['tags'] ? a['tags'].dup : [] .concat(b['tags']) if b['tags'] .uniq! x = a.dup.merge(b) x['tags'] = unless .empty? x end end |
Instance Method Details
#collect_events!(source) ⇒ Object
Run collection from the given source in a new thread.
97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 |
# File 'lib/solanum.rb', line 97 def collect_events!(source) Thread.new do begin events = source.collect! attrs = Solanum.merge_attrs(@defaults, source.attributes) events = events.map do |event| Solanum.merge_attrs(attrs, event) end record! events rescue => e STDERR.puts "Error collecting events from source #{source.type}: #{e}" end reschedule! source end end |
#record!(events) ⇒ Object
Report a batch of events to all reporters.
88 89 90 91 92 93 |
# File 'lib/solanum.rb', line 88 def record!(events) # TODO: does this need locking? @outputs.each do |output| output.write_events events end end |
#reschedule!(source) ⇒ Object
Reschedule the given source for later running.
73 74 75 76 77 78 79 80 81 82 83 84 |
# File 'lib/solanum.rb', line 73 def reschedule!(source) idx = nil @sources.each_with_index do |s, i| if s == source idx = i break end end raise "Source #{source.inspect} is not present in source list!" unless idx @schedule.insert!(source.next_run, idx) @scheduler.wakeup end |
#run! ⇒ Object
Runs the collection loop.
115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 |
# File 'lib/solanum.rb', line 115 def run! @scheduler = Thread.current loop do # Determine when next scheduled source should run, and sleep if needed. duration = @schedule.next_wait || 1 if 0 < duration sleep duration next end # Get the next ready source. idx = @schedule.pop_ready! source = @sources[idx] if idx next unless source #puts "Source #{source.type} is ready to run!" # DEBUG # Start thread to collect and report events. collect_events! source end end |