Class: Kcl::Workers::Consumer
- Inherits: Object
- Hierarchy: Object → Kcl::Workers::Consumer
- Defined in:
- lib/kcl/workers/consumer.rb
Overview
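Each shard is handled by exactly one consumer (Shard : Consumer = 1 : 1). A consumer does the following:
- gets records from the stream
- sends them to the record processor
- creates the record checkpoint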
Instance Method Summary collapse
- #consume! ⇒ Object
- #create_initialize_input ⇒ Object
- #create_records_input(records, millis_behind_latest, record_checkpointer) ⇒ Object
- #create_shutdown_input(shutdown_reason, record_checkpointer) ⇒ Object
- #initialize(shard, record_processor, kinesis_proxy, checkpointer) ⇒ Consumer (constructor): a new instance of Consumer.
- #start_shard_iterator ⇒ Object
Constructor Details
#initialize(shard, record_processor, kinesis_proxy, checkpointer) ⇒ Consumer
Returns a new instance of Consumer.
7 8 9 10 11 12 |
# File 'lib/kcl/workers/consumer.rb', line 7
#
# Stores the collaborators this consumer works with; performs no other work.
#
# @param shard [Object] the shard to consume; other methods of this class
#   read its +shard_id+ and +checkpoint+
# @param record_processor [Object] receives +after_initialize+,
#   +process_records+ and +shutdown+ calls from #consume!
# @param kinesis_proxy [Object] used for +get_shard_iterator+ and
#   +get_records+
# @param checkpointer [Object] used for +fetch_checkpoint+ and handed to the
#   record checkpointer built in #consume!
def initialize(shard, record_processor, kinesis_proxy, checkpointer)
  @record_processor = record_processor
  @kinesis = kinesis_proxy
  @shard = shard
  @checkpointer = checkpointer
end
Instance Method Details
#consume! ⇒ Object
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
# File 'lib/kcl/workers/consumer.rb', line 14
#
# Runs the full consume cycle for this consumer's shard:
#
# 1. notifies the record processor via +after_initialize+,
# 2. repeatedly fetches records and passes each batch (including an empty
#    one) to +process_records+,
# 3. stops once a batch is empty or no next shard iterator is returned,
# 4. notifies the record processor via +shutdown+.
#
# The shutdown reason is TERMINATE when the last response carried no next
# shard iterator, and REQUESTED otherwise.
# NOTE(review): a nil next iterator presumably means the shard was closed;
# confirm against the Kinesis GetRecords semantics.
#
# @return [Object] whatever the record processor's +shutdown+ returns
def consume!
  @record_processor.after_initialize(create_initialize_input)

  checkpoint_writer = Kcl::Workers::RecordCheckpointer.new(@shard, @checkpointer)
  iterator = start_shard_iterator

  loop do
    response = @kinesis.get_records(iterator)
    batch = response[:records]

    @record_processor.process_records(
      create_records_input(batch, response[:millis_behind_latest], checkpoint_writer)
    )

    # Advance before testing the exit condition so the shutdown reason below
    # reflects the iterator returned by the final response.
    iterator = response[:next_shard_iterator]
    break if batch.empty? || iterator.nil?
  end

  reason =
    if iterator.nil?
      Kcl::Workers::ShutdownReason::TERMINATE
    else
      Kcl::Workers::ShutdownReason::REQUESTED
    end

  @record_processor.shutdown(create_shutdown_input(reason, checkpoint_writer))
end
#create_initialize_input ⇒ Object
58 59 60 61 62 63 |
# File 'lib/kcl/workers/consumer.rb', line 58
#
# Builds the input passed to the record processor's +after_initialize+.
#
# @return [Kcl::Types::InitializationInput] built from the shard's id and
#   its checkpoint wrapped in a Kcl::Types::ExtendedSequenceNumber
def create_initialize_input
  shard_id = @shard.shard_id
  sequence_number = Kcl::Types::ExtendedSequenceNumber.new(@shard.checkpoint)

  Kcl::Types::InitializationInput.new(shard_id, sequence_number)
end
#create_records_input(records, millis_behind_latest, record_checkpointer) ⇒ Object
65 66 67 68 69 70 71 |
# File 'lib/kcl/workers/consumer.rb', line 65
#
# Builds the input passed to the record processor's +process_records+.
#
# @param records [Object] the records of one fetch
# @param millis_behind_latest [Object] the value reported alongside the
#   records by the fetch
# @param record_checkpointer [Object] checkpointer made available to the
#   record processor
# @return [Kcl::Types::RecordsInput] wrapping the three arguments, in order
def create_records_input(records, millis_behind_latest, record_checkpointer)
  input_args = [records, millis_behind_latest, record_checkpointer]
  Kcl::Types::RecordsInput.new(*input_args)
end
#create_shutdown_input(shutdown_reason, record_checkpointer) ⇒ Object
73 74 75 76 77 78 |
# File 'lib/kcl/workers/consumer.rb', line 73
#
# Builds the input passed to the record processor's +shutdown+.
#
# @param shutdown_reason [Object] the reason consumption stopped
# @param record_checkpointer [Object] checkpointer made available to the
#   record processor
# @return [Kcl::Types::ShutdownInput] wrapping both arguments, in order
def create_shutdown_input(shutdown_reason, record_checkpointer)
  Kcl::Types::ShutdownInput.new(shutdown_reason, record_checkpointer)
end
#start_shard_iterator ⇒ Object
42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
# File 'lib/kcl/workers/consumer.rb', line 42
#
# Requests the shard iterator from which consumption should start.
#
# The checkpoint is looked up through the checkpointer first. With no stored
# checkpoint the iterator is requested with the TRIM_HORIZON sentinel;
# otherwise it is requested with AFTER_SEQUENCE_NUMBER and the checkpoint.
#
# The checkpoint used for the request is the one on the object returned by
# +fetch_checkpoint+, i.e. the same value that was tested for nil. Reading it
# from +@shard+ instead would send a stale or nil sequence number whenever
# +fetch_checkpoint+ returns a different object from the one it was given.
#
# @return [Object] whatever the Kinesis proxy's +get_shard_iterator+ returns
def start_shard_iterator
  checkpoint = @checkpointer.fetch_checkpoint(@shard).checkpoint

  if checkpoint.nil?
    return @kinesis.get_shard_iterator(
      @shard.shard_id,
      Kcl::Checkpoints::Sentinel::TRIM_HORIZON
    )
  end

  @kinesis.get_shard_iterator(
    @shard.shard_id,
    Kcl::Checkpoints::Sentinel::AFTER_SEQUENCE_NUMBER,
    checkpoint
  )
end