Class: NetAtlas::Poller
- Inherits: Resource::Base
- Inheritance chain: Object → ActiveResource::Base → Resource::Base → NetAtlas::Poller
- Defined in:
- lib/netatlas/poller.rb
Instance Attribute Summary collapse
-
#queue ⇒ Object
readonly
Returns the value of attribute queue.
Instance Method Summary collapse
- #collector_queue ⇒ Object
- #configure ⇒ Object
- #do_scheduler ⇒ Object
-
#initialize(*args) ⇒ Poller
constructor
A new instance of Poller.
- #next_poll ⇒ Object
- #poll(ds) ⇒ Object
- #run ⇒ Object
- #schedule(ds) ⇒ Object
Constructor Details
#initialize(*args) ⇒ Poller
Returns a new instance of Poller.
7 8 9 10 11 12 |
# File 'lib/netatlas/poller.rb', line 7
# Sets up the poller's in-memory bookkeeping and then defers to the parent
# constructor with the caller's original arguments.
#
# @param args [Array] passed through unchanged to the superclass initializer
def initialize(*args)
  # ids of data sources currently marked as queued (written by #schedule)
  @queued = {}
  # work queue that #schedule pushes onto and #next_poll pops from
  @queue = EventMachine::Queue.new
  @data_sources = {}
  super
end
Instance Attribute Details
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
6 7 8 |
# File 'lib/netatlas/poller.rb', line 6
# Read-only accessor for the poller's work queue.
#
# @return [Object] the current value of the +@queue+ instance variable
def queue
  @queue
end
Instance Method Details
#collector_queue ⇒ Object
14 15 16 |
# File 'lib/netatlas/poller.rb', line 14
# Returns the message queue named "collector", looking it up through
# +MQ.queue+ on first use and reusing the cached handle afterwards.
#
# @return [Object] whatever +MQ.queue("collector")+ returned
def collector_queue
  return @collector_queue if @collector_queue

  @collector_queue = MQ.queue("collector")
end
#configure ⇒ Object
19 20 21 22 23 |
# File 'lib/netatlas/poller.rb', line 19
# Invokes +data_sources+ and prints a progress message before and after.
# The return value of +data_sources+ is discarded here.
# NOTE(review): presumably this call is what fetches/caches the data
# sources - confirm against the definition of +data_sources+.
#
# @return [nil] the result of the final +puts+
def configure
  puts "getting data sources"
  data_sources
  puts "got data sources"
end
#do_scheduler ⇒ Object
38 39 40 41 42 43 44 45 46 47 |
# File 'lib/netatlas/poller.rb', line 38
# Walks every data source and hands the ones that are due to #schedule.
#
# A data source is due when its last result's timestamp plus its step
# interval lies in the past. The interval defaults to 60 when
# +step_interval+ is nil. A data source whose last result cannot be read
# (no result yet, or the lookup raises) is treated as last polled at the
# epoch, so it is always due.
#
# @return [Object] the collection returned by +data_sources+
def do_scheduler
  data_sources.each do |ds|
    puts "scheduling #{ds}"

    # The listing had a dangling "ds.last_result." here; the timestamp of the
    # last result is what gets compared against the clock below.
    last_polled =
      begin
        ds.last_result.timestamp
      rescue StandardError
        # Best effort: any failure to obtain a timestamp means "never polled".
        Time.at(0)
      end

    interval = ds.step_interval || 60
    schedule(ds) if (last_polled + interval) < Time.now
  end
end
#next_poll ⇒ Object
57 58 59 60 61 |
# File 'lib/netatlas/poller.rb', line 57
# Asks the work queue for its next entry and polls it once the queue
# delivers one to the callback.
#
# @return [Object] whatever +@queue.pop+ returns
def next_poll
  @queue.pop { |data_source| poll(data_source) }
end
#poll(ds) ⇒ Object
63 64 65 66 67 68 69 70 71 72 |
# File 'lib/netatlas/poller.rb', line 63
# Polls one data source through its plugin and publishes the outcome.
#
# The plugin is obtained from +ds.get_plugin+ and asked to poll +ds+. For
# every (status, value) pair it yields, a NetAtlas::Result stamped with the
# current time is built and its JSON form is published on #collector_queue.
#
# @param ds [Object] data source; must respond to +get_plugin+ and +id+
# @return [Object] whatever the plugin's +poll+ call returns
def poll(ds)
  puts "in poll"
  plugin = ds.get_plugin
  plugin.poll(ds) do |status, value|
    # #schedule marks ds.id in @queued and refuses to queue a marked id, but
    # nothing ever removed the mark, so a data source could only be polled
    # once. Clear it now that a result has arrived so it can be scheduled again.
    @queued.delete(ds.id) if @queued

    result = NetAtlas::Result.new(:timestamp => Time.now,
                                  :data_source => ds,
                                  :status => status,
                                  :value => value)
    puts "got result #{result.inspect}"
    collector_queue.publish(result.to_json)
    puts "done poll"
  end
end
#run ⇒ Object
26 27 28 29 30 31 32 33 34 35 36 |
# File 'lib/netatlas/poller.rb', line 26
# Starts the reactor, runs #do_scheduler once per second, and polls each
# data source as it comes off the work queue.
#
# The listing drove the queue with +loop do next_poll end+. Popping an
# EventMachine::Queue with a block only registers a callback and returns
# immediately, so that loop spun forever on the reactor thread and neither
# the periodic timer nor the pop callbacks could ever fire. The queue is now
# drained by a callback that requests the next item after handling the
# current one, which leaves the reactor free between items.
#
# @return [Object] whatever +EM.synchrony+ returns once the reactor stops
def run
  EM.synchrony do
    EventMachine.add_periodic_timer(1) { do_scheduler }

    # Self-re-arming consumer: handle one data source, then ask for the next.
    consumer = proc do |ds|
      poll(ds)
      @queue.pop(&consumer)
    end
    @queue.pop(&consumer)
  end
end
#schedule(ds) ⇒ Object
49 50 51 52 53 54 |
# File 'lib/netatlas/poller.rb', line 49
# Pushes a data source onto the work queue unless its id is already marked
# as queued, and marks it when it is pushed.
#
# @param ds [Object] data source; must respond to +id+
# @return [true, nil] true when the data source was pushed, nil when it was
#   already marked
def schedule(ds)
  return if @queued[ds.id]

  @queue.push(ds)
  @queued[ds.id] = true
end