Class: Aerospike::Command
- Inherits:
-
Object
- Object
- Aerospike::Command
- Defined in:
- lib/aerospike/command/command.rb
Overview
:nodoc:
Direct Known Subclasses
Instance Method Summary collapse
- #execute ⇒ Object
-
#initialize(node = nil) ⇒ Command
constructor
A new instance of Command.
-
#set_delete(policy, key) ⇒ Object
Writes the command for delete operations.
-
#set_exists(policy, key) ⇒ Object
Writes the command for exist operations.
-
#set_operate(policy, key, args) ⇒ Object
Implements different command operations.
- #set_query(cluster, policy, statement, background, node_partitions) ⇒ Object
-
#set_read(policy, key, bin_names) ⇒ Object
Writes the command for get operations (specified bins).
-
#set_read_for_key_only(policy, key) ⇒ Object
Writes the command for get operations (all bins).
-
#set_read_header(policy, key) ⇒ Object
Writes the command for getting metadata operations.
- #set_scan(cluster, policy, namespace, set_name, bin_names, node_partitions) ⇒ Object
-
#set_touch(policy, key) ⇒ Object
Writes the command for touch operations.
- #set_udf(policy, key, package_name, function_name, args) ⇒ Object
-
#set_write(policy, operation, key, bins) ⇒ Object
Writes the command for write operations.
-
#write_bins ⇒ Object
List of all bins that this command will write to - sub-classes should override this as appropriate.
Constructor Details
#initialize(node = nil) ⇒ Command
Returns a new instance of Command.
97 98 99 100 101 102 103 104 105 106 107 108 109 |
# File 'lib/aerospike/command/command.rb', line 97 def initialize(node = nil) @data_offset = 0 @data_buffer = nil @node = node @compress = false # will add before use @sequence = Atomic.new(-1) self end |
Instance Method Details
#execute ⇒ Object
686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 |
# File 'lib/aerospike/command/command.rb', line 686 def execute iterations = 0 failed_nodes = [] # set timeout outside the loop limit = Time.now + @policy.timeout # Execute command until successful, timed out or maximum iterations have been reached. while true # too many retries iterations += 1 break if (@policy.max_retries > 0) && (iterations > @policy.max_retries + 1) # Sleep before trying again, after the first iteration sleep(@policy.sleep_between_retries) if iterations > 1 && @policy.sleep_between_retries > 0 # check for command timeout break if @policy.timeout > 0 && Time.now > limit begin @node = get_node @conn = @node.get_connection(@policy.timeout) rescue => e failed_nodes << @node if @node if @node # Socket connection error has occurred. Decrease health and retry. @node.decrease_health Aerospike.logger.error("Node #{@node}: #{e}") else Aerospike.logger.error("No node available for transaction: #{e}") end next end # Draw a buffer from buffer pool, and make sure it will be put back begin @data_buffer = Buffer.get # Set command buffer. begin write_buffer rescue => e failed_nodes << @node if @node Aerospike.logger.error(e) # All runtime exceptions are considered fatal. Do not retry. # Close socket to flush out possible garbage. Do not put back in pool. @conn.close if @conn raise e end # Reset timeout in send buffer (destined for server) and socket. @data_buffer.write_int32((@policy.timeout * 1000).to_i, 22) # Send command. begin @conn.write(@data_buffer, @data_offset) rescue => e failed_nodes << @node if @node # IO errors are considered temporary anomalies. Retry. # Close socket to flush out possible garbage. Do not put back in pool. @conn.close if @conn Aerospike.logger.error("Node #{@node}: #{e}") # IO error means connection to server @node is unhealthy. # Reflect cmd status. @node.decrease_health next end # Parse results. begin parse_result rescue => e failed_nodes << @node if @node case e # do not log the following exceptions when Aerospike::Exceptions::ScanTerminated when Aerospike::Exceptions::QueryTerminated else Aerospike.logger.error(e) end # close the connection # cancelling/closing the batch/multi commands will return an error, which will # close the connection to throw away its data and signal the server about the # situation. We will not put back the connection in the buffer. @conn.close if @conn raise e end # Reflect healthy status. @node.restore_health # Put connection back in pool. @node.put_connection(@conn) # command has completed successfully. Exit method. return ensure Buffer.put(@data_buffer) end end # while # execution timeout raise Aerospike::Exceptions::Timeout.new(limit, iterations, failed_nodes) end |
#set_delete(policy, key) ⇒ Object
Writes the command for delete operations
144 145 146 147 148 149 150 151 152 153 154 155 156 |
# File 'lib/aerospike/command/command.rb', line 144 def set_delete(policy, key) begin_cmd field_count = estimate_key_size(key) exp_size = estimate_expression_size(@policy.filter_exp) field_count += 1 if exp_size > 0 size_buffer write_header_write(policy, INFO2_WRITE | INFO2_DELETE, field_count, 0) write_key(key) write_filter_exp(@policy.filter_exp, exp_size) end_cmd end |
#set_exists(policy, key) ⇒ Object
Writes the command for exist operations
176 177 178 179 180 181 182 183 184 185 186 187 188 |
# File 'lib/aerospike/command/command.rb', line 176 def set_exists(policy, key) begin_cmd field_count = estimate_key_size(key) exp_size = estimate_expression_size(@policy.filter_exp) field_count += 1 if exp_size > 0 size_buffer write_header_read_header(policy, INFO1_READ | INFO1_NOBINDATA, field_count, 0) write_key(key) write_filter_exp(@policy.filter_exp, exp_size) end_cmd end |
#set_operate(policy, key, args) ⇒ Object
Implements different command operations
262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 |
# File 'lib/aerospike/command/command.rb', line 262 def set_operate(policy, key, args) begin_cmd field_count = estimate_key_size(key, policy) exp_size = estimate_expression_size(policy.filter_exp) field_count += 1 if exp_size > 0 @data_offset += args.size size_buffer write_header_read_write(policy, args, field_count) write_key(key, policy) write_filter_exp(policy.filter_exp, exp_size) args.operations.each do |operation| write_operation_for_operation(operation) end end_cmd mark_compressed(policy) end |
#set_query(cluster, policy, statement, background, node_partitions) ⇒ Object
445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 |
# File 'lib/aerospike/command/command.rb', line 445 def set_query(cluster, policy, statement, background, node_partitions) function_arg_buffer = nil field_count = 0 filter_size = 0 is_new = cluster.supports_feature?(Aerospike::Features::PARTITION_QUERY) begin_cmd if statement.namespace @data_offset += statement.namespace.bytesize + FIELD_HEADER_SIZE field_count += 1 end if statement.set_name @data_offset += statement.set_name.bytesize + FIELD_HEADER_SIZE field_count += 1 end # Estimate recordsPerSecond field size. This field is used in new servers and not used # (but harmless to add) in old servers. if statement.records_per_second > 0 @data_offset += 4 + FIELD_HEADER_SIZE field_count += 1 end # Estimate socket timeout field size. This field is used in new servers and not used # (but harmless to add) in old servers. @data_offset += 4 + FIELD_HEADER_SIZE field_count += 1 # Estimate task_id field. @data_offset += 8 + FIELD_HEADER_SIZE field_count += 1 filter = statement.filters[0] bin_names = statement.bin_names packed_ctx = nil if filter col_type = filter.collection_type # Estimate INDEX_TYPE field. if col_type > 0 @data_offset += FIELD_HEADER_SIZE + 1 field_count += 1 end # Estimate INDEX_RANGE field. @data_offset += FIELD_HEADER_SIZE filter_size += 1 # num filters filter_size += filter.estimate_size @data_offset += filter_size field_count += 1 packed_ctx = filter.packed_ctx if packed_ctx @data_offset += FIELD_HEADER_SIZE + packed_ctx.length field_count += 1 end end statement.set_task_id unless policy.filter_exp.nil? exp_size = estimate_expression_size(policy.filter_exp) field_count += 1 if exp_size > 0 end # Estimate aggregation/background function size. if statement.function_name @data_offset += FIELD_HEADER_SIZE + 1 # udf type @data_offset += statement.package_name.bytesize + FIELD_HEADER_SIZE @data_offset += statement.function_name.bytesize + FIELD_HEADER_SIZE function_arg_buffer = "" if statement.function_args && !statement.function_args.empty? function_arg_buffer = Value.of(statement.function_args).to_bytes end @data_offset += FIELD_HEADER_SIZE + function_arg_buffer.bytesize field_count += 4 end max_records = 0 parts_full_size = 0 parts_partial_digest_size = 0 parts_partial_bval_size = 0 unless node_partitions.nil? parts_full_size = node_partitions.parts_full.length * 2 parts_partial_digest_size = node_partitions.parts_partial.length * 20 unless filter.nil? parts_partial_bval_size = node_partitions.parts_partial.length * 8 end max_records = node_partitions.record_max end if parts_full_size > 0 @data_offset += parts_full_size + FIELD_HEADER_SIZE field_count += 1 end if parts_partial_digest_size > 0 @data_offset += parts_partial_digest_size + FIELD_HEADER_SIZE field_count += 1 end if parts_partial_bval_size > 0 @data_offset += parts_partial_bval_size + FIELD_HEADER_SIZE field_count += 1 end # Estimate max records field size. This field is used in new servers and not used # (but harmless to add) in old servers. if max_records > 0 @data_offset += 8 + FIELD_HEADER_SIZE field_count += 1 end unless statement.operations.nil? operations = statement.operations end operation_count = 0 if operations unless background raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::PARAMETER_ERROR, nil, [@node]) end operations.each do |operation| estimate_operation_size_for_operation(operation) end operation_count = operations.size elsif !bin_names.empty? bin_names.each do |bin_name| estimate_operation_size_for_bin_name(bin_name) end operation_count = bin_names.length # Estimate size for selected bin names (query bin names already handled for old servers). end size_buffer if background write_header_write(policy, INFO2_WRITE, field_count, operation_count) else read_attr = INFO1_READ write_attr = 0 read_attr |= INFO1_NOBINDATA unless policy.include_bin_data if policy.short_query || policy.expected_duration == QueryDuration::SHORT read_attr |= INFO1_SHORT_QUERY elsif policy.expected_duration == QueryDuration::LONG_RELAX_AP write_attr |= INFO2_RELAX_AP_LONG_QUERY end info_attr = INFO3_PARTITION_DONE if is_new write_header_read(policy, read_attr, write_attr, info_attr, field_count, operation_count) end write_field_string(statement.namespace, FieldType::NAMESPACE) if statement.namespace write_field_string(statement.set_name, FieldType::TABLE) if statement.set_name # Write records per second. write_field_int(statement.records_per_second, FieldType::RECORDS_PER_SECOND) if statement.records_per_second > 0 write_filter_exp(policy.filter_exp, exp_size) # Write socket idle timeout. write_field_int(policy.socket_timeout, FieldType::SOCKET_TIMEOUT) # Write task_id field write_field_int64(statement.task_id, FieldType::TRAN_ID) if filter type = filter.collection_type if type > 0 write_field_header(1, FieldType::INDEX_TYPE) @data_offset += @data_buffer.write_byte(type, @data_offset) end write_field_header(filter_size, FieldType::INDEX_RANGE) @data_offset += @data_buffer.write_byte(1, @data_offset) @data_offset = filter.write(@data_buffer, @data_offset) if packed_ctx write_field_header(packed_ctx.length, FieldType::INDEX_CONTEXT) @data_offset += @data_buffer.write_binary(packed_ctx, @data_offset) end end if statement.function_name write_field_header(1, FieldType::UDF_OP) ret_marker = statement.return_data ? 1 : 2 @data_offset += @data_buffer.write_byte(ret_marker, @data_offset) write_field_string(statement.package_name, FieldType::UDF_PACKAGE_NAME) write_field_string(statement.function_name, FieldType::UDF_FUNCTION) write_field_string(function_arg_buffer, FieldType::UDF_ARGLIST) end if parts_full_size > 0 write_field_header(parts_full_size, FieldType::PID_ARRAY) node_partitions.parts_full.each do |part| @data_offset += @data_buffer.write_uint16_little_endian(part.id, @data_offset) end end if parts_partial_digest_size > 0 write_field_header(parts_partial_digest_size, FieldType::DIGEST_ARRAY) node_partitions.parts_partial.each do |part| @data_offset += @data_buffer.write_binary(part.digest, @data_offset) end end if parts_partial_bval_size > 0 write_field_header(parts_partial_bval_size, FieldType::BVAL_ARRAY) @node_partitions.parts_partial.each do |part| @data_offset += @data_buffer.write_uint64_little_endian(part.bval, @data_offset) end end if max_records > 0 write_field(max_records, FieldType::MAX_RECORDS) end if !operations.nil? operations.each do |operation| write_operation_for_operation(operation) end elsif !bin_names.nil? && (is_new || filter.nil?) bin_names.each do |bin_name| write_operation_for_bin_name(bin_name, Operation::READ) end end end_cmd end |
#set_read(policy, key, bin_names) ⇒ Object
Writes the command for get operations (specified bins)
206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 |
# File 'lib/aerospike/command/command.rb', line 206 def set_read(policy, key, bin_names) if bin_names && !bin_names.empty? begin_cmd field_count = estimate_key_size(key) exp_size = estimate_expression_size(@policy.filter_exp) field_count += 1 if exp_size > 0 bin_names.each do |bin_name| estimate_operation_size_for_bin_name(bin_name) end size_buffer attr = INFO1_READ if bin_names.empty? attr |= INFO1_GET_ALL end write_header_read(policy, attr, 0, 0, field_count, bin_names.length) write_key(key) write_filter_exp(@policy.filter_exp, exp_size) bin_names.each do |bin_name| write_operation_for_bin_name(bin_name, Aerospike::Operation::READ) end end_cmd mark_compressed(policy) else set_read_for_key_only(policy, key) end end |
#set_read_for_key_only(policy, key) ⇒ Object
Writes the command for get operations (all bins)
191 192 193 194 195 196 197 198 199 200 201 202 203 |
# File 'lib/aerospike/command/command.rb', line 191 def set_read_for_key_only(policy, key) begin_cmd field_count = estimate_key_size(key) exp_size = estimate_expression_size(@policy.filter_exp) field_count += 1 if exp_size > 0 size_buffer write_header_read(policy, INFO1_READ | INFO1_GET_ALL, 0, 0, field_count, 0) write_key(key) write_filter_exp(@policy.filter_exp, exp_size) end_cmd end |
#set_read_header(policy, key) ⇒ Object
Writes the command for getting metadata operations
240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 |
# File 'lib/aerospike/command/command.rb', line 240 def set_read_header(policy, key) begin_cmd field_count = estimate_key_size(key) exp_size = estimate_expression_size(@policy.filter_exp) field_count += 1 if exp_size > 0 size_buffer # The server does not currently return record header data with _INFO1_NOBINDATA attribute set. # The workaround is to request a non-existent bin. # TODO: Fix this on server. #command.set_read(INFO1_READ | _INFO1_NOBINDATA); write_header_read_header(policy, INFO1_READ|INFO1_NOBINDATA, field_count, 0) write_key(key) write_filter_exp(@policy.filter_exp, exp_size) end_cmd mark_compressed(policy) end |
#set_scan(cluster, policy, namespace, set_name, bin_names, node_partitions) ⇒ Object
308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 |
# File 'lib/aerospike/command/command.rb', line 308 def set_scan(cluster, policy, namespace, set_name, bin_names, node_partitions) # Estimate buffer size begin_cmd field_count = 0 parts_full_size = node_partitions.parts_full.length * 2 parts_partial_size = node_partitions.parts_partial.length * 20 max_records = node_partitions.record_max if namespace @data_offset += namespace.bytesize + FIELD_HEADER_SIZE field_count += 1 end if set_name @data_offset += set_name.bytesize + FIELD_HEADER_SIZE field_count += 1 end if parts_full_size > 0 @data_offset += parts_full_size + FIELD_HEADER_SIZE field_count += 1 end if parts_partial_size > 0 @data_offset += parts_partial_size + FIELD_HEADER_SIZE field_count += 1 end if max_records > 0 @data_offset += 8 + FIELD_HEADER_SIZE field_count += 1 end if policy.records_per_second > 0 @data_offset += 4 + FIELD_HEADER_SIZE field_count += 1 end exp_size = estimate_expression_size(@policy.filter_exp) field_count += 1 if exp_size > 0 # Estimate scan options size. # @data_offset += 2 + FIELD_HEADER_SIZE # field_count += 1 # Estimate scan timeout size. @data_offset += 4 + FIELD_HEADER_SIZE field_count += 1 if bin_names bin_names.each do |bin_name| estimate_operation_size_for_bin_name(bin_name) end end size_buffer read_attr = INFO1_READ unless policy.include_bin_data read_attr |= INFO1_NOBINDATA end info_attr = 0 if cluster.supports_feature?(Aerospike::Features::PARTITION_QUERY) info_attr = INFO3_PARTITION_DONE end operation_count = 0 unless bin_names.nil? operation_count = bin_names.length end write_header_read(policy, read_attr, 0, info_attr, field_count, operation_count) if namespace write_field_string(namespace, Aerospike::FieldType::NAMESPACE) end if set_name write_field_string(set_name, Aerospike::FieldType::TABLE) end if parts_full_size > 0 write_field_header(parts_full_size, Aerospike::FieldType::PID_ARRAY) node_partitions.parts_full.each do |part| @data_buffer.write_uint16_little_endian(part.id, @data_offset) @data_offset += 2 end end if parts_partial_size > 0 write_field_header(parts_partial_size, Aerospike::FieldType::DIGEST_ARRAY) node_partitions.parts_partial.each do |part| @data_buffer.write_binary(part.digest, @data_offset) @data_offset += part.digest.length end end if max_records > 0 write_field_int64(max_records, Aerospike::FieldType::MAX_RECORDS) end if policy.records_per_second > 0 write_field_int(policy.records_per_second, Aerospike::FieldType::RECORDS_PER_SECOND) end write_filter_exp(@policy.filter_exp, exp_size) # write_field_header(2, Aerospike::FieldType::SCAN_OPTIONS) # priority = policy.priority & 0xFF # priority <<= 4 # if policy.fail_on_cluster_change # priority |= 0x08 # end # @data_buffer.write_byte(priority, @data_offset) # @data_offset += 1 # @data_buffer.write_byte(policy.scan_percent.to_i.ord, @data_offset) # @data_offset += 1 write_field_header(4, Aerospike::FieldType::SOCKET_TIMEOUT) @data_buffer.write_uint32(policy.socket_timeout.to_i, @data_offset) @data_offset += 4 if bin_names bin_names.each do |bin_name| write_operation_for_bin_name(bin_name, Aerospike::Operation::READ) end end end_cmd end |
#set_touch(policy, key) ⇒ Object
Writes the command for touch operations
159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 |
# File 'lib/aerospike/command/command.rb', line 159 def set_touch(policy, key) begin_cmd field_count = estimate_key_size(key) exp_size = estimate_expression_size(@policy.filter_exp) field_count += 1 if exp_size > 0 estimate_operation_size size_buffer write_header_write(policy, INFO2_WRITE, field_count, 1) write_key(key) write_filter_exp(@policy.filter_exp, exp_size) write_operation_for_operation_type(Aerospike::Operation::TOUCH) end_cmd end |
#set_udf(policy, key, package_name, function_name, args) ⇒ Object
285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 |
# File 'lib/aerospike/command/command.rb', line 285 def set_udf(policy, key, package_name, function_name, args) begin_cmd field_count = estimate_key_size(key, policy) exp_size = estimate_expression_size(@policy.filter_exp) field_count += 1 if exp_size > 0 arg_bytes = args.to_bytes field_count += estimate_udf_size(package_name, function_name, arg_bytes) size_buffer write_header_write(policy, INFO2_WRITE, field_count, 0) write_key(key, policy) write_filter_exp(@policy.filter_exp, exp_size) write_field_string(package_name, Aerospike::FieldType::UDF_PACKAGE_NAME) write_field_string(function_name, Aerospike::FieldType::UDF_FUNCTION) write_field_bytes(arg_bytes, Aerospike::FieldType::UDF_ARGLIST) end_cmd mark_compressed(policy) end |
#set_write(policy, operation, key, bins) ⇒ Object
Writes the command for write operations
118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 |
# File 'lib/aerospike/command/command.rb', line 118 def set_write(policy, operation, key, bins) begin_cmd field_count = estimate_key_size(key, policy) exp_size = estimate_expression_size(@policy.filter_exp) field_count += 1 if exp_size > 0 bins.each do |bin| estimate_operation_size_for_bin(bin) end size_buffer write_header_write(policy, INFO2_WRITE, field_count, bins.length) write_key(key, policy) write_filter_exp(@policy.filter_exp, exp_size) bins.each do |bin| write_operation_for_bin(bin, operation) end end_cmd mark_compressed(policy) end |
#write_bins ⇒ Object
List of all bins that this command will write to - sub-classes should override this as appropriate.
113 114 115 |
# File 'lib/aerospike/command/command.rb', line 113 def write_bins [] end |