Class: Kafka::Consumer::PartitionConsumer

Inherits:
Object
  • Object
show all
Defined in:
lib/kafka/consumer/partition_consumer.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(consumer, partition, handler: nil, max_wait_ms: 100, initial_offset: :latest_offset, commit_interval: 5.0) ⇒ PartitionConsumer

Returns a new instance of PartitionConsumer.



8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# File 'lib/kafka/consumer/partition_consumer.rb', line 8

# Sets up a consumer for a single topic partition and immediately starts two
# threads: one that claims the partition and runs the fetch loop, and a
# background thread that periodically commits the last processed offset.
#
# consumer        - the parent consumer (provides logger, group, cluster, instance)
# partition       - the topic partition to consume
# handler:        - callable invoked with each consumed message
# max_wait_ms:    - fetch wait time passed through to the Poseidon consumer
# initial_offset: - where to start when the group has no committed offset
# commit_interval:- seconds between background offset commits
def initialize(consumer, partition, handler: nil, max_wait_ms: 100, initial_offset: :latest_offset, commit_interval: 5.0)
  @consumer = consumer
  @partition = partition
  @handler = handler
  @initial_offset = initial_offset
  @max_wait_ms = max_wait_ms
  @commit_interval = commit_interval

  # Guards offset commits shared between the consumer thread and the committer.
  @commit_mutex = Mutex.new

  # Thread that claims the partition and consumes messages.
  @consumer_thread = Thread.new do
    Thread.current.abort_on_exception = true
    manage_partition_consumer
  end

  # Thread that periodically flushes the last processed offset.
  Thread.new do
    Thread.current.abort_on_exception = true
    background_committer
  end
end

Instance Attribute Details

#commit_intervalObject (readonly)

Returns the value of attribute commit_interval.



5
6
7
# File 'lib/kafka/consumer/partition_consumer.rb', line 5

# Seconds the background committer sleeps between offset commits.
def commit_interval
  @commit_interval
end

#consumerObject (readonly)

Returns the value of attribute consumer.



5
6
7
# File 'lib/kafka/consumer/partition_consumer.rb', line 5

# The parent consumer; supplies the logger, group, cluster and instance
# used by this partition consumer.
def consumer
  @consumer
end

#handlerObject (readonly)

Returns the value of attribute handler.



5
6
7
# File 'lib/kafka/consumer/partition_consumer.rb', line 5

# Callable invoked (via #call) with every consumed message.
def handler
  @handler
end

#initial_offsetObject (readonly)

Returns the value of attribute initial_offset.



5
6
7
# File 'lib/kafka/consumer/partition_consumer.rb', line 5

# Offset to start from when the group has no committed offset:
# :earliest_offset/-2, :latest_offset/-1, or a concrete numeric offset.
def initial_offset
  @initial_offset
end

#last_committed_offsetObject (readonly)

Returns the value of attribute last_committed_offset.



5
6
7
# File 'lib/kafka/consumer/partition_consumer.rb', line 5

# Most recently committed offset value (nil until an offset has been
# retrieved or committed). Stored as last processed offset + 1 after a commit.
def last_committed_offset
  @last_committed_offset
end

#last_processed_offsetObject (readonly)

Returns the value of attribute last_processed_offset.



5
6
7
# File 'lib/kafka/consumer/partition_consumer.rb', line 5

# Offset of the last message handed to the handler (nil before the first one).
def last_processed_offset
  @last_processed_offset
end

#max_wait_msObject (readonly)

Returns the value of attribute max_wait_ms.



5
6
7
# File 'lib/kafka/consumer/partition_consumer.rb', line 5

# Milliseconds passed as max_wait_ms to each Poseidon fetch call.
def max_wait_ms
  @max_wait_ms
end

#partitionObject (readonly)

Returns the value of attribute partition.



5
6
7
# File 'lib/kafka/consumer/partition_consumer.rb', line 5

# The topic partition this consumer is responsible for
# (exposes topic.name and id).
def partition
  @partition
end

Instance Method Details

#background_committerObject



79
80
81
82
83
84
# File 'lib/kafka/consumer/partition_consumer.rb', line 79

# Periodically flushes the latest processed offset via #commit_last_offset.
# Runs on its own thread until the consumer is flagged as interrupted.
def background_committer
  loop do
    break if interrupted?
    commit_last_offset
    sleep(commit_interval)
  end
end

#claim_partitionObject



48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/kafka/consumer/partition_consumer.rb', line 48

# Claims ownership of the partition in Zookeeper so only one consumer in the
# group processes it at a time. If another instance still holds the claim,
# this thread parks itself (Thread.stop) until the claim-change watch fires,
# then retries. Returns true once the partition is claimed, or false if the
# consumer was interrupted while waiting.
def claim_partition
  consumer.logger.info "Claiming partition #{partition.topic.name}/#{partition.id}..."
  begin
    # Register a watch on the current claim; the block wakes this thread
    # (via #continue) when the claim changes.
    other_instance, change = consumer.group.watch_partition_claim(partition) { continue }
    if other_instance.nil?
      consumer.instance.claim_partition(partition)
    elsif other_instance == consumer.instance
      raise Kazoo::Error, "Already claimed this partition myself. That should not happen"
    else
      consumer.logger.warn "Partition #{partition.topic.name}/#{partition.id} is still claimed by instance #{other_instance.id}. Waiting for the claim to be released..."
      # Park unless the watch already completed between registering it and now.
      Thread.stop unless change.completed?

      # Woken either by the claim being released or by #interrupt.
      return false if interrupted?
      raise Kazoo::PartitionAlreadyClaimed
    end
  rescue Kazoo::PartitionAlreadyClaimed
    retry unless interrupted?
  end

  true
end

#commit_last_offsetObject



70
71
72
73
74
75
76
77
# File 'lib/kafka/consumer/partition_consumer.rb', line 70

# Commits the most recently processed offset to the group store, skipping the
# write when nothing new has been processed since the last commit. The stored
# value is the processed offset + 1 (presumably the next offset to consume —
# the shutdown log elsewhere prints it minus one). Thread-safe: called from
# both the background committer and the consumer thread.
def commit_last_offset
  @commit_mutex.synchronize do
    processed = last_processed_offset
    next if processed.nil?
    next unless last_committed_offset.nil? || last_committed_offset < processed

    consumer.group.commit_offset(partition, processed)
    @last_committed_offset = processed + 1
  end
end

#continueObject



44
45
46
# File 'lib/kafka/consumer/partition_consumer.rb', line 44

# Resumes the consumer thread if it parked itself (Thread.stop) while
# waiting for a partition claim to be released. No-op otherwise.
def continue
  return unless @consumer_thread.status == 'sleep'
  @consumer_thread.run
end

#interruptObject



29
30
31
32
# File 'lib/kafka/consumer/partition_consumer.rb', line 29

# Flags the consumer thread as interrupted, then wakes it (via #continue)
# in case it is parked waiting for a partition claim.
def interrupt
  thread = @consumer_thread
  thread[:interrupted] = true
  continue
end

#interrupted?Boolean

Returns:

  • (Boolean)


34
35
36
# File 'lib/kafka/consumer/partition_consumer.rb', line 34

# Whether the consumer thread has been asked to shut down (set by #interrupt).
# Returns the thread-local flag value (truthy or nil).
def interrupted?
  @consumer_thread[:interrupted]
end

#manage_partition_consumerObject



86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
# File 'lib/kafka/consumer/partition_consumer.rb', line 86

# Main loop of the consumer thread for this partition.
#
# Claims the partition in Zookeeper, determines the starting offset (the
# group's last committed offset, falling back to initial_offset), then
# fetches and processes messages until interrupted. On shutdown the final
# offset is committed and the partition claim is released.
def manage_partition_consumer
  # First, we will try to claim the partition in Zookeeper to ensure there's
  # only one consumer for it simultaneously.
  if claim_partition
    @last_committed_offset = consumer.group.retrieve_offset(partition)
    case start_offset = last_committed_offset || initial_offset
    when :earliest_offset, -2
      consumer.logger.info "Starting consumer for #{partition.topic.name}/#{partition.id} at the earliest available offset..."
    when :latest_offset, -1
      consumer.logger.info "Starting consumer for #{partition.topic.name}/#{partition.id} for new messages..."
    else
      consumer.logger.info "Starting consumer for #{partition.topic.name}/#{partition.id} at offset #{start_offset}..."
    end

    begin
      pc = Poseidon::PartitionConsumer.consumer_for_partition(
              consumer.group.name,
              consumer.cluster.brokers.values.map(&:addr),
              partition.topic.name,
              partition.id,
              start_offset
      )

      until interrupted?
        pc.fetch(max_wait_ms: max_wait_ms).each do |message|
          message = Message.new(partition.topic.name, partition.id, message)
          handler.call(message)
          # Track the highest processed offset so the background committer
          # and the final commit below can persist progress.
          @last_processed_offset = message.offset
        end
      end

    rescue Poseidon::Errors::OffsetOutOfRange
      pc.close

      consumer.logger.warn "Offset #{start_offset} is no longer available for #{partition.topic.name}/#{partition.id}!"
      case initial_offset
      when :earliest_offset, -2
        consumer.logger.warn "Instead, start consuming #{partition.topic.name}/#{partition.id} at the earliest available offset."
      when :latest_offset, -1
        # Fixed typo in log message: "onsuming" -> "consuming".
        consumer.logger.warn "Instead, start consuming #{partition.topic.name}/#{partition.id} for new messages only."
      end

      # Fall back to the configured initial offset and reconnect.
      start_offset = initial_offset
      retry

    ensure
      consumer.logger.debug "Stopping consumer for #{partition.topic.name}/#{partition.id}..."
      # pc is nil if consumer_for_partition itself raised; guard the close.
      pc.close if pc
    end

    # Persist the final offset before giving up the claim, so another
    # instance picks up where we left off.
    commit_last_offset
    consumer.logger.info "Committed offset #{last_committed_offset - 1} for #{partition.topic.name}/#{partition.id}..." if last_committed_offset

    consumer.instance.release_partition(partition)
    consumer.logger.debug "Released claim for partition #{partition.topic.name}/#{partition.id}."
  end
end

#stopObject



38
39
40
41
42
# File 'lib/kafka/consumer/partition_consumer.rb', line 38

# Gracefully shuts down this partition consumer: flags the consumer thread
# as interrupted, waits for it to finish, then logs completion.
def stop
  interrupt
  wait
  message = "Consumer for #{partition.topic.name}/#{partition.id} stopped."
  consumer.logger.info message
end

#waitObject



25
26
27
# File 'lib/kafka/consumer/partition_consumer.rb', line 25

# Blocks the calling thread until the consumer thread has finished.
# Returns immediately if the thread is no longer alive.
def wait
  return unless @consumer_thread.alive?
  @consumer_thread.join
end