Class: Fluent::DynamodbAltOutput

Inherits:
BufferedOutput
  • Object
show all
Includes:
SetTagKeyMixin, SetTimeKeyMixin
Defined in:
lib/fluent/plugin/out_dynamodb_alt.rb

Instance Method Summary collapse

Constructor Details

#initialize ⇒ DynamodbAltOutput

Returns a new instance of DynamodbAltOutput.



27
28
29
30
31
32
33
# File 'lib/fluent/plugin/out_dynamodb_alt.rb', line 27

# Builds the output plugin instance and loads the libraries it relies on.
#
# The libraries are required here rather than at file load time --
# presumably so they are only loaded once the plugin is instantiated
# (NOTE(review): confirm this lazy-loading is intentional).
def initialize
  super
  %w(aws-sdk-core parallel set stringio).each { |library| require library }
end

Instance Method Details

#configure(conf) ⇒ Object



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/fluent/plugin/out_dynamodb_alt.rb', line 35

# Applies the plugin configuration.
#
# Steps, in order:
# 1. Collects AWS options from the configured instance variables
#    (shared-credentials profile, explicit key pair, region, endpoint)
#    and hands them to +configure_aws+.
# 2. Describes +@table_name+ through a freshly created client and records
#    the attribute names of the table's HASH and RANGE keys.
# 3. Replaces +@expected+ with its parsed form (when set) and logs it.
# 4. Turns +@binary_keys+ from a comma-separated string into an array
#    (empty array when unset).
#
# @param conf the configuration object; forwarded to the superclass by +super+
# @raise [RuntimeError] if the table reports a key type other than
#   'HASH' or 'RANGE'
def configure(conf)
  super

  options = {}

  if @profile
    shared_opts = {:profile_name => @profile}
    shared_opts[:path] = @credentials_path if @credentials_path
    options[:credentials] = Aws::SharedCredentials.new(shared_opts)
  end

  # Copy over only the settings that were actually provided.
  {
    :access_key_id     => @aws_key_id,
    :secret_access_key => @aws_sec_key,
    :region            => @region,
    :endpoint          => @endpoint
  }.each do |option_name, option_value|
    options[option_name] = option_value if option_value
  end

  configure_aws(options)

  description = create_client.describe_table(:table_name => @table_name)

  description.table.key_schema.each do |schema_element|
    case schema_element.key_type
    when 'HASH'  then @hash_key = schema_element.attribute_name
    when 'RANGE' then @range_key = schema_element.attribute_name
    else raise 'must not happen'
    end
  end

  if @expected
    @expected = parse_expected(@expected)
    log.info("dynamodb_alt expected: #{@expected.inspect}")
  end

  # Surrounding whitespace and whitespace around commas is discarded.
  @binary_keys = @binary_keys ? @binary_keys.strip.split(/\s*,\s*/) : []
end

#format(tag, time, record) ⇒ Object



86
87
88
# File 'lib/fluent/plugin/out_dynamodb_alt.rb', line 86

# Serializes one event for buffering.
#
# @param tag the event tag
# @param time the event time
# @param record the event record
# @return the result of calling +to_msgpack+ on the
#   +[tag, time, record]+ array
def format(tag, time, record)
  event = [tag, time, record]
  event.to_msgpack
end

#start ⇒ Object



80
81
82
83
84
# File 'lib/fluent/plugin/out_dynamodb_alt.rb', line 80

# Starts the plugin: runs the superclass start-up, then stores a client
# obtained from +create_client+ in +@client+.
def start
  super
  @client = create_client
end

#write(chunk) ⇒ Object



90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/fluent/plugin/out_dynamodb_alt.rb', line 90

# Writes a buffered chunk by calling +put_record+ for each aggregated entry.
#
# The chunk is first passed through +aggregate_records+. When
# +@concurrency+ is greater than 1 the entries are processed with
# +Parallel.each+ using that many threads; otherwise they are processed
# sequentially.
#
# @param chunk the buffered chunk to flush
def write(chunk)
  records = aggregate_records(chunk)

  # Must stay a proc (not a lambda): presumably each entry is a
  # [tag, time, record] triple that is destructured into the block
  # parameters. Only the record is used.
  put = proc { |_tag, _time, record| put_record(record) }

  if @concurrency > 1
    Parallel.each(records, :in_threads => @concurrency, &put)
  else
    records.each(&put)
  end
end