Class: NetAtlas::Poller

Inherits:
Resource::Base show all
Defined in:
lib/netatlas/poller.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(*args) ⇒ Poller

Returns a new instance of Poller.



7
8
9
10
11
12
# File 'lib/netatlas/poller.rb', line 7

# Sets up the poller's bookkeeping and then defers to the superclass
# constructor with the same arguments.
#
# State created here:
#   @queued       - hash, initially empty (used by #schedule as an
#                   "already enqueued" marker keyed by data source id)
#   @queue        - a fresh EventMachine::Queue
#   @data_sources - hash, initially empty
def initialize(*args)
  @queued = {}
  @queue = EventMachine::Queue.new
  @data_sources = {}
  # Bare `super` forwards *args unchanged to the parent initializer.
  super
end

Instance Attribute Details

#queue ⇒ Object (readonly)

Returns the value of attribute queue.



6
7
8
# File 'lib/netatlas/poller.rb', line 6

# Read-only accessor for the poller's work queue.
#
# Returns the object stored in @queue (an EventMachine::Queue when the
# instance was built through #initialize).
def queue
  @queue
end

Instance Method Details

#collector_queue ⇒ Object



14
15
16
# File 'lib/netatlas/poller.rb', line 14

# Returns the message queue named "collector", creating it through
# MQ.queue on first use and reusing the same object afterwards.
def collector_queue
  return @collector_queue if @collector_queue

  @collector_queue = MQ.queue("collector")
end

#configure ⇒ Object



19
20
21
22
23
# File 'lib/netatlas/poller.rb', line 19

# Invokes data_sources once, printing a progress line before and after.
#
# The value returned by data_sources is discarded; presumably the call is
# made for its side effect of loading the data sources (confirm against the
# definition of data_sources, which is not in this file).
#
# Returns nil (the result of the final puts).
def configure
  puts("getting data sources")
  data_sources
  puts("got data sources")
end

#do_scheduler ⇒ Object



38
39
40
41
42
43
44
45
46
47
# File 'lib/netatlas/poller.rb', line 38

# Walks every data source and enqueues (via #schedule) the ones that are due.
#
# A data source is due when its last poll time plus its interval lies in the
# past. The interval is ds.step_interval, defaulting to 60 when that is nil
# or false. The last poll time is ds.last_result.timestamp; when it cannot be
# obtained it falls back to Time.at(0), which makes the data source due
# immediately.
#
# A "scheduling ..." line is printed for every data source examined, whether
# or not it ends up being enqueued.
def do_scheduler
  data_sources.each do |ds|
    puts "scheduling #{ds}"

    # Best-effort lookup: any StandardError (e.g. last_result being nil)
    # means "no usable last poll time".
    last_polled = begin
      ds.last_result.timestamp
    rescue StandardError
      nil
    end
    # Also covers a result that exists but carries a nil timestamp, which
    # would otherwise raise NoMethodError on the addition below.
    last_polled ||= Time.at(0)

    interval = ds.step_interval || 60
    schedule(ds) if (last_polled + interval) < Time.now
  end
end

#next_poll ⇒ Object



57
58
59
60
61
# File 'lib/netatlas/poller.rb', line 57

# Requests one item from @queue and hands it to #poll when it is delivered.
#
# Returns whatever @queue.pop returns.
def next_poll
  @queue.pop { |data_source| poll(data_source) }
end

#poll(ds) ⇒ Object



63
64
65
66
67
68
69
70
71
72
# File 'lib/netatlas/poller.rb', line 63

# Polls one data source through its plugin and publishes every result to the
# collector queue.
#
# For each (status, value) pair the plugin yields, a NetAtlas::Result stamped
# with the current time is built, serialized with to_json and published on
# #collector_queue. The data source's entry in @queued is then removed so
# that #schedule will accept it again: #schedule sets that flag and nothing
# else in this file resets it, so without the removal a data source could be
# enqueued only once.
#
# NOTE(review): if the plugin never yields (for example on a timeout), the
# flag stays set and the data source is not rescheduled — confirm whether
# plugins always yield.
#
# ds must respond to get_plugin and id.
# Returns whatever the plugin's poll method returns.
def poll(ds)
  puts "in poll"
  plugin = ds.get_plugin
  plugin.poll(ds) do |status, value|
    result = NetAtlas::Result.new(:timestamp => Time.now, :data_source => ds, :status => status, :value => value)
    puts "got result #{result.inspect}"
    collector_queue.publish(result.to_json)
    # The poll has completed: allow this data source to be scheduled again.
    @queued.delete(ds.id)
    puts "done poll"
  end
end

#run ⇒ Object



26
27
28
29
30
31
32
33
34
35
36
# File 'lib/netatlas/poller.rb', line 26

# Starts the reactor, runs #do_scheduler once per second and polls every data
# source that appears on @queue.
#
# Queue consumption uses a self-re-arming pop callback instead of a bare
# `loop`: EventMachine::Queue#pop only registers a callback and returns
# immediately, so looping over it never gives control back to the reactor
# (the periodic timer would never fire) and keeps registering callbacks
# without bound.
def run
  EM.synchrony do
    EventMachine.add_periodic_timer(1) { do_scheduler }

    poll_next = proc do |ds|
      poll(ds)
      # Re-arm on the next reactor tick so that a long backlog is drained
      # without nesting one pop inside another.
      EventMachine.next_tick { @queue.pop(&poll_next) }
    end
    @queue.pop(&poll_next)
  end
end

#schedule(ds) ⇒ Object



49
50
51
52
53
54
# File 'lib/netatlas/poller.rb', line 49

# Enqueues a data source unless its id is already marked in @queued.
#
# On enqueue the data source is pushed onto @queue and its id is marked in
# @queued, so further calls for the same id do nothing while the mark is set.
#
# Returns true when the data source was enqueued, nil when it was skipped.
def schedule(ds)
  return if @queued[ds.id]

  @queue.push(ds)
  @queued[ds.id] = true
end