Class: Solanum

Inherits:
Object
  • Object
show all
Defined in:
lib/solanum.rb,
lib/solanum/config.rb,
lib/solanum/source.rb,
lib/solanum/schedule.rb,
lib/solanum/output/print.rb,
lib/solanum/output/riemann.rb

Overview

Schedule management class.

Defined Under Namespace

Modules: Config Classes: Output, Schedule, Source

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config_paths) ⇒ Solanum

Loads the given configuration file(s) and initializes the system.



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/solanum.rb', line 38

# Builds the collector state from one or more configuration files.
#
# Each path is handed to Config.load_file; the resulting :defaults are
# folded into the running defaults with Solanum.merge_attrs, while
# :sources and :outputs are appended in file order.
#
# @param config_paths [#each] configuration file paths to load
def initialize(config_paths)
  @defaults = {tags: []}
  @sources = []
  @outputs = []

  config_paths.each do |path|
    loaded = Config.load_file(path)

    # Defaults are merged (tags concatenated); later files win on conflicts.
    @defaults = Solanum.merge_attrs(@defaults, loaded[:defaults])

    # Sources and outputs accumulate across files.
    @sources.concat(loaded[:sources])
    @outputs.concat(loaded[:outputs])
  end

  # With no configured outputs, fall back to the print output. The require
  # is deliberately lazy so the file is only loaded when it is needed.
  if @outputs.empty?
    require 'solanum/output/print'
    @outputs << Solanum::Output::Print.new
  end

  # Configuration is fixed from here on.
  [@defaults, @outputs, @sources].each(&:freeze)

  # Seed the schedule with every source, identified by its index in @sources.
  @schedule = Solanum::Schedule.new
  @sources.each.with_index do |source, position|
    @schedule.insert!(source.next_run, position)
  end
end

Instance Attribute Details

#defaults ⇒ Object (readonly)

Returns the value of attribute defaults.



8
9
10
# File 'lib/solanum.rb', line 8

# Read-only accessor: returns the object stored in @defaults (the default
# attribute map that the constructor builds with merge_attrs and freezes).
def defaults
  @defaults
end

#outputs ⇒ Object (readonly)

Returns the value of attribute outputs.



8
9
10
# File 'lib/solanum.rb', line 8

# Read-only accessor: returns the object stored in @outputs (the list of
# outputs that record! writes each batch of events to).
def outputs
  @outputs
end

#sources ⇒ Object (readonly)

Returns the value of attribute sources.



8
9
10
# File 'lib/solanum.rb', line 8

# Read-only accessor: returns the object stored in @sources (the list of
# sources; the schedule refers to entries of this list by index).
def sources
  @sources
end

Class Method Details

.merge_attrs(a, b) ⇒ Object

Merge two event attribute maps together, concatenating tags.



11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/solanum.rb', line 11

# Merge two event attribute maps together, concatenating tags.
#
# Keys of the result are always strings (each key is converted with to_s).
# When both maps are present, entries in +b+ override entries in +a+,
# except for 'tags': the tag lists of both maps are concatenated (a's
# first) and de-duplicated. A 'tags' entry is only written when the
# combined list is non-empty.
#
# @param a [Hash, nil] base attributes
# @param b [Hash, nil] overriding attributes
# @return [Hash] a new string-keyed map; empty when both inputs are nil
def self.merge_attrs(a, b)
  stringify = lambda do |attrs|
    attrs.keys.each_with_object({}) do |key, out|
      out[key.to_s] = attrs[key]
    end
  end

  # Nothing to merge: previously this fell into the `a.nil?` branch and
  # crashed calling `keys` on nil.
  return {} if a.nil? && b.nil?

  # Only one side present: just normalize its keys.
  return stringify[b] if a.nil?
  return stringify[a] if b.nil?

  a = stringify[a]
  b = stringify[b]

  # Build the combined tag list on a copy so the inputs are not mutated.
  tags = a['tags'] ? a['tags'].dup : []
  tags.concat(b['tags']) if b['tags']
  tags.uniq!

  merged = a.merge(b)
  merged['tags'] = tags unless tags.empty?
  merged
end

Instance Method Details

#collect_events!(source) ⇒ Object

Run collection from the given source in a new thread.



97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/solanum.rb', line 97

# Collects events from +source+ on a freshly spawned thread.
#
# Inside the thread: the source is asked for events, each event is layered
# on top of the defaults merged with the source's attributes, and the
# resulting batch is passed to record!. StandardErrors raised along the way
# are reported on STDERR. Afterwards the source is handed to reschedule!.
#
# @param source the source to collect from
# @return [Thread] the thread doing the work
def collect_events!(source)
  Thread.new do
    begin
      collected = source.collect!
      base_attrs = Solanum.merge_attrs(@defaults, source.attributes)
      record!(collected.map { |event| Solanum.merge_attrs(base_attrs, event) })
    rescue => err
      STDERR.puts "Error collecting events from source #{source.type}: #{err}"
    end
    # Deliberately outside the rescue: runs after both success and a
    # reported failure.
    reschedule!(source)
  end
end

#record!(events) ⇒ Object

Report a batch of events to all reporters.



88
89
90
91
92
93
# File 'lib/solanum.rb', line 88

# Hands one batch of events to every configured output, in order.
#
# @param events the batch passed unchanged to each output's write_events
def record!(events)
  # TODO: does this need locking?
  @outputs.each { |sink| sink.write_events(events) }
end

#reschedule!(source) ⇒ Object

Reschedule the given source for later running.



73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/solanum.rb', line 73

# Puts +source+ back on the schedule at its next run time and wakes the
# scheduler thread so it can re-evaluate how long to wait.
#
# The schedule stores sources by their index in @sources, so the source is
# first looked up (by ==) in that list.
#
# @param source the source to reschedule
# @raise [RuntimeError] if the source is not in the source list
def reschedule!(source)
  position = @sources.index(source)
  raise "Source #{source.inspect} is not present in source list!" unless position
  @schedule.insert!(source.next_run, position)
  @scheduler.wakeup
end

#run! ⇒ Object

Runs the collection loop.



115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# File 'lib/solanum.rb', line 115

# Runs the collection loop on the calling thread.
#
# The calling thread is remembered in @scheduler. Each pass asks the
# schedule how long to wait (defaulting to 1 when it has no answer): a
# positive wait means sleep and look again; otherwise the next ready source
# index is popped and, if it maps to a source, collection is started for it.
def run!
  @scheduler = Thread.current
  loop do
    wait = @schedule.next_wait || 1

    if 0 < wait
      # Nothing is due yet; sleep, then re-check the schedule from the top.
      sleep wait
    else
      ready = @schedule.pop_ready!
      source = ready ? @sources[ready] : nil
      #puts "Source #{source.type} is ready to run!" # DEBUG

      # Start thread to collect and report events.
      collect_events!(source) if source
    end
  end
end