Class: KafkaSyrup::TopicConsumer
- Inherits: Object
- Hierarchy: Object → KafkaSyrup::TopicConsumer
- Includes:
- Utils
- Defined in:
- lib/kafka_syrup/topic_consumer.rb
Instance Attribute Summary collapse
-
#commit_mode ⇒ Object
Returns the value of attribute commit_mode.
-
#consumer_id ⇒ Object
Returns the value of attribute consumer_id.
-
#control_queues ⇒ Object
Returns the value of attribute control_queues.
-
#group ⇒ Object
Returns the value of attribute group.
-
#lock ⇒ Object
Returns the value of attribute lock.
-
#max_bytes ⇒ Object
Returns the value of attribute max_bytes.
-
#messages ⇒ Object
Returns the value of attribute messages.
-
#offset_mode ⇒ Object
Returns the value of attribute offset_mode.
-
#offsets ⇒ Object
Returns the value of attribute offsets.
-
#partitions ⇒ Object
Returns the value of attribute partitions.
-
#threads ⇒ Object
Returns the value of attribute threads.
-
#topic ⇒ Object
Returns the value of attribute topic.
-
#uncommitted_partitions ⇒ Object
Returns the value of attribute uncommitted_partitions.
-
#watcher ⇒ Object
Returns the value of attribute watcher.
Instance Method Summary collapse
- #commit ⇒ Object
- #current_registration_info ⇒ Object
- #defaults ⇒ Object
- #fetch(limit = nil) ⇒ Object
- #get_offset(id) ⇒ Object
- #group_path ⇒ Object
-
#initialize(*args) ⇒ TopicConsumer
constructor
A new instance of TopicConsumer.
- #membership_path ⇒ Object
- #new_registration_info ⇒ Object
- #offsets_path ⇒ Object
- #ownership_path ⇒ Object
- #rebalance ⇒ Object
- #register ⇒ Object
- #registered? ⇒ Boolean
- #registration_path ⇒ Object
- #release_partitions ⇒ Object
- #set_offset(id, offset) ⇒ Object
- #subscribers ⇒ Object
- #unregister ⇒ Object
- #zk ⇒ Object
Methods included from Utils
Constructor Details
#initialize(*args) ⇒ TopicConsumer
Returns a new instance of TopicConsumer.
9 10 11 12 |
# File 'lib/kafka_syrup/topic_consumer.rb', line 9 def initialize(*args) load_args(defaults) load_args(*args) end |
Instance Attribute Details
#commit_mode ⇒ Object
Returns the value of attribute commit_mode.
7 8 9 |
# File 'lib/kafka_syrup/topic_consumer.rb', line 7 def commit_mode @commit_mode end |
#consumer_id ⇒ Object
Returns the value of attribute consumer_id.
7 8 9 |
# File 'lib/kafka_syrup/topic_consumer.rb', line 7 def consumer_id @consumer_id end |
#control_queues ⇒ Object
Returns the value of attribute control_queues.
7 8 9 |
# File 'lib/kafka_syrup/topic_consumer.rb', line 7 def control_queues @control_queues end |
#group ⇒ Object
Returns the value of attribute group.
7 8 9 |
# File 'lib/kafka_syrup/topic_consumer.rb', line 7 def group @group end |
#lock ⇒ Object
Returns the value of attribute lock.
7 8 9 |
# File 'lib/kafka_syrup/topic_consumer.rb', line 7 def lock @lock end |
#max_bytes ⇒ Object
Returns the value of attribute max_bytes.
7 8 9 |
# File 'lib/kafka_syrup/topic_consumer.rb', line 7 def max_bytes @max_bytes end |
#messages ⇒ Object
Returns the value of attribute messages.
7 8 9 |
# File 'lib/kafka_syrup/topic_consumer.rb', line 7 def messages @messages end |
#offset_mode ⇒ Object
Returns the value of attribute offset_mode.
7 8 9 |
# File 'lib/kafka_syrup/topic_consumer.rb', line 7 def offset_mode @offset_mode end |
#offsets ⇒ Object
Returns the value of attribute offsets.
7 8 9 |
# File 'lib/kafka_syrup/topic_consumer.rb', line 7 def offsets @offsets end |
#partitions ⇒ Object
Returns the value of attribute partitions.
7 8 9 |
# File 'lib/kafka_syrup/topic_consumer.rb', line 7 def partitions @partitions end |
#threads ⇒ Object
Returns the value of attribute threads.
7 8 9 |
# File 'lib/kafka_syrup/topic_consumer.rb', line 7 def threads @threads end |
#topic ⇒ Object
Returns the value of attribute topic.
7 8 9 |
# File 'lib/kafka_syrup/topic_consumer.rb', line 7 def topic @topic end |
#uncommitted_partitions ⇒ Object
Returns the value of attribute uncommitted_partitions.
7 8 9 |
# File 'lib/kafka_syrup/topic_consumer.rb', line 7 def uncommitted_partitions @uncommitted_partitions end |
#watcher ⇒ Object
Returns the value of attribute watcher.
7 8 9 |
# File 'lib/kafka_syrup/topic_consumer.rb', line 7 def watcher @watcher end |
Instance Method Details
#commit ⇒ Object
168 169 170 171 172 173 174 |
# File 'lib/kafka_syrup/topic_consumer.rb', line 168 def commit uncommitted_partitions.each do |id| set_offset(id, offsets[id]) end uncommitted_partitions.clear end |
#current_registration_info ⇒ Object
201 202 203 204 205 206 207 |
# File 'lib/kafka_syrup/topic_consumer.rb', line 201 def current_registration_info info, _ = zk.get(registration_path) MultiJson.load(info) rescue ZK::Exceptions::NoNode nil end |
#defaults ⇒ Object
14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
# File 'lib/kafka_syrup/topic_consumer.rb', line 14 def defaults { consumer_id: "#{Socket.gethostname}-#{hash}", partitions: [], offsets: {}, lock: Mutex.new, threads: [], messages: Queue.new, control_queues: [], offset_mode: :latest, commit_mode: :fetch, uncommitted_partitions: [] } end |
#fetch(limit = nil) ⇒ Object
114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 |
# File 'lib/kafka_syrup/topic_consumer.rb', line 114 def fetch(limit = nil) raise NotRegistered unless registered? commit if commit_mode == :fetch trigger_fetch if messages.empty? results = [] loop do results << messages.pop break if messages.empty? || limit && limit == results.count end # Ensure rebalancing isn't adjusting the offsets lock.synchronize do results.each do |msg| self.offsets[msg[:partition]] = msg[:offset] + 1 self.uncommitted_partitions |= [msg[:partition]] end end commit if commit_mode == :auto results end |
#get_offset(id) ⇒ Object
148 149 150 151 152 153 |
# File 'lib/kafka_syrup/topic_consumer.rb', line 148 def get_offset(id) offset, _ = zk.get([offsets_path, id]*'/') offset.to_i rescue ZK::Exceptions::NoNode nil end |
#group_path ⇒ Object
209 210 211 |
# File 'lib/kafka_syrup/topic_consumer.rb', line 209 def group_path ([KafkaSyrup.config.zookeeper_path, 'consumers', group]*'/').gsub(/\/\/+/, '/') end |
#membership_path ⇒ Object
213 214 215 |
# File 'lib/kafka_syrup/topic_consumer.rb', line 213 def membership_path [group_path, 'ids']*'/' end |
#new_registration_info ⇒ Object
189 190 191 192 193 194 195 196 197 198 199 |
# File 'lib/kafka_syrup/topic_consumer.rb', line 189 def new_registration_info info = current_registration_info || { 'pattern' => 'static', 'version' => 1, 'subscription' => {} } info['subscription'][topic.to_s] = info['subscription'][topic.to_s].to_i + 1 info end |
#offsets_path ⇒ Object
221 222 223 |
# File 'lib/kafka_syrup/topic_consumer.rb', line 221 def offsets_path [group_path, 'offsets', topic.to_s]*'/' end |
#ownership_path ⇒ Object
217 218 219 |
# File 'lib/kafka_syrup/topic_consumer.rb', line 217 def ownership_path [group_path, 'owners', topic.to_s]*'/' end |
#rebalance ⇒ Object
76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 |
# File 'lib/kafka_syrup/topic_consumer.rb', line 76 def rebalance raise NotRegistered unless registered? log.debug "Rebalance triggered on group #{group}" # Make sure fetch doesn't attempt to store offsets during a rebalance lock.synchronize do begin # Stop fetcher threads and clear locally cached messages threads.each(&:kill) [threads, messages, control_queues].each(&:clear) # Relinquish client claims to whatever partitions it's currently serving release_partitions # Determine which partitions to claim self.partitions = partition_ids_to_claim # Attempt to claim desired partitions in zookeeper partitions.each{ |id| zk.create([ownership_path, id]*'/', consumer_id.to_s, ephemeral: true) } # Retrieve offsets for successfully claimed partitions partitions.each{ |id| offsets[id] = get_offset(id) } # Start fetcher threads for partitions partitions.each(&method(:start_fetcher_thread)) sleep 0.01 rescue ZK::Exceptions::NodeExists # It's possible that another consumer has not yet released the partition this client is attempting to claim # No biggie - release any partitions this client has already claimed, backoff a bit, and retry release_partitions sleep 0.2 retry end end end |
#register ⇒ Object
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 |
# File 'lib/kafka_syrup/topic_consumer.rb', line 37 def register return if registered? # Make sure nodes exist in Zookeeper zk.mkdir_p(membership_path) zk.mkdir_p(ownership_path) zk.mkdir_p(offsets_path) begin zk.create(registration_path, MultiJson.dump(new_registration_info), ephemeral: true) rescue ZK::Exceptions::NodeExists no_node = false begin zk.set(registration_path, MultiJson.dump(new_registration_info)) rescue ::ZK::Exceptions::NoNode no_node = true end retry if no_node ensure self.watcher = zk.register(membership_path) do |event| rebalance zk.children(membership_path, watch: true) trigger_fetch end zk.children(membership_path, watch: true) rebalance registered? end end |
#registered? ⇒ Boolean
184 185 186 187 |
# File 'lib/kafka_syrup/topic_consumer.rb', line 184 def registered? info = current_registration_info info.is_a?(Hash) && info['subscription'].is_a?(Hash) && info['subscription'][topic.to_s].to_i > 0 end |
#registration_path ⇒ Object
225 226 227 |
# File 'lib/kafka_syrup/topic_consumer.rb', line 225 def registration_path [membership_path, consumer_id]*'/' end |
#release_partitions ⇒ Object
141 142 143 144 145 146 |
# File 'lib/kafka_syrup/topic_consumer.rb', line 141 def release_partitions partitions.each{ |id| zk.rm_rf([ownership_path, id]*'/') } partitions.clear offsets.clear end |
#set_offset(id, offset) ⇒ Object
155 156 157 158 159 160 161 162 163 164 165 166 |
# File 'lib/kafka_syrup/topic_consumer.rb', line 155 def set_offset(id, offset) log.debug "Committing offset #{offset} of partition #{id}" zk.set([offsets_path, id]*'/', offset.to_s) rescue ZK::Exceptions::NoNode node_exists = false begin zk.create([offsets_path, id]*'/', offset.to_s) rescue ZK::Exceptions::NodeExists node_exists = true end retry if node_exists end |
#subscribers ⇒ Object
176 177 178 179 180 181 182 |
# File 'lib/kafka_syrup/topic_consumer.rb', line 176 def subscribers zk.children(membership_path).select do |member| info, _ = zk.get([membership_path, member]*'/') MultiJson.load(info)['subscription'][topic.to_s].to_i > 0 end end |
#unregister ⇒ Object
68 69 70 71 72 73 74 |
# File 'lib/kafka_syrup/topic_consumer.rb', line 68 def unregister watcher.unregister threads.each(&:kill) [threads, messages, control_queues].each(&:clear) release_partitions zk.rm_rf([membership_path, consumer_id]*'/') end |
#zk ⇒ Object
29 30 31 32 33 34 35 |
# File 'lib/kafka_syrup/topic_consumer.rb', line 29 def zk if @zk.is_a?(ZK::Client::Threaded) && @zk.connected? @zk else @zk = ZK.new(KafkaSyrup.config.zookeeper_hosts, chroot: :check) end end |