Class: KafkaSyrup::TopicConsumer

Inherits:
Object
  • Object
show all
Includes:
Utils
Defined in:
lib/kafka_syrup/topic_consumer.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Utils

#load_args, #log

Constructor Details

#initialize(*args) ⇒ TopicConsumer

Returns a new instance of TopicConsumer.



9
10
11
12
# File 'lib/kafka_syrup/topic_consumer.rb', line 9

# Builds a consumer by applying the settings from #defaults first and then the caller's
# arguments. Both go through load_args (mixed in from Utils, outside this view); the
# caller's arguments presumably override the defaults — confirm against Utils#load_args.
def initialize(*args)
  baseline = defaults
  load_args(baseline)
  load_args(*args)
end

Instance Attribute Details

#commit_mode ⇒ Object

Returns the value of attribute commit_mode.



7
8
9
# File 'lib/kafka_syrup/topic_consumer.rb', line 7

# How #fetch commits offsets: :fetch (the #defaults value) commits before each fetch,
# :auto commits right after; any other value leaves committing to the caller.
def commit_mode; @commit_mode; end

#consumer_id ⇒ Object

Returns the value of attribute consumer_id.



7
8
9
# File 'lib/kafka_syrup/topic_consumer.rb', line 7

# This consumer's id within the group: it names the registration node and is the data
# written into partition ownership nodes. #defaults derives it from the hostname and #hash.
def consumer_id; @consumer_id; end

#control_queues ⇒ Object

Returns the value of attribute control_queues.



7
8
9
# File 'lib/kafka_syrup/topic_consumer.rb', line 7

# Control queues (an Array per #defaults); #rebalance and #unregister clear them together
# with the fetcher threads and the message queue.
def control_queues; @control_queues; end

#group ⇒ Object

Returns the value of attribute group.



7
8
9
# File 'lib/kafka_syrup/topic_consumer.rb', line 7

# Consumer group name; it is the final segment of #group_path.
def group; @group; end

#lock ⇒ Object

Returns the value of attribute lock.



7
8
9
# File 'lib/kafka_syrup/topic_consumer.rb', line 7

# Mutex (per #defaults) that #fetch and #rebalance synchronize on, so offsets are not
# recorded while a rebalance is replacing them.
def lock; @lock; end

#max_bytes ⇒ Object

Returns the value of attribute max_bytes.



7
8
9
# File 'lib/kafka_syrup/topic_consumer.rb', line 7

# Returns the value of attribute max_bytes. It is neither set by #defaults nor read by any
# method in this view; presumably a per-fetch size limit used by the fetcher threads — confirm.
def max_bytes; @max_bytes; end

#messages ⇒ Object

Returns the value of attribute messages.



7
8
9
# File 'lib/kafka_syrup/topic_consumer.rb', line 7

# Queue of fetched messages (Queue.new per #defaults) that #fetch pops from.
def messages; @messages; end

#offset_mode ⇒ Object

Returns the value of attribute offset_mode.



7
8
9
# File 'lib/kafka_syrup/topic_consumer.rb', line 7

# Offset mode; #defaults sets :latest. No method in this view reads it — presumably the
# fetcher threads consult it when no committed offset exists (confirm).
def offset_mode; @offset_mode; end

#offsets ⇒ Object

Returns the value of attribute offsets.



7
8
9
# File 'lib/kafka_syrup/topic_consumer.rb', line 7

# Hash (per #defaults) of partition id => offset to commit. #rebalance loads it with
# get_offset; #fetch sets it to the last returned message's offset + 1.
def offsets; @offsets; end

#partitions ⇒ Object

Returns the value of attribute partitions.



7
8
9
# File 'lib/kafka_syrup/topic_consumer.rb', line 7

# Partition ids this consumer currently claims; assigned by #rebalance and emptied by
# #release_partitions.
def partitions; @partitions; end

#threads ⇒ Object

Returns the value of attribute threads.



7
8
9
# File 'lib/kafka_syrup/topic_consumer.rb', line 7

# Fetcher threads; #rebalance and #unregister kill them and clear this collection.
def threads; @threads; end

#topic ⇒ Object

Returns the value of attribute topic.



7
8
9
# File 'lib/kafka_syrup/topic_consumer.rb', line 7

# Topic this consumer subscribes to; converted with to_s when building ZooKeeper paths
# and registration info.
def topic; @topic; end

#uncommitted_partitions ⇒ Object

Returns the value of attribute uncommitted_partitions.



7
8
9
# File 'lib/kafka_syrup/topic_consumer.rb', line 7

# Partition ids whose offsets #fetch has advanced since the last #commit (which clears it).
def uncommitted_partitions; @uncommitted_partitions; end

#watcher ⇒ Object

Returns the value of attribute watcher.



7
8
9
# File 'lib/kafka_syrup/topic_consumer.rb', line 7

# Membership-change subscription that #register stores from zk.register; #unregister
# calls unregister on it.
def watcher; @watcher; end

Instance Method Details

#commit ⇒ Object



168
169
170
171
172
173
174
# File 'lib/kafka_syrup/topic_consumer.rb', line 168

# Writes the recorded offset of every uncommitted partition to ZooKeeper, then empties
# the uncommitted list.
#
# Partitions without a recorded offset are skipped. #release_partitions (run by every
# rebalance) clears +offsets+ but not +uncommitted_partitions+, so a partition this
# consumer no longer owns can still be listed here with a nil offset. Passing nil to
# set_offset would store an empty string, which get_offset reads back as 0 (String#to_i),
# silently rewinding the group's offset for that partition.
def commit
  uncommitted_partitions.each do |id|
    offset = offsets[id]
    set_offset(id, offset) unless offset.nil?
  end

  uncommitted_partitions.clear
end

#current_registration_info ⇒ Object



201
202
203
204
205
206
207
# File 'lib/kafka_syrup/topic_consumer.rb', line 201

# Reads and JSON-decodes this consumer's registration node.
#
# @return [Object, nil] the decoded registration info, or nil when the node does not exist
def current_registration_info
  payload, _stat = zk.get(registration_path)
  MultiJson.load(payload)
rescue ZK::Exceptions::NoNode
  nil
end

#defaults ⇒ Object



14
15
16
17
18
19
20
21
22
23
24
25
26
27
# File 'lib/kafka_syrup/topic_consumer.rb', line 14

# Initial settings applied by #initialize before the caller's arguments. Every call
# returns fresh mutable containers, so instances never share state.
#
# @return [Hash{Symbol => Object}]
def defaults
  settings = {}
  settings[:consumer_id] = "#{Socket.gethostname}-#{hash}"
  settings[:partitions] = []
  settings[:offsets] = {}
  settings[:lock] = Mutex.new
  settings[:threads] = []
  settings[:messages] = Queue.new
  settings[:control_queues] = []
  settings[:offset_mode] = :latest
  settings[:commit_mode] = :fetch
  settings[:uncommitted_partitions] = []
  settings
end

#fetch(limit = nil) ⇒ Object

Raises:

  • (NotRegistered) — if this consumer is not registered for the topic (#registered? is false)



114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# File 'lib/kafka_syrup/topic_consumer.rb', line 114

# Returns the next batch of messages for this consumer's topic.
#
# Blocks on messages.pop until at least one message is available, then keeps popping
# while more are queued, stopping once +limit+ messages have been collected. The offsets
# of the returned messages are recorded under +lock+ (so a concurrent #rebalance cannot
# interleave), and their partitions are marked as uncommitted.
#
# Committing depends on +commit_mode+:
# * :fetch - offsets recorded by the previous call are committed before fetching
# * :auto  - offsets of this batch are committed before returning
# * other  - the caller is responsible for calling #commit
#
# @param limit [Integer, nil] maximum number of messages to return; nil means "everything
#   currently queued". Because the first pop always happens, a limit below 1 returns one message.
# @return [Array] the popped messages (presumably hashes with :partition and :offset keys,
#   as read below — confirm against the fetcher threads)
# @raise [NotRegistered] if this consumer is not registered for the topic
def fetch(limit = nil)
  raise NotRegistered unless registered?

  commit if commit_mode == :fetch

  trigger_fetch if messages.empty?

  results = []

  loop do
    results << messages.pop
    # >= rather than ==: with ==, a limit of 0 (or a negative one) never matches and the
    # whole queue is drained.
    break if messages.empty? || (limit && results.count >= limit)
  end

  # Ensure rebalancing isn't adjusting the offsets
  lock.synchronize do
    results.each do |msg|
      self.offsets[msg[:partition]] = msg[:offset] + 1
      self.uncommitted_partitions |= [msg[:partition]]
    end
  end

  commit if commit_mode == :auto

  results
end

#get_offset(id) ⇒ Object



148
149
150
151
152
153
# File 'lib/kafka_syrup/topic_consumer.rb', line 148

# Reads the committed offset of partition +id+ from ZooKeeper.
#
# @return [Integer, nil] the stored offset, or nil when no offset node exists yet. An empty
#   node reads as 0, because the raw value goes through String#to_i.
def get_offset(id)
  raw, _stat = zk.get([offsets_path, id].join('/'))
  raw.to_i
rescue ZK::Exceptions::NoNode
  nil
end

#group_path ⇒ Object



209
210
211
# File 'lib/kafka_syrup/topic_consumer.rb', line 209

# ZooKeeper path of this consumer group, <zookeeper_path>/consumers/<group>. Runs of
# repeated slashes (e.g. from a trailing "/" in zookeeper_path) collapse to a single one.
def group_path
  [KafkaSyrup.config.zookeeper_path, 'consumers', group].join('/').squeeze('/')
end

#membership_path ⇒ Object



213
214
215
# File 'lib/kafka_syrup/topic_consumer.rb', line 213

# Directory holding one registration node per group member: <group_path>/ids.
def membership_path
  "#{group_path}/ids"
end

#new_registration_info ⇒ Object



189
190
191
192
193
194
195
196
197
198
199
# File 'lib/kafka_syrup/topic_consumer.rb', line 189

# Registration payload to write for this consumer. It starts from the current
# registration info, or from a fresh static/version-1 document when none exists, and
# increments the subscription count for +topic+ (a missing count counts as 0).
#
# @return [Hash] the updated info; an existing hash is modified in place
def new_registration_info
  info = current_registration_info
  info ||= {
    'pattern' => 'static',
    'version' => 1,
    'subscription' => {}
  }

  subscriptions = info['subscription']
  topic_key = topic.to_s
  subscriptions[topic_key] = subscriptions[topic_key].to_i + 1

  info
end

#offsets_path ⇒ Object



221
222
223
# File 'lib/kafka_syrup/topic_consumer.rb', line 221

# Directory of committed offsets for this topic: <group_path>/offsets/<topic>.
def offsets_path
  "#{group_path}/offsets/#{topic}"
end

#ownership_path ⇒ Object



217
218
219
# File 'lib/kafka_syrup/topic_consumer.rb', line 217

# Directory of partition ownership nodes for this topic: <group_path>/owners/<topic>.
def ownership_path
  "#{group_path}/owners/#{topic}"
end

#rebalance ⇒ Object

Raises:

  • (NotRegistered) — if this consumer is not registered for the topic (#registered? is false)



76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/kafka_syrup/topic_consumer.rb', line 76

# Recomputes and re-claims this consumer's share of the topic's partitions.
#
# Runs entirely under +lock+, so #fetch cannot record offsets mid-rebalance. It stops the
# fetcher threads and drops locally queued messages, then releases the partitions it owns.
# Next it claims the ids returned by partition_ids_to_claim by creating ephemeral ownership
# nodes, loads their committed offsets, and starts one fetcher thread per partition.
#
# If a wanted partition is still owned by another consumer (NodeExists), only the
# ownership nodes created by this attempt are removed; it then backs off 0.2s and retries.
# NOTE(review): the retry is unbounded — it loops until every wanted partition is free.
#
# @raise [NotRegistered] if this consumer is not registered for the topic
def rebalance
  raise NotRegistered unless registered?

  log.debug "Rebalance triggered on group #{group}"

  # Make sure fetch doesn't attempt to store offsets during a rebalance
  lock.synchronize do
    begin
      # Ownership nodes created by this attempt; the only ones safe to delete on failure.
      claimed = []

      # Stop fetcher threads and clear locally cached messages
      threads.each(&:kill)
      [threads, messages, control_queues].each(&:clear)

      # Relinquish client claims to whatever partitions it's currently serving
      release_partitions

      # Determine which partitions to claim
      self.partitions = partition_ids_to_claim

      # Attempt to claim desired partitions in zookeeper
      partitions.each do |id|
        zk.create([ownership_path, id]*'/', consumer_id.to_s, ephemeral: true)
        claimed << id
      end

      # Retrieve offsets for successfully claimed partitions
      partitions.each { |id| offsets[id] = get_offset(id) }

      # Start fetcher threads for partitions
      partitions.each(&method(:start_fetcher_thread))
      # Brief pause after starting the fetchers (the reason is not stated in the source).
      sleep 0.01

    rescue ZK::Exceptions::NodeExists
      # Another consumer has not yet released a partition this client wants. release_partitions
      # deletes the ownership node of every id in +partitions+. That would include the contested
      # node (owned by the other consumer) and ids not yet attempted, so narrow +partitions+ to
      # the nodes this attempt actually created before releasing. Then back off and retry.
      self.partitions = claimed || []
      release_partitions
      sleep 0.2
      retry
    end
  end
end

#register ⇒ Object



37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/kafka_syrup/topic_consumer.rb', line 37

# Joins the consumer group for +topic+ and starts consuming.
#
# Ensures the group's ids/owners/offsets nodes exist, then writes this consumer's
# registration node. That is an ephemeral create, or an update of the subscription counts
# when the node already exists, re-creating it if it vanishes in between. After that it
# installs a membership watcher and performs an initial rebalance. On each membership
# change the watcher rebalances, requests the children with watch: true again and calls
# trigger_fetch.
#
# The watcher/rebalance step runs only after a successful create/set. If registration
# fails for any other reason, that exception propagates unchanged: no watcher is
# installed, and the error is not replaced by the NotRegistered that rebalance would raise.
#
# @return [Boolean] true if already registered; otherwise #registered? after registering
def register
  return true if registered?

  # Make sure nodes exist in Zookeeper
  zk.mkdir_p(membership_path)
  zk.mkdir_p(ownership_path)
  zk.mkdir_p(offsets_path)

  begin
    zk.create(registration_path, MultiJson.dump(new_registration_info), ephemeral: true)

  rescue ZK::Exceptions::NodeExists
    # A `retry` inside the inner rescue would only repeat zk.set, so signal via a flag and
    # retry from this outer rescue to go back to zk.create.
    no_node = false
    begin
      zk.set(registration_path, MultiJson.dump(new_registration_info))
    rescue ::ZK::Exceptions::NoNode
      no_node = true
    end
    retry if no_node
  end

  self.watcher = zk.register(membership_path) do |_event|
    rebalance
    zk.children(membership_path, watch: true)
    trigger_fetch
  end
  zk.children(membership_path, watch: true)
  rebalance
  registered?
end

#registered? ⇒ Boolean

Returns:

  • (Boolean)


184
185
186
187
# File 'lib/kafka_syrup/topic_consumer.rb', line 184

# Whether this consumer's registration node lists a positive subscription count for +topic+.
# A missing node, non-Hash info, or a non-Hash 'subscription' entry all yield false.
#
# @return [Boolean]
def registered?
  info = current_registration_info
  return false unless info.is_a?(Hash)

  subscription = info['subscription']
  return false unless subscription.is_a?(Hash)

  subscription[topic.to_s].to_i > 0
end

#registration_path ⇒ Object



225
226
227
# File 'lib/kafka_syrup/topic_consumer.rb', line 225

# This consumer's registration node: <membership_path>/<consumer_id>.
def registration_path
  [membership_path, consumer_id].join('/')
end

#release_partitions ⇒ Object



141
142
143
144
145
146
# File 'lib/kafka_syrup/topic_consumer.rb', line 141

# Deletes the ownership node of every partition in +partitions+, then empties both
# +partitions+ and the cached +offsets+.
def release_partitions
  owned = partitions
  owned.each do |partition_id|
    zk.rm_rf([ownership_path, partition_id].join('/'))
  end

  owned.clear
  offsets.clear
end

#set_offset(id, offset) ⇒ Object



155
156
157
158
159
160
161
162
163
164
165
166
# File 'lib/kafka_syrup/topic_consumer.rb', line 155

# Stores +offset+ (stringified) as the committed offset of partition +id+.
#
# Updates the existing node. If it does not exist it is created, and if another client
# creates it first (NodeExists), the whole method is retried so the update path runs.
def set_offset(id, offset)
  log.debug "Committing offset #{offset} of partition #{id}"
  node_path = [offsets_path, id].join('/')
  zk.set(node_path, offset.to_s)
rescue ZK::Exceptions::NoNode
  created = begin
    zk.create(node_path, offset.to_s)
    true
  rescue ZK::Exceptions::NodeExists
    false
  end
  retry unless created
end

#subscribers ⇒ Object



176
177
178
179
180
181
182
# File 'lib/kafka_syrup/topic_consumer.rb', line 176

# Group members whose registration info lists a positive subscription count for +topic+.
#
# Members are read one by one after listing the membership directory. A member whose node
# disappears in between (NoNode) is skipped instead of aborting the whole call, matching
# how current_registration_info treats a missing node.
#
# @return [Array<String>] member node names
def subscribers
  zk.children(membership_path).select do |member|
    begin
      info, _ = zk.get([membership_path, member]*'/')
    rescue ZK::Exceptions::NoNode
      next false
    end

    MultiJson.load(info)['subscription'][topic.to_s].to_i > 0
  end
end

#unregister ⇒ Object



68
69
70
71
72
73
74
# File 'lib/kafka_syrup/topic_consumer.rb', line 68

# Leaves the consumer group. It cancels the membership watcher, kills the fetcher threads
# and clears them along with the queued messages and control queues. It then releases the
# claimed partitions and deletes this consumer's registration node.
#
# A nil +watcher+ (e.g. #register never completed) is tolerated instead of raising
# NoMethodError. The watcher is reset to nil after it is cancelled, so a repeated call does
# not cancel it twice.
def unregister
  if watcher
    watcher.unregister
    self.watcher = nil
  end
  threads.each(&:kill)
  [threads, messages, control_queues].each(&:clear)
  release_partitions
  zk.rm_rf(registration_path)
end

#zk ⇒ Object



29
30
31
32
33
34
35
# File 'lib/kafka_syrup/topic_consumer.rb', line 29

# Memoized ZooKeeper client. A new client (with chroot: :check) is built whenever the
# cached one is missing, is not a ZK::Client::Threaded, or reports it is not connected.
# NOTE(review): the replaced client is not closed, and ephemeral nodes/watches created
# through it presumably stay tied to its old session — confirm this is intended.
def zk
  return @zk if @zk.is_a?(ZK::Client::Threaded) && @zk.connected?

  @zk = ZK.new(KafkaSyrup.config.zookeeper_hosts, chroot: :check)
end