Module: Panoptimon::CollectorSink

Defined in:
lib/panoptimon/collector.rb

Instance Method Summary collapse

Instance Method Details

#initialize(handler) ⇒ Object



81
82
83
84
85
86
# File 'lib/panoptimon/collector.rb', line 81

def initialize (handler)
  @handler = handler
  @timeout = @handler.config[:timeout]
  @interval = @handler.config[:interval]
  timer_on
end

#on_unbind(&block) ⇒ Object



126
# File 'lib/panoptimon/collector.rb', line 126

def on_unbind (&block); @on_unbind = block; end

#receive_data(data) ⇒ Object



104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
# File 'lib/panoptimon/collector.rb', line 104

def receive_data (data)
  timer_on
  @handler.logger.debug "incoming"
  @buf ||= BufferedTokenizer.new("\n")
  @buf.extract(data).each do |line|
    timer_on(with_interval: true)
    begin
      data = JSON.parse(line)
    rescue
      # TODO feed errors up to the monitor
      $stderr.puts "error parsing #{line.dump} - #{$!}"
    end
    @handler.logger.debug "line: #{line}"
    @handler.bus.notify(Metric.new(@handler.name, data))
  end
end

#receive_stderr(mess) ⇒ Object



121
122
123
124
# File 'lib/panoptimon/collector.rb', line 121

def receive_stderr (mess)
  @handler.noise(mess)
  (@err_mess ||= '') << mess
end

#timer_offObject



100
101
102
# File 'lib/panoptimon/collector.rb', line 100

def timer_off
  @timer.cancel
end

#timer_on(opts = {}) ⇒ Object

reset / start timeout timer



89
90
91
92
93
94
95
96
97
98
# File 'lib/panoptimon/collector.rb', line 89

def timer_on (opts={})
  @timer.cancel unless @timer.nil?
  length = @timeout + (opts[:with_interval] ? @interval : 0)
  @timer = EventMachine::Timer.new(length) {
    scrap = @buf ? " - #{@buf.flush}" : ''
    @handler.logger.error "timeout on #{@handler.name}" + scrap
    @handler.logger.debug {"pid #{get_pid}"}
    close_connection()
  }
end

#unbindObject



127
128
129
130
# File 'lib/panoptimon/collector.rb', line 127

def unbind
  timer_off
  @on_unbind.call(get_status.exitstatus, @err_mess)
end