Class: Fluent::VerticaQueryInput

Inherits:
Input
  • Object
show all
Defined in:
lib/fluent/plugin/in_vertica_query.rb

Instance Method Summary collapse

Constructor Details

#initializeVerticaQueryInput

Returns a new instance of VerticaQueryInput.



5
6
7
8
9
# File 'lib/fluent/plugin/in_vertica_query.rb', line 5

def initialize
  require 'vertica'
  require 'socket'
  super
end

Instance Method Details

#configure(conf) ⇒ Object



29
30
31
32
33
# File 'lib/fluent/plugin/in_vertica_query.rb', line 29

def configure(conf)
  super
  @hostname = nil
  $log.info "adding vertica_query job: [#{@query}] interval: #{@interval}sec"
end

#get_connectionObject



62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/fluent/plugin/in_vertica_query.rb', line 62

def get_connection
  begin
    return Vertica.connect({
      :host => @host,
      :port => @port,
      :username => @username,
      :password => @password,
      :database => @database,
      :encoding => @encoding,
      :reconnect => true
    })
  rescue Exception => e
    $log.warn "vertica_query: #{e}"
    sleep @interval
    retry
  end
end

#get_exec_resultObject



91
92
93
94
95
96
97
98
# File 'lib/fluent/plugin/in_vertica_query.rb', line 91

def get_exec_result
  result = Array.new
  stmt = query(@query)
  stmt.each do |row|
    result.push(row)
  end
  return result
end

#query(query) ⇒ Object



80
81
82
83
84
85
86
87
88
89
# File 'lib/fluent/plugin/in_vertica_query.rb', line 80

def query(query)
  @vertica ||= get_connection
  begin
    return @vertica.query(query)
  rescue Exception => e
    $log.warn "vertica_query: #{e}"
    sleep @interval
    retry
  end
end

#runObject



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/fluent/plugin/in_vertica_query.rb', line 43

def run
  loop do
    tag = "#{@tag}".gsub('__HOSTNAME__', @host).gsub('${hostname}', @host)
    record = Hash.new
    record.store('host', @host) if @record_host
    result = get_exec_result
    record.store(@row_count_key, result.size) if @row_count
    if (@nest_result)
      record.store(@nest_key, result)
      Engine.emit(tag, Engine.now, record)
    else
      result.each do |data|
        Engine.emit(tag, Engine.now, record.merge(data))
      end
    end
    sleep @interval
  end
end

#shutdownObject



39
40
41
# File 'lib/fluent/plugin/in_vertica_query.rb', line 39

def shutdown
  Thread.kill(@thread)
end

#startObject



35
36
37
# File 'lib/fluent/plugin/in_vertica_query.rb', line 35

def start
  @thread = Thread.new(&method(:run))
end