20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
|
# File 'lib/stream_reader.rb', line 20
def run!(&block)
loop do
begin
iterator_opts = { stream_name: @stream_name, shard_id: shard_id }
if seq = @tracker.last_sequence_number
iterator_opts[:shard_iterator_type] = 'AFTER_SEQUENCE_NUMBER'
iterator_opts[:starting_sequence_number] = seq
else
iterator_opts[:shard_iterator_type] = 'TRIM_HORIZON'
end
@logger.debug "Getting shard iterator for #{@stream_name} / #{seq}"
resp = client.get_shard_iterator(iterator_opts)
shard_iterator = resp.shard_iterator
loop do
sleep 1
@logger.debug "Getting records for #{shard_iterator}"
resp = client.get_records({
shard_iterator: shard_iterator,
limit: BATCH_SIZE,
})
resp.records.each do |record|
ActiveSupport::Notifications.instrument('stream_reader.process_record',
stream_name: @stream_name,
prefix: @prefix,
shard_id: shard_id,
ms_behind: resp.millis_behind_latest) do
AvroParser.new(record.data).each_with_schema_name(&block)
@tracker.last_sequence_number = record.sequence_number
end
end
shard_iterator = resp.next_shard_iterator
end
rescue Aws::Kinesis::Errors::ExpiredIteratorException
@logger.debug "Iterator expired! Fetching a new one."
end
end
end
|