Class: Fluent::RdsErrorLogInput

Inherits:
Input
  • Object
show all
Defined in:
lib/fluent/plugin/in_rds_error_log.rb

Defined Under Namespace

Classes: TimerWatcher

Constant Summary collapse

LOG_REGEXP =
/^(?<time>\d{4}-\d{2}-\d{2} \d{2}\:\d{2}\:\d{2})( (?<pid>\d+))?( \[(?<message_level>[^\]]*?)\])? (?<message>.*)$/

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object

Raises:

  • (Fluent::ConfigError)


28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/fluent/plugin/in_rds_error_log.rb', line 28

def configure(conf)
  super
  require 'aws-sdk'

  raise Fluent::ConfigError.new("region is required") unless @region
  if !has_iam_role?
    raise Fluent::ConfigError.new("access_key_id is required") if @access_key_id.nil?
    raise Fluent::ConfigError.new("secret_access_key is required") if @secret_access_key.nil?
  end
  raise Fluent::ConfigError.new("db_instance_identifier is required") unless @db_instance_identifier
  raise Fluent::ConfigError.new("pos_file is required") unless @pos_file
  raise Fluent::ConfigError.new("refresh_interval is required") unless @refresh_interval
  raise Fluent::ConfigError.new("tag is required") unless @tag
end

#shutdownObject



68
69
70
71
72
# File 'lib/fluent/plugin/in_rds_error_log.rb', line 68

def shutdown
  super
  @watcher.terminate
  @thread.join
end

#startObject



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/fluent/plugin/in_rds_error_log.rb', line 43

def start
  super

  # pos file touch
  File.open(@pos_file, File::RDWR|File::CREAT).close

  begin
    options = {
      :region => @region,
    }
    if @access_key_id && @secret_access_key
      options[:access_key_id] = @access_key_id
      options[:secret_access_key] = @secret_access_key
    end
    @rds = Aws::RDS::Client.new(options)
  rescue => e
    $log.warn "RDS Client error occurred: #{e.message}"
  end

  @loop = Coolio::Loop.new
  timer_trigger = TimerWatcher.new(@refresh_interval, true, &method(:input))
  timer_trigger.attach(@loop)
  @thread = Thread.new(&method(:run))
end