Class: Kinesis::Aggregation::Deaggregator

Inherits:
Object
  • Object
show all
Defined in:
lib/kinesis/aggregation/deaggregator.rb

Constant Summary collapse

# Four-byte marker (0xF3 0x89 0x9A 0xC2) held as a frozen binary
# (ASCII-8BIT) string.
# NOTE(review): presumably the header that identifies an aggregated
# payload — confirm against the code that reads it.
MAGIC = "\xf3\x89\x9a\xc2".b.freeze
# NOTE(review): presumably the byte length of the MD5 digest appended to an
# aggregated payload — confirm against the code that reads it.
DIGEST_SIZE = 16

Instance Method Summary collapse

Constructor Details

#initialize(raw_record) ⇒ Deaggregator

Returns a new instance of Deaggregator.



11
12
13
# File 'lib/kinesis/aggregation/deaggregator.rb', line 11

# Builds a deaggregator around a single raw record.
#
# @param raw_record [#with_indifferent_access] the record to wrap; the
#   result of calling +with_indifferent_access+ on it is what gets stored.
#   NOTE(review): presumably an ActiveSupport-extended Hash — confirm.
def initialize(raw_record)
  indifferent_record = raw_record.with_indifferent_access
  @raw_record = indifferent_record
end

Instance Method Details

#deaggregate ⇒ Object



15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/kinesis/aggregation/deaggregator.rb', line 15

# Splits an aggregated record into one record per contained sub-record.
#
# When +aggregated_record?+ is false, or +computed_md5+ differs from
# +kinesis_record_md5+, the original record is returned untouched as a
# single-element array.
#
# Otherwise each emitted record is +base_record+ merged with a +:kinesis+
# hash that copies the schema version, sequence number, arrival timestamp
# and record id from the parent record, and takes the hash key, partition
# key and payload from the sub-record.
#
# @return [Array] one entry per sub-record, or +[kinesis_record]+ when the
#   record is not a verified aggregate
def deaggregate
  # The checksum helpers are only consulted once the record is known to be
  # aggregated (short-circuit).
  verified_aggregate = aggregated_record? && computed_md5 == kinesis_record_md5
  return [kinesis_record] unless verified_aggregate

  # Metadata shared by every sub-record; looked up once instead of per field.
  parent = kinesis_record[:kinesis]

  aggregated_record.records.map do |record|
    base_record.merge(
      kinesis: {
        kinesisSchemaVersion: parent[:kinesisSchemaVersion],
        sequenceNumber: parent[:sequenceNumber],
        approximateArrivalTimestamp: parent[:approximateArrivalTimestamp],
        # NOTE(review): the helper name looks like a typo ("for_for"); it is
        # defined outside this method, so the call is left as-is — confirm.
        explicitHashKey: explicit_hash_for_for(record),
        partitionKey: partition_key_for(record),
        # Strict encoding on purpose: Base64.encode64 inserts a line feed
        # every 60 characters plus a trailing one, which strict decoders reject.
        data: Base64.strict_encode64(record.data),
        recordId: parent[:recordId]
      }
    )
  end
end