Class: Kcl::Workers::Consumer

Inherits:
Object
  • Object
show all
Defined in:
lib/kcl/workers/consumer.rb

Overview

Shard : Consumer = 1 : 1

  • get records from stream

  • send to record processor

  • create record checkpoint

Instance Method Summary collapse

Constructor Details

#initialize(shard, record_processor, kinesis_proxy, checkpointer) ⇒ Consumer

Returns a new instance of Consumer.



7
8
9
10
11
12
# File 'lib/kcl/workers/consumer.rb', line 7

def initialize(shard, record_processor, kinesis_proxy, checkpointer)
  @shard = shard
  @record_processor = record_processor
  @kinesis = kinesis_proxy
  @checkpointer = checkpointer
end

Instance Method Details

#consume!Object



14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/kcl/workers/consumer.rb', line 14

def consume!
  initialize_input = create_initialize_input
  @record_processor.after_initialize(initialize_input)

  record_checkpointer = Kcl::Workers::RecordCheckpointer.new(@shard, @checkpointer)
  shard_iterator = start_shard_iterator

  loop do
    result = @kinesis.get_records(shard_iterator)

    records_input = create_records_input(
      result[:records],
      result[:millis_behind_latest],
      record_checkpointer
    )
    @record_processor.process_records(records_input)

    shard_iterator = result[:next_shard_iterator]
    break if result[:records].empty? || shard_iterator.nil?
  end

  shutdown_reason = shard_iterator.nil? ?
    Kcl::Workers::ShutdownReason::TERMINATE :
    Kcl::Workers::ShutdownReason::REQUESTED
  shutdown_input = create_shutdown_input(shutdown_reason, record_checkpointer)
  @record_processor.shutdown(shutdown_input)
end

#create_initialize_inputObject



58
59
60
61
62
63
# File 'lib/kcl/workers/consumer.rb', line 58

def create_initialize_input
  Kcl::Types::InitializationInput.new(
    @shard.shard_id,
    Kcl::Types::ExtendedSequenceNumber.new(@shard.checkpoint)
  )
end

#create_records_input(records, millis_behind_latest, record_checkpointer) ⇒ Object



65
66
67
68
69
70
71
# File 'lib/kcl/workers/consumer.rb', line 65

def create_records_input(records, millis_behind_latest, record_checkpointer)
  Kcl::Types::RecordsInput.new(
    records,
    millis_behind_latest,
    record_checkpointer
  )
end

#create_shutdown_input(shutdown_reason, record_checkpointer) ⇒ Object



73
74
75
76
77
78
# File 'lib/kcl/workers/consumer.rb', line 73

def create_shutdown_input(shutdown_reason, record_checkpointer)
  Kcl::Types::ShutdownInput.new(
    shutdown_reason,
    record_checkpointer
  )
end

#start_shard_iteratorObject



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/kcl/workers/consumer.rb', line 42

def start_shard_iterator
  shard = @checkpointer.fetch_checkpoint(@shard)
  if shard.checkpoint.nil?
    return @kinesis.get_shard_iterator(
      @shard.shard_id,
      Kcl::Checkpoints::Sentinel::TRIM_HORIZON
    )
  end

  @kinesis.get_shard_iterator(
    @shard.shard_id,
    Kcl::Checkpoints::Sentinel::AFTER_SEQUENCE_NUMBER,
    @shard.checkpoint
  )
end