Class: Fluent::VerticaQueryInput
- Inherits: Input
- Inheritance chain: Object → Input → Fluent::VerticaQueryInput
- Defined in:
- lib/fluent/plugin/in_vertica_query.rb
Instance Method Summary
- #configure(conf) ⇒ Object
- #get_connection ⇒ Object
- #get_exec_result ⇒ Object
- #initialize ⇒ VerticaQueryInput (constructor): A new instance of VerticaQueryInput.
- #query(query) ⇒ Object
- #run ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
Constructor Details
#initialize ⇒ VerticaQueryInput
Returns a new instance of VerticaQueryInput.
5 6 7 8 9 |
# File 'lib/fluent/plugin/in_vertica_query.rb', line 5

# Builds the plugin instance.
#
# The `vertica` and `socket` libraries are required here, at instantiation
# time, before control is handed to the parent initializer.
def initialize
  %w[vertica socket].each { |library| require library }
  super
end
Instance Method Details
#configure(conf) ⇒ Object
29 30 31 32 33 |
# File 'lib/fluent/plugin/in_vertica_query.rb', line 29

# Applies the plugin configuration.
#
# Delegates to the parent implementation, resets @hostname to nil and logs
# the configured query together with its polling interval.
#
# @param conf the configuration object, forwarded unchanged to `super`
def configure(conf)
  super
  @hostname = nil
  job_summary = "adding vertica_query job: [#{@query}] interval: #{@interval}sec"
  $log.info(job_summary)
end
#get_connection ⇒ Object
62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 |
# File 'lib/fluent/plugin/in_vertica_query.rb', line 62

# Opens a connection to Vertica using the configured @host, @port,
# @username, @password, @database and @encoding.
#
# When connecting fails with a StandardError, the error is logged as a
# warning, the method sleeps for @interval seconds and tries again. The
# retry is intentionally unbounded so the poller keeps waiting for the
# database to come back.
#
# Only StandardError is rescued: Interrupt, SystemExit, NoMemoryError and
# other non-standard exceptions propagate instead of being swallowed by the
# retry loop.
#
# @return [Object] whatever `Vertica.connect` returns
def get_connection
  Vertica.connect(
    :host      => @host,
    :port      => @port,
    :username  => @username,
    :password  => @password,
    :database  => @database,
    :encoding  => @encoding,
    :reconnect => true
  )
rescue StandardError => e
  $log.warn "vertica_query: #{e}"
  sleep @interval
  retry
end
#get_exec_result ⇒ Object
91 92 93 94 95 96 97 98 |
# File 'lib/fluent/plugin/in_vertica_query.rb', line 91

# Executes the configured @query and gathers every row it yields.
#
# @return [Array] the rows, in the order the result's `each` produced them
def get_exec_result
  [].tap do |rows|
    query(@query).each { |row| rows << row }
  end
end
#query(query) ⇒ Object
80 81 82 83 84 85 86 87 88 89 |
# File 'lib/fluent/plugin/in_vertica_query.rb', line 80

# Runs a query on the memoized Vertica connection, opening the connection
# through get_connection on first use.
#
# When the query fails with a StandardError, the error is logged as a
# warning, the method sleeps for @interval seconds and runs the query again.
# The retry is intentionally unbounded.
#
# Only StandardError is rescued: Interrupt, SystemExit, NoMemoryError and
# other non-standard exceptions propagate instead of being swallowed by the
# retry loop.
#
# @param query the statement passed to the connection's `query` method
# @return [Object] whatever the connection's `query` method returns
def query(query)
  @vertica ||= get_connection
  @vertica.query(query)
rescue StandardError => e
  $log.warn "vertica_query: #{e}"
  sleep @interval
  retry
end
#run ⇒ Object
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 |
# File 'lib/fluent/plugin/in_vertica_query.rb', line 43

# Polling loop executed by the worker thread; it never returns on its own.
#
# Each iteration:
# 1. builds the tag from @tag, substituting both hostname placeholders with @host;
# 2. starts a base record (with 'host' when @record_host is set);
# 3. fetches the query rows and, when @row_count is set, stores their count
#    under @row_count_key;
# 4. emits either a single record carrying all rows under @nest_key
#    (@nest_result set) or one record per row merged onto the base record;
# 5. sleeps for @interval seconds.
def run
  loop do
    with_legacy_placeholder = "#{@tag}".gsub('__HOSTNAME__', @host)
    tag = with_legacy_placeholder.gsub('${hostname}', @host)

    record = {}
    record['host'] = @host if @record_host

    result = get_exec_result
    record[@row_count_key] = result.size if @row_count

    if @nest_result
      record[@nest_key] = result
      Engine.emit(tag, Engine.now, record)
    else
      result.each { |data| Engine.emit(tag, Engine.now, record.merge(data)) }
    end

    sleep @interval
  end
end
#shutdown ⇒ Object
39 40 41 |
# File 'lib/fluent/plugin/in_vertica_query.rb', line 39

# Stops the polling thread created by #start.
#
# Does nothing when no thread exists (for example when shutdown is invoked
# without a prior successful start); `Thread.kill(nil)` would otherwise
# raise a TypeError.
#
# @return [Thread, nil] the killed thread, or nil when there was none
def shutdown
  Thread.kill(@thread) if @thread
end
#start ⇒ Object
35 36 37 |
# File 'lib/fluent/plugin/in_vertica_query.rb', line 35

# Launches the polling loop (#run) on a new thread and keeps the thread in
# @thread so it can be stopped later.
#
# @return [Thread] the newly started thread
def start
  @thread = Thread.new { run }
end