Class: Aerospike::PartitionTracker
- Inherits:
-
Object
- Object
- Aerospike::PartitionTracker
- Defined in:
- lib/aerospike/query/partition_tracker.rb
Instance Attribute Summary collapse
-
#deadline ⇒ Object
readonly
Returns the value of attribute deadline.
-
#iteration ⇒ Object
readonly
Returns the value of attribute iteration.
-
#max_records ⇒ Object
readonly
Returns the value of attribute max_records.
-
#node_capacity ⇒ Object
readonly
Returns the value of attribute node_capacity.
-
#node_filter ⇒ Object
readonly
Returns the value of attribute node_filter.
-
#node_partitions_list ⇒ Object
readonly
Returns the value of attribute node_partitions_list.
-
#partition_begin ⇒ Object
readonly
Returns the value of attribute partition_begin.
-
#partition_filter ⇒ Object
readonly
Returns the value of attribute partition_filter.
-
#partitions ⇒ Object
readonly
Returns the value of attribute partitions.
-
#partitions_capacity ⇒ Object
readonly
Returns the value of attribute partitions_capacity.
-
#sleep_between_retries ⇒ Object
Returns the value of attribute sleep_between_retries.
-
#socket_timeout ⇒ Object
readonly
Returns the value of attribute socket_timeout.
-
#total_timeout ⇒ Object
readonly
Returns the value of attribute total_timeout.
Instance Method Summary collapse
- #assign_partitions_to_nodes(cluster, namespace) ⇒ Object
- #complete?(cluster, policy) ⇒ Boolean
- #find_node(list, node) ⇒ Object
- #init_partitions(policy, partition_count, digest) ⇒ Object
- #init_timeout(policy) ⇒ Object
-
#initialize(policy, nodes, partition_filter = nil) ⇒ PartitionTracker
constructor
A new instance of PartitionTracker.
- #mark_retry(node_partitions) ⇒ Object
- #partition_unavailable(node_partitions, partition_id) ⇒ Object
- #set_digest(node_partitions, key) ⇒ Object
- #set_last(node_partitions, key, bval) ⇒ Object
- #should_retry(node_partitions, err) ⇒ Object
- #to_s ⇒ Object
Constructor Details
#initialize(policy, nodes, partition_filter = nil) ⇒ PartitionTracker
Returns a new instance of PartitionTracker.
24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 |
# File 'lib/aerospike/query/partition_tracker.rb', line 24
# Builds a tracker for a partitioned scan/query.
#
# Without a partition filter, construction is delegated to init_for_node
# (exactly one node) or init_for_nodes. With a filter, the requested range is
# validated against Node::PARTITIONS, the per-partition status list is either
# created via init_partitions or reused from the filter, and the timeouts are
# initialised from the policy.
#
# @param policy [Object] supplies max_records; also handed to init_partitions and init_timeout
# @param nodes [Array] participating nodes; only its length is used when a filter is given
# @param partition_filter [Object, nil] filter exposing partition_begin, count, digest and partitions
# @raise [Aerospike::Exceptions::Aerospike] PARAMETER_ERROR when begin, count or the range is invalid
def initialize(policy, nodes, partition_filter = nil)
  if partition_filter.nil?
    return init_for_node(policy, nodes[0]) if nodes.length == 1

    return init_for_nodes(policy, nodes)
  end

  # Validate here instead of initial PartitionFilter constructor because total number of
  # cluster partitions may change on the server and PartitionFilter will never have access
  # to Cluster instance. Use fixed number of partitions for now.
  unless partition_filter.partition_begin.between?(0, Node::PARTITIONS - 1)
    raise Aerospike::Exceptions::Aerospike.new(
      Aerospike::ResultCode::PARAMETER_ERROR,
      "Invalid partition begin #{partition_filter.partition_begin}. Valid range: 0-#{Aerospike::Node::PARTITIONS - 1}"
    )
  end

  if partition_filter.count <= 0
    raise Aerospike::Exceptions::Aerospike.new(
      Aerospike::ResultCode::PARAMETER_ERROR,
      "Invalid partition count #{partition_filter.count}"
    )
  end

  if partition_filter.partition_begin + partition_filter.count > Node::PARTITIONS
    # The message previously lacked its closing parenthesis.
    raise Aerospike::Exceptions::Aerospike.new(
      Aerospike::ResultCode::PARAMETER_ERROR,
      "Invalid partition range (#{partition_filter.partition_begin}, #{partition_filter.partition_begin + partition_filter.count})"
    )
  end

  @partition_begin = partition_filter.partition_begin
  @node_capacity = nodes.length
  @node_filter = nil
  @partitions_capacity = partition_filter.count
  @max_records = policy.max_records
  @iteration = 1

  if partition_filter.partitions.nil?
    partition_filter.partitions = init_partitions(policy, partition_filter.count, partition_filter.digest)
  elsif policy.max_records <= 0
    # Retry all partitions when max_records not specified.
    partition_filter.partitions.each do |ps|
      ps.retry = true
    end
  end

  @partitions = partition_filter.partitions
  @partition_filter = partition_filter
  init_timeout(policy)
end
Instance Attribute Details
#deadline ⇒ Object (readonly)
Returns the value of attribute deadline.
20 21 22 |
# File 'lib/aerospike/query/partition_tracker.rb', line 20
# Reader for the deadline attribute.
#
# @return [Object] current value of @deadline
def deadline
  @deadline
end
#iteration ⇒ Object (readonly)
Returns the value of attribute iteration.
20 21 22 |
# File 'lib/aerospike/query/partition_tracker.rb', line 20
# Reader for the iteration attribute.
#
# @return [Object] current value of @iteration
def iteration
  @iteration
end
#max_records ⇒ Object (readonly)
Returns the value of attribute max_records.
20 21 22 |
# File 'lib/aerospike/query/partition_tracker.rb', line 20
# Reader for the max_records attribute.
#
# @return [Object] current value of @max_records
def max_records
  @max_records
end
#node_capacity ⇒ Object (readonly)
Returns the value of attribute node_capacity.
20 21 22 |
# File 'lib/aerospike/query/partition_tracker.rb', line 20
# Reader for the node_capacity attribute.
#
# @return [Object] current value of @node_capacity
def node_capacity
  @node_capacity
end
#node_filter ⇒ Object (readonly)
Returns the value of attribute node_filter.
20 21 22 |
# File 'lib/aerospike/query/partition_tracker.rb', line 20
# Reader for the node_filter attribute.
#
# @return [Object] current value of @node_filter
def node_filter
  @node_filter
end
#node_partitions_list ⇒ Object (readonly)
Returns the value of attribute node_partitions_list.
20 21 22 |
# File 'lib/aerospike/query/partition_tracker.rb', line 20
# Reader for the node_partitions_list attribute.
#
# @return [Object] current value of @node_partitions_list
def node_partitions_list
  @node_partitions_list
end
#partition_begin ⇒ Object (readonly)
Returns the value of attribute partition_begin.
20 21 22 |
# File 'lib/aerospike/query/partition_tracker.rb', line 20
# Reader for the partition_begin attribute.
#
# @return [Object] current value of @partition_begin
def partition_begin
  @partition_begin
end
#partition_filter ⇒ Object (readonly)
Returns the value of attribute partition_filter.
20 21 22 |
# File 'lib/aerospike/query/partition_tracker.rb', line 20
# Reader for the partition_filter attribute.
#
# @return [Object] current value of @partition_filter
def partition_filter
  @partition_filter
end
#partitions ⇒ Object (readonly)
Returns the value of attribute partitions.
20 21 22 |
# File 'lib/aerospike/query/partition_tracker.rb', line 20
# Reader for the partitions attribute.
#
# @return [Object] current value of @partitions
def partitions
  @partitions
end
#partitions_capacity ⇒ Object (readonly)
Returns the value of attribute partitions_capacity.
20 21 22 |
# File 'lib/aerospike/query/partition_tracker.rb', line 20
# Reader for the partitions_capacity attribute.
#
# @return [Object] current value of @partitions_capacity
def partitions_capacity
  @partitions_capacity
end
#sleep_between_retries ⇒ Object
Returns the value of attribute sleep_between_retries.
20 21 22 |
# File 'lib/aerospike/query/partition_tracker.rb', line 20
# Reader for the sleep_between_retries attribute.
#
# @return [Object] current value of @sleep_between_retries
def sleep_between_retries
  @sleep_between_retries
end
#socket_timeout ⇒ Object (readonly)
Returns the value of attribute socket_timeout.
20 21 22 |
# File 'lib/aerospike/query/partition_tracker.rb', line 20
# Reader for the socket_timeout attribute.
#
# @return [Object] current value of @socket_timeout
def socket_timeout
  @socket_timeout
end
#total_timeout ⇒ Object (readonly)
Returns the value of attribute total_timeout.
20 21 22 |
# File 'lib/aerospike/query/partition_tracker.rb', line 20
# Reader for the total_timeout attribute.
#
# @return [Object] current value of @total_timeout
def total_timeout
  @total_timeout
end
Instance Method Details
#assign_partitions_to_nodes(cluster, namespace) ⇒ Object
75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 |
# File 'lib/aerospike/query/partition_tracker.rb', line 75
# Groups every partition flagged for retry under the node that the cluster's
# partition map lists for it, then splits @max_records across those groups.
#
# Each processed partition has its retry flag cleared. The resulting list is
# stored in @node_partitions_list and returned.
#
# @param cluster [Object] must respond to #partitions, a map keyed by namespace
# @param namespace [String] namespace whose partition map is consulted
# @return [Array<NodePartitions>] one entry per node that received partitions
# @raise [Aerospike::Exceptions::InvalidNamespace] when the namespace is absent from the map
# @raise [Exceptions::Aerospike] when a partition id has no node in the map
def assign_partitions_to_nodes(cluster, namespace)
  replicas = cluster.partitions[namespace]
  unless replicas
    raise Aerospike::Exceptions::InvalidNamespace.new("namespace not found in the partition map")
  end

  # First replica entry, unwrapped twice via #get, is indexed by partition id.
  masters = replicas.get[0].get
  assigned = []

  @partitions.each do |status|
    next unless status&.retry

    owner = masters[status.id]
    unless owner
      raise Exceptions::Aerospike.new(Aerospike::ResultCode::INVALID_NAMESPACE, "Invalid Partition Id #{status.id} for namespace `#{namespace}` in Partition Scan")
    end

    status.retry = false

    # Compare node names for the single-node filter: during a transition the
    # map may hold both the old and the new node under the same name.
    next if @node_filter && @node_filter.name != owner.name

    # A transitional map can yield several NodePartitions for one node, each
    # holding different partitions.
    bucket = find_node(assigned, owner) || NodePartitions.new(owner).tap { |fresh| assigned << fresh }
    bucket.add_partition(status)
  end

  if @max_records.positive?
    # Spread max_records over the nodes.
    node_count = assigned.length

    if @max_records < node_count
      # Keep only as many nodes as there are records requested.
      node_count = @max_records
      assigned = assigned[0...node_count]
    end

    per_node = node_count.positive? ? @max_records / node_count : 0
    leftover = @max_records - (per_node * node_count)

    # The first `leftover` nodes each take one extra record.
    assigned[0...node_count].each_with_index do |bucket, position|
      bucket.record_max = position < leftover ? per_node + 1 : per_node
    end
  end

  @node_partitions_list = assigned
  assigned
end
#complete?(cluster, policy) ⇒ Boolean
206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 |
# File 'lib/aerospike/query/partition_tracker.rb', line 206
# Decides whether the scan/query has finished after one pass over the nodes.
#
# Sums record counts and unavailable-partition counts over
# @node_partitions_list. If no partition was unavailable the pass is final:
# the filter's done flag is updated and true is returned. Otherwise the retry
# and timeout limits are checked and state is prepared for the next iteration.
#
# @param cluster [Object] queried via supports_partition_query.get
# @param policy [Object] supplies max_retries and total_timeout
# @return [Boolean] true when no further iteration is needed, false otherwise
# @raise [Aerospike::Exceptions::Aerospike] MAX_RETRIES_EXCEEDED when @iteration exceeds policy.max_retries
# @raise [Aerospike::Exceptions::Timeout] when the deadline (less the retry sleep) has passed
def complete?(cluster, policy)
  record_count = 0
  parts_unavailable = 0

  @node_partitions_list.each do |np|
    record_count += np.record_count
    parts_unavailable += np.parts_unavailable
  end

  if parts_unavailable == 0
    if @max_records <= 0
      @partition_filter&.done = true
    elsif cluster.supports_partition_query.get
      # Server version >= 6.0 will return all records for each node up to
      # that node's max. If node's record count reached max, there still
      # may be records available for that node.
      done = true

      @node_partitions_list.each do |np|
        if np.record_count >= np.record_max
          mark_retry(np)
          done = false
        end
      end

      @partition_filter&.done = done
    else
      # Servers version < 6.0 can return less records than max and still
      # have more records for each node, so the node is only done if no
      # records were retrieved for that node.
      @node_partitions_list.each do |np|
        mark_retry(np) if np.record_count > 0
      end

      @partition_filter&.done = (record_count == 0)
    end

    return true
  end

  return true if @max_records&.positive? && record_count >= @max_records

  # Check if limits have been reached
  if policy.max_retries.positive? && @iteration > policy.max_retries
    raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::MAX_RETRIES_EXCEEDED, "Max retries exceeded: #{policy.max_retries}")
  end

  if policy.total_timeout > 0
    # Check for total timeout; the upcoming sleep counts against the budget.
    remaining = @deadline - Time.now - @sleep_between_retries

    # Was `policy.totle_timeout`, which raised NoMethodError instead of Timeout.
    raise Aerospike::Exceptions::Timeout.new(policy.total_timeout, @iteration) if remaining <= 0

    if remaining < @total_timeout
      @total_timeout = remaining
      @socket_timeout = @total_timeout if @socket_timeout > @total_timeout
    end
  end

  # Prepare for next iteration.
  @max_records -= record_count if @max_records > 0
  @iteration += 1
  false
end
#find_node(list, node) ⇒ Object
176 177 178 179 180 181 182 |
# File 'lib/aerospike/query/partition_tracker.rb', line 176
# Looks up the first entry in +list+ whose #node equals +node+ (compared with ==).
#
# @param list [Array] objects responding to #node
# @param node [Object] node to search for
# @return [Object, nil] the matching entry, or nil when there is none
def find_node(list, node)
  list.find { |candidate| candidate.node == node }
end
#init_partitions(policy, partition_count, digest) ⇒ Object
149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 |
# File 'lib/aerospike/query/partition_tracker.rb', line 149
# Creates one PartitionStatus per requested partition, numbered upward from
# @partition_begin, and records the policy's retry/timeout settings.
#
# @param policy [Object] supplies sleep_between_retries, socket_timeout and timeout
# @param partition_count [Integer] number of statuses to create
# @param digest [Object, nil] when truthy, stored on the first status
# @return [Array<Aerospike::PartitionStatus>] the new status list
def init_partitions(policy, partition_count, digest)
  statuses = Array.new(partition_count) do |offset|
    Aerospike::PartitionStatus.new(@partition_begin + offset)
  end
  statuses[0].digest = digest if digest

  @sleep_between_retries = policy.sleep_between_retries
  @socket_timeout = policy.socket_timeout
  @total_timeout = policy.timeout

  if @total_timeout.positive?
    @deadline = Time.now + @total_timeout

    # A zero socket timeout, or one above the total, is replaced by the total.
    needs_cap = @socket_timeout == 0 || @socket_timeout > @total_timeout
    @socket_timeout = @total_timeout if needs_cap
  end

  statuses
end
#init_timeout(policy) ⇒ Object
137 138 139 140 141 142 143 144 145 146 147 |
# File 'lib/aerospike/query/partition_tracker.rb', line 137
# Copies the retry/timeout settings from the policy and, when a positive
# total timeout is configured, computes the deadline and caps the socket
# timeout at the total timeout.
#
# A socket timeout of nil or 0 is treated as unset and replaced by the total
# timeout. The earlier `!@socket_timeout` test never matched 0 because 0 is
# truthy in Ruby; init_partitions already checks `== 0`.
#
# @param policy [Object] supplies sleep_between_retries, socket_timeout and timeout
# @return [Object, nil] not meaningful; the method is used for its side effects
def init_timeout(policy)
  @sleep_between_retries = policy.sleep_between_retries
  @socket_timeout = policy.socket_timeout
  @total_timeout = policy.timeout

  return unless @total_timeout.positive?

  @deadline = Time.now + @total_timeout

  socket_unset = !@socket_timeout || @socket_timeout.zero?
  @socket_timeout = @total_timeout if socket_unset || @socket_timeout > @total_timeout
end
#mark_retry(node_partitions) ⇒ Object
294 295 296 297 298 299 300 301 302 |
# File 'lib/aerospike/query/partition_tracker.rb', line 294
# Sets the retry flag on every partition status held by +node_partitions+,
# first those in parts_full and then those in parts_partial.
#
# @param node_partitions [Object] responds to #parts_full and #parts_partial
# @return [Object] the parts_partial collection (result of the final #each)
def mark_retry(node_partitions)
  flag_for_retry = ->(status) { status.retry = true }

  node_partitions.parts_full.each(&flag_for_retry)
  node_partitions.parts_partial.each(&flag_for_retry)
end
#partition_unavailable(node_partitions, partition_id) ⇒ Object
184 185 186 187 |
# File 'lib/aerospike/query/partition_tracker.rb', line 184
# Flags the given partition for retry and increments the unavailable counter
# on +node_partitions+.
#
# @param node_partitions [Object] responds to #parts_unavailable and #parts_unavailable=
# @param partition_id [Integer] partition id; its slot is partition_id - @partition_begin
# @return [Integer] the updated parts_unavailable count
def partition_unavailable(node_partitions, partition_id)
  slot = partition_id - @partition_begin
  @partitions[slot].retry = true
  node_partitions.parts_unavailable += 1
end
#set_digest(node_partitions, key) ⇒ Object
189 190 191 192 193 |
# File 'lib/aerospike/query/partition_tracker.rb', line 189
# Stores the key's digest on the status of the key's partition and counts
# one more record for +node_partitions+.
#
# @param node_partitions [Object] responds to #record_count and #record_count=
# @param key [Object] responds to #partition_id and #digest
# @return [Integer] the updated record_count
def set_digest(node_partitions, key)
  slot = key.partition_id - @partition_begin
  status = @partitions[slot]
  status.digest = key.digest
  node_partitions.record_count += 1
end
#set_last(node_partitions, key, bval) ⇒ Object
195 196 197 198 199 200 201 202 203 204 |
# File 'lib/aerospike/query/partition_tracker.rb', line 195
# Stores the key's digest and the given bval on the status of the key's
# partition and counts one more record for +node_partitions+.
#
# @param node_partitions [Object] responds to #record_count and #record_count=
# @param key [Object] responds to #partition_id and #digest
# @param bval [Object] value stored on the partition status as bval
# @return [Integer] the updated record_count
# @raise [RuntimeError] when the key's partition id is below @partition_begin
def set_last(node_partitions, key, bval)
  partition_id = key.partition_id
  slot = partition_id - @partition_begin

  # The message used to interpolate the unset ivar @partition_id, so the
  # offending id was always printed as blank.
  raise "key.partition_id: #{partition_id}, partition_begin: #{@partition_begin}" if slot < 0

  ps = @partitions[slot]
  ps.digest = key.digest
  ps.bval = bval
  node_partitions.record_count += 1
end
#should_retry(node_partitions, err) ⇒ Object
277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 |
# File 'lib/aerospike/query/partition_tracker.rb', line 277
# Tells whether a failed node request may be retried.
#
# For an Aerospike exception whose result code is TIMEOUT, NETWORK_ERROR,
# SERVER_NOT_AVAILABLE or INDEX_NOTFOUND, all of the node's partitions are
# flagged for retry and counted as unavailable. Any other error is not
# retryable. The result is always a strict boolean; an Aerospike error with
# another result code previously produced nil instead of false.
#
# @param node_partitions [Object] responds to #parts_full, #parts_partial and #parts_unavailable=
# @param err [Exception] the error raised while processing the node
# @return [Boolean] true when the partitions were marked for retry, false otherwise
def should_retry(node_partitions, err)
  return false unless err.is_a?(Aerospike::Exceptions::Aerospike)

  case err.result_code
  when Aerospike::ResultCode::TIMEOUT,
       Aerospike::ResultCode::NETWORK_ERROR,
       Aerospike::ResultCode::SERVER_NOT_AVAILABLE,
       Aerospike::ResultCode::INDEX_NOTFOUND
    mark_retry(node_partitions)
    node_partitions.parts_unavailable = node_partitions.parts_full.length + node_partitions.parts_partial.length
    true
  else
    false
  end
end
#to_s ⇒ Object
304 305 306 307 308 309 310 311 312 313 314 315 |
# File 'lib/aerospike/query/partition_tracker.rb', line 304 def to_s sb = StringIO.new @partitions.each_with_index do |ps, i| sb << ps.to_s sb << if (i+1)%16 == 0 "\n" else "\t" end end sb.string end |