Class: Aerospike::Client
- Inherits:
-
Object
- Object
- Aerospike::Client
- Defined in:
- lib/aerospike/client.rb
Overview
Examples:
# connect to the database client = Client.new(‘192.168.0.1’)
#=> raises Aerospike::Exceptions::Timeout if a :timeout
is specified and :fail_if_not_connected
set to true
Instance Attribute Summary collapse
-
#cluster ⇒ Object
readonly
Returns the value of attribute cluster.
-
#default_admin_policy ⇒ Object
Returns the value of attribute default_admin_policy.
-
#default_batch_policy ⇒ Object
Returns the value of attribute default_batch_policy.
-
#default_info_policy ⇒ Object
Returns the value of attribute default_info_policy.
-
#default_operate_policy ⇒ Object
Returns the value of attribute default_operate_policy.
-
#default_query_policy ⇒ Object
Returns the value of attribute default_query_policy.
-
#default_read_policy ⇒ Object
Returns the value of attribute default_read_policy.
-
#default_scan_policy ⇒ Object
Returns the value of attribute default_scan_policy.
-
#default_write_policy ⇒ Object
Returns the value of attribute default_write_policy.
Instance Method Summary collapse
-
#add(key, bins, options = nil) ⇒ Object
Examples:.
-
#append(key, bins, options = nil) ⇒ Object
Examples:.
-
#batch_exists(keys, options = nil) ⇒ Object
Check if multiple record keys exist in one batch call.
-
#batch_get(keys, bin_names = nil, options = nil) ⇒ Object
Read multiple record headers and bins for specified keys in one batch call.
-
#batch_get_header(keys, options = nil) ⇒ Object
Read multiple record header data for specified keys in one batch call.
-
#batch_operate(records, options = nil) ⇒ Object
Operate on multiple records for specified batch keys in one batch call.
-
#change_password(user, password, options = nil) ⇒ Object
Change user’s password.
-
#close ⇒ Object
Closes all client connections to database server nodes.
-
#connect ⇒ Object
Connect to the cluster.
-
#connected? ⇒ Boolean
Determines if there are active connections to the database server cluster.
-
#create_index(namespace, set_name, index_name, bin_name, index_type, collection_type = nil, options = nil, ctx: nil) ⇒ Object
Create secondary index.
-
#create_role(role_name, privileges = [], allowlist = [], read_quota = 0, write_quota = 0, options = nil) ⇒ Object
Create a user-defined role.
-
#create_user(user, password, roles, options = nil) ⇒ Object
Create user with password and roles.
-
#delete(key, options = nil) ⇒ Object
Examples:.
-
#drop_index(namespace, set_name, index_name, options = nil) ⇒ Object
Delete secondary index.
-
#drop_role(role_name, options = nil) ⇒ Object
Remove a user-defined role.
-
#drop_user(user, options = nil) ⇒ Object
Remove user from cluster.
-
#execute_udf(key, package_name, function_name, args = [], options = nil) ⇒ Object
Execute user defined function on server and return results.
-
#execute_udf_on_query(statement, package_name, function_name, function_args = [], options = nil) ⇒ Object
execute_udf_on_query applies user defined function on records that match the statement filter.
-
#exists(key, options = nil) ⇒ Object
Determines if a record key exists.
-
#get(key, bin_names = nil, options = nil) ⇒ Object
Read record header and bins for specified key.
-
#get_header(key, options = nil) ⇒ Object
Read record generation and expiration only for specified key.
-
#grant_privileges(role_name, privileges, options = nil) ⇒ Object
Grant privileges to a user-defined role.
-
#grant_roles(user, roles, options = nil) ⇒ Object
Add roles to user’s list of roles.
-
#initialize(hosts = nil, policy: ClientPolicy.new, connect: true) ⇒ Client
constructor
A new instance of Client.
-
#list_udf(options = nil) ⇒ Object
ListUDF lists all packages containing user defined functions in the server.
-
#node_names ⇒ Object
Returns list of active server node names in the cluster.
-
#nodes ⇒ Object
Returns array of active server nodes in the cluster.
-
#operate(key, operations, options = nil) ⇒ Object
Perform multiple read/write operations on a single key in one batch call.
-
#prepend(key, bins, options = nil) ⇒ Object
Examples:.
-
#put(key, bins, options = nil) ⇒ Object
Examples:.
-
#query(statement, options = nil) ⇒ Object
Query executes a query and returns a recordset.
-
#query_execute(statement, operations = [], options = nil) ⇒ Aerospike::ExecuteTask
QueryExecute applies operations on records that match the statement filter.
-
#query_partitions(partition_filter, statement, options = nil) ⇒ Object
Executes a query for specified partitions and returns a recordset.
-
#query_role(role, options = nil) ⇒ Object
Retrieve privileges for a given role.
-
#query_roles(options = nil) ⇒ Object
Retrieve all roles and their privileges.
-
#query_user(user, options = nil) ⇒ Object
Retrieve roles for a given user.
-
#query_users(options = nil) ⇒ Object
Retrieve all users and their roles.
-
#register_udf(udf_body, server_path, language, options = nil) ⇒ Object
Register package containing user defined functions with server.
-
#register_udf_from_file(client_path, server_path, language, options = nil) ⇒ Object
Register package containing user defined functions with server.
-
#remove_udf(udf_name, options = nil) ⇒ Object
RemoveUDF removes a package containing user defined functions in the server.
- #request_info(*commands, policy: nil) ⇒ Object
-
#revoke_privileges(role_name, privileges, options = nil) ⇒ Object
Revoke privileges from a user-defined role.
-
#revoke_roles(user, roles, options = nil) ⇒ Object
Remove roles from user’s list of roles.
-
#scan_all(namespace, set_name, bin_names = nil, options = nil) ⇒ Object
Reads all records in specified namespace and set from all nodes.
-
#scan_node(node, namespace, set_name, bin_names = nil, options = nil) ⇒ Object
ScanNode reads all records in specified namespace and set, from one node only.
-
#scan_node_partitions(node, namespace, set_name, bin_names = nil, options = nil) ⇒ Object
Reads all records in specified namespace and set for one node only.
-
#scan_partitions(partition_filter, namespace, set_name, bin_names = nil, options = nil) ⇒ Object
Reads records in specified namespace and set using partition filter.
-
#set_quotas(role_name, read_quota, write_quota, options = nil) ⇒ Object
Set or update quota for a role.
- #supports_feature?(feature) ⇒ Boolean
-
#touch(key, options = nil) ⇒ Object
Examples:.
-
#truncate(namespace, set_name = nil, before_last_update = nil, options = {}) ⇒ Object
Removes records in the specified namespace/set efficiently.
Constructor Details
#initialize(hosts = nil, policy: ClientPolicy.new, connect: true) ⇒ Client
Returns a new instance of Client.
41 42 43 44 45 46 47 48 49 50 |
# File 'lib/aerospike/client.rb', line 41 def initialize(hosts = nil, policy: ClientPolicy.new, connect: true) hosts = ::Aerospike::Host::Parse.(hosts || ENV["AEROSPIKE_HOSTS"] || "localhost") policy = create_policy(policy, ClientPolicy) set_default_policies(policy.policies) @cluster = Cluster.new(policy, hosts) @cluster.add_cluster_config_change_listener(self) self.connect if connect self end |
Instance Attribute Details
#cluster ⇒ Object
Returns the value of attribute cluster.
39 40 41 |
# File 'lib/aerospike/client.rb', line 39 def cluster @cluster end |
#default_admin_policy ⇒ Object
Returns the value of attribute default_admin_policy.
39 40 41 |
# File 'lib/aerospike/client.rb', line 39 def default_admin_policy @default_admin_policy end |
#default_batch_policy ⇒ Object
Returns the value of attribute default_batch_policy.
39 40 41 |
# File 'lib/aerospike/client.rb', line 39 def default_batch_policy @default_batch_policy end |
#default_info_policy ⇒ Object
Returns the value of attribute default_info_policy.
39 40 41 |
# File 'lib/aerospike/client.rb', line 39 def default_info_policy @default_info_policy end |
#default_operate_policy ⇒ Object
Returns the value of attribute default_operate_policy.
39 40 41 |
# File 'lib/aerospike/client.rb', line 39 def default_operate_policy @default_operate_policy end |
#default_query_policy ⇒ Object
Returns the value of attribute default_query_policy.
39 40 41 |
# File 'lib/aerospike/client.rb', line 39 def default_query_policy @default_query_policy end |
#default_read_policy ⇒ Object
Returns the value of attribute default_read_policy.
39 40 41 |
# File 'lib/aerospike/client.rb', line 39 def default_read_policy @default_read_policy end |
#default_scan_policy ⇒ Object
Returns the value of attribute default_scan_policy.
39 40 41 |
# File 'lib/aerospike/client.rb', line 39 def default_scan_policy @default_scan_policy end |
#default_write_policy ⇒ Object
Returns the value of attribute default_write_policy.
39 40 41 |
# File 'lib/aerospike/client.rb', line 39 def default_write_policy @default_write_policy end |
Instance Method Details
#add(key, bins, options = nil) ⇒ Object
Examples:
client.add key, {'bin', -1}, :timeout => 0.001
172 173 174 175 176 |
# File 'lib/aerospike/client.rb', line 172 def add(key, bins, = nil) policy = create_policy(, WritePolicy, default_write_policy) command = WriteCommand.new(@cluster, policy, key, hash_to_bins(bins), Aerospike::Operation::ADD) execute_command(command) end |
#append(key, bins, options = nil) ⇒ Object
Examples:
client.append key, {'bin', 'value to append'}, :timeout => 0.001
130 131 132 133 134 |
# File 'lib/aerospike/client.rb', line 130 def append(key, bins, = nil) policy = create_policy(, WritePolicy, default_write_policy) command = WriteCommand.new(@cluster, policy, key, hash_to_bins(bins), Aerospike::Operation::APPEND) execute_command(command) end |
#batch_exists(keys, options = nil) ⇒ Object
Check if multiple record keys exist in one batch call.
The returned boolean array is in positional order with the original key array order.
The policy can be used to specify timeouts and protocol type.
357 358 359 360 361 362 363 364 365 366 |
# File 'lib/aerospike/client.rb', line 357 def batch_exists(keys, = nil) policy = create_policy(, BatchPolicy, default_batch_policy) results = Array.new(keys.length) execute_batch_index_commands(policy, keys) do |node, batch| BatchIndexExistsCommand.new(node, batch, policy, results) end results end |
#batch_get(keys, bin_names = nil, options = nil) ⇒ Object
Read multiple record headers and bins for specified keys in one batch call.
The returned records are in positional order with the original key array order.
If a key is not found, the positional record will be nil.
The policy can be used to specify timeouts and protocol type.
310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 |
# File 'lib/aerospike/client.rb', line 310 def batch_get(keys, bin_names = nil, = nil) policy = create_policy(, BatchPolicy, default_batch_policy) results = Array.new(keys.length) info_flags = INFO1_READ case bin_names when :all, nil, [] info_flags |= INFO1_GET_ALL bin_names = nil when :none info_flags |= INFO1_NOBINDATA bin_names = nil end execute_batch_index_commands(policy, keys) do |node, batch| BatchIndexCommand.new(node, batch, policy, bin_names, results, info_flags) end results end |
#batch_get_header(keys, options = nil) ⇒ Object
Read multiple record header data for specified keys in one batch call.
The returned records are in positional order with the original key array order.
If a key is not found, the positional record will be nil.
The policy can be used to specify timeouts and protocol type.
335 336 337 |
# File 'lib/aerospike/client.rb', line 335 def batch_get_header(keys, = nil) batch_get(keys, :none, ) end |
#batch_operate(records, options = nil) ⇒ Object
Operate on multiple records for specified batch keys in one batch call. This method allows different namespaces/bins for each key in the batch. The returned records are located in the same list.
records can be BatchRead, BatchWrite, BatchDelete or BatchUDF.
Requires server version 6.0+
346 347 348 349 350 351 352 |
# File 'lib/aerospike/client.rb', line 346 def batch_operate(records, = nil) policy = create_policy(, BatchPolicy, default_batch_policy) execute_batch_operate_commands(policy, records) do |node, batch| BatchOperateCommand.new(node, batch, policy, records) end end |
#change_password(user, password, options = nil) ⇒ Object
Change user’s password. Clear-text password will be hashed using bcrypt before sending to server.
798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 |
# File 'lib/aerospike/client.rb', line 798 def change_password(user, password, = nil) raise Aerospike::Exceptions::Aerospike.new(INVALID_USER) unless @cluster.user && @cluster.user != "" policy = create_policy(, AdminPolicy, default_admin_policy) hash = LoginCommand.hash_password(password) command = AdminCommand.new if user == @cluster.user # Change own password. command.change_password(@cluster, policy, user, hash) else # Change other user's password by user admin. command.set_password(@cluster, policy, user, hash) end @cluster.change_password(user, hash) end |
#close ⇒ Object
Closes all client connections to database server nodes.
62 63 64 |
# File 'lib/aerospike/client.rb', line 62 def close @cluster.close end |
#connect ⇒ Object
Connect to the cluster.
55 56 57 |
# File 'lib/aerospike/client.rb', line 55 def connect @cluster.connect end |
#connected? ⇒ Boolean
Determines if there are active connections to the database server cluster.
Returns +true+ if connections exist.
70 71 72 |
# File 'lib/aerospike/client.rb', line 70 def connected? @cluster.connected? end |
#create_index(namespace, set_name, index_name, bin_name, index_type, collection_type = nil, options = nil, ctx: nil) ⇒ Object
Create secondary index.
This asynchronous server call will return before command is complete.
The user can optionally wait for command completion by using the returned
IndexTask instance.
This method is only supported by Aerospike 3 servers.
index_type should be :string, :numeric or :geo2dsphere (requires server version 3.7 or later)
collection_type should be :list, :mapkeys or :mapvalues
ctx is an optional list of context. Supported on server v6.1+.
560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 |
# File 'lib/aerospike/client.rb', line 560 def create_index(namespace, set_name, index_name, bin_name, index_type, collection_type = nil, = nil, ctx: nil) if .nil? && collection_type.is_a?(Hash) = collection_type collection_type = nil end policy = create_policy(, Policy, default_info_policy) str_cmd = "sindex-create:ns=#{namespace}" str_cmd << ";set=#{set_name}" unless set_name.to_s.strip.empty? str_cmd << ";indexname=#{index_name};numbins=1" str_cmd << ";context=#{CDT::Context.base64(ctx)}" unless ctx.to_a.empty? str_cmd << ";indextype=#{collection_type.to_s.upcase}" if collection_type str_cmd << ";indexdata=#{bin_name},#{index_type.to_s.upcase}" str_cmd << ";priority=normal" # Send index command to one node. That node will distribute the command to other nodes. response = send_info_command(policy, str_cmd).upcase if response == "OK" # Return task that could optionally be polled for completion. return IndexTask.new(@cluster, namespace, index_name) end if response.start_with?("FAIL:200") # Index has already been created. Do not need to poll for completion. return IndexTask.new(@cluster, namespace, index_name, true) end raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::INDEX_GENERIC, "Create index failed: #{response}") end |
#create_role(role_name, privileges = [], allowlist = [], read_quota = 0, write_quota = 0, options = nil) ⇒ Object
Create a user-defined role. Quotas require server security configuration “enable-quotas” to be set to true. Pass 0 for quota values for no limit.
861 862 863 864 865 |
# File 'lib/aerospike/client.rb', line 861 def create_role(role_name, privileges = [], allowlist = [], read_quota = 0, write_quota = 0, = nil) policy = create_policy(, AdminPolicy, default_admin_policy) command = AdminCommand.new command.create_role(@cluster, policy, role_name, privileges, allowlist, read_quota, write_quota) end |
#create_user(user, password, roles, options = nil) ⇒ Object
Create user with password and roles. Clear-text password will be hashed using bcrypt before sending to server.
783 784 785 786 787 788 |
# File 'lib/aerospike/client.rb', line 783 def create_user(user, password, roles, = nil) policy = create_policy(, AdminPolicy, default_admin_policy) hash = LoginCommand.hash_password(password) command = AdminCommand.new command.create_user(@cluster, policy, user, hash, roles) end |
#delete(key, options = nil) ⇒ Object
Examples:
existed = client.delete key, :timeout => 0.001
194 195 196 197 198 199 |
# File 'lib/aerospike/client.rb', line 194 def delete(key, = nil) policy = create_policy(, WritePolicy, default_write_policy) command = DeleteCommand.new(@cluster, policy, key) execute_command(command) command.existed end |
#drop_index(namespace, set_name, index_name, options = nil) ⇒ Object
Delete secondary index.
This method is only supported by Aerospike 3 servers.
592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 |
# File 'lib/aerospike/client.rb', line 592 def drop_index(namespace, set_name, index_name, = nil) policy = create_policy(, Policy, default_info_policy) str_cmd = "sindex-delete:ns=#{namespace}" str_cmd << ";set=#{set_name}" unless set_name.to_s.strip.empty? str_cmd << ";indexname=#{index_name}" # Send index command to one node. That node will distribute the command to other nodes. response = send_info_command(policy, str_cmd).upcase return if response == "OK" # Index did not previously exist. Return without error. return if response.start_with?("FAIL:201") raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::INDEX_GENERIC, "Drop index failed: #{response}") end |
#drop_role(role_name, options = nil) ⇒ Object
Remove a user-defined role.
868 869 870 871 872 |
# File 'lib/aerospike/client.rb', line 868 def drop_role(role_name, = nil) policy = create_policy(, AdminPolicy, default_admin_policy) command = AdminCommand.new command.drop_role(@cluster, policy, role_name) end |
#drop_user(user, options = nil) ⇒ Object
Remove user from cluster.
791 792 793 794 795 |
# File 'lib/aerospike/client.rb', line 791 def drop_user(user, = nil) policy = create_policy(, AdminPolicy, default_admin_policy) command = AdminCommand.new command.drop_user(@cluster, policy, user) end |
#execute_udf(key, package_name, function_name, args = [], options = nil) ⇒ Object
Execute user defined function on server and return results.
The function operates on a single record.
The package name is used to locate the udf file location:
udf file = <server udf dir>/<package name>.lua
This method is only supported by Aerospike 3 servers.
495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 |
# File 'lib/aerospike/client.rb', line 495 def execute_udf(key, package_name, function_name, args = [], = nil) policy = create_policy(, WritePolicy, default_write_policy) command = ExecuteCommand.new(@cluster, policy, key, package_name, function_name, args) execute_command(command) record = command.record return nil if !record || record.bins.empty? result_map = record.bins # User defined functions don't have to return a value. key, obj = result_map.detect { |k, _| k.include?("SUCCESS") } return obj if key key, obj = result_map.detect { |k, _| k.include?("FAILURE") } = key ? obj.to_s : "Invalid UDF return value" raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::UDF_BAD_RESPONSE, ) end |
#execute_udf_on_query(statement, package_name, function_name, function_args = [], options = nil) ⇒ Object
execute_udf_on_query applies user defined function on records that match the statement filter. Records are not returned to the client. This asynchronous server call will return before command is complete. The user can optionally wait for command completion by using the returned ExecuteTask instance.
This method is only supported by Aerospike 3 servers. If the policy is nil, the default relevant policy will be used.
524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 |
# File 'lib/aerospike/client.rb', line 524 def execute_udf_on_query(statement, package_name, function_name, function_args = [], = nil) policy = create_policy(, WritePolicy, default_write_policy) nodes = @cluster.nodes if nodes.empty? raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::SERVER_NOT_AVAILABLE, "Executing UDF failed because cluster is empty.") end statement = statement.clone statement.set_aggregate_function(package_name, function_name, function_args, false) # Use a thread per node nodes.each do |node| Thread.new do Thread.current.abort_on_exception = true begin command = ServerCommand.new(@cluster, node, policy, statement, true, statement.task_id) execute_command(command) rescue => e Aerospike.logger.error(e) raise e end end end ExecuteTask.new(@cluster, statement) end |
#exists(key, options = nil) ⇒ Object
Determines if a record key exists.
The policy can be used to specify timeouts.
272 273 274 275 276 277 |
# File 'lib/aerospike/client.rb', line 272 def exists(key, = nil) policy = create_policy(, Policy, default_read_policy) command = ExistsCommand.new(@cluster, policy, key) execute_command(command) command.exists end |
#get(key, bin_names = nil, options = nil) ⇒ Object
Read record header and bins for specified key.
The policy can be used to specify timeouts.
285 286 287 288 289 290 291 |
# File 'lib/aerospike/client.rb', line 285 def get(key, bin_names = nil, = nil) policy = create_policy(, Policy, default_read_policy) command = ReadCommand.new(@cluster, policy, key, bin_names) execute_command(command) command.record end |
#get_header(key, options = nil) ⇒ Object
Read record generation and expiration only for specified key. Bins are not read.
The policy can be used to specify timeouts.
295 296 297 298 299 300 |
# File 'lib/aerospike/client.rb', line 295 def get_header(key, = nil) policy = create_policy(, Policy, default_read_policy) command = ReadHeaderCommand.new(@cluster, policy, key) execute_command(command) command.record end |
#grant_privileges(role_name, privileges, options = nil) ⇒ Object
Grant privileges to a user-defined role.
875 876 877 878 879 |
# File 'lib/aerospike/client.rb', line 875 def grant_privileges(role_name, privileges, = nil) policy = create_policy(, AdminPolicy, default_admin_policy) command = AdminCommand.new command.grant_privileges(@cluster, policy, role_name, privileges) end |
#grant_roles(user, roles, options = nil) ⇒ Object
Add roles to user’s list of roles.
817 818 819 820 821 |
# File 'lib/aerospike/client.rb', line 817 def grant_roles(user, roles, = nil) policy = create_policy(, AdminPolicy, default_admin_policy) command = AdminCommand.new command.grant_roles(@cluster, policy, user, roles) end |
#list_udf(options = nil) ⇒ Object
ListUDF lists all packages containing user defined functions in the server.
This method is only supported by Aerospike 3 servers.
457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 |
# File 'lib/aerospike/client.rb', line 457 def list_udf( = nil) policy = create_policy(, Policy, default_info_policy) str_cmd = "udf-list" # Send command to one node. That node will distribute it to other nodes. response_map = @cluster.request_info(policy, str_cmd) _, response = response_map.first vals = response.split(";") vals.map do |udf_info| next if udf_info.strip! == "" udf_parts = udf_info.split(",") udf = UDF.new udf_parts.each do |values| k, v = values.split("=", 2) case k when "filename" udf.filename = v when "hash" udf.hash = v when "type" udf.language = v end end udf end end |
#node_names ⇒ Object
Returns list of active server node names in the cluster.
84 85 86 |
# File 'lib/aerospike/client.rb', line 84 def node_names @cluster.nodes.map(&:name) end |
#nodes ⇒ Object
Returns array of active server nodes in the cluster.
77 78 79 |
# File 'lib/aerospike/client.rb', line 77 def nodes @cluster.nodes end |
#operate(key, operations, options = nil) ⇒ Object
Perform multiple read/write operations on a single key in one batch call.
An example would be to add an integer value to an existing record and then
read the result, all in one database call. Operations are executed in
the order they are specified.
376 377 378 379 380 381 382 383 |
# File 'lib/aerospike/client.rb', line 376 def operate(key, operations, = nil) policy = create_policy(, OperatePolicy, default_operate_policy) args = OperateArgs.new(cluster, policy, default_write_policy, default_operate_policy, key, operations) command = OperateCommand.new(@cluster, key, args) execute_command(command) command.record end |
#prepend(key, bins, options = nil) ⇒ Object
Examples:
client.prepend key, {'bin', 'value to prepend'}, :timeout => 0.001
149 150 151 152 153 |
# File 'lib/aerospike/client.rb', line 149 def prepend(key, bins, = nil) policy = create_policy(, WritePolicy, default_write_policy) command = WriteCommand.new(@cluster, policy, key, hash_to_bins(bins), Aerospike::Operation::PREPEND) execute_command(command) end |
#put(key, bins, options = nil) ⇒ Object
Examples:
client.put key, {'bin', 'value string'}, :timeout => 0.001
107 108 109 110 111 |
# File 'lib/aerospike/client.rb', line 107 def put(key, bins, = nil) policy = create_policy(, WritePolicy, default_write_policy) command = WriteCommand.new(@cluster, policy, key, hash_to_bins(bins), Aerospike::Operation::WRITE) execute_command(command) end |
#query(statement, options = nil) ⇒ Object
Query executes a query and returns a recordset. The query executor puts records on a channel from separate threads. The caller can concurrently pops records off the channel through the record channel.
This method is only supported by Aerospike 3 servers. If the policy is nil, a default policy will be generated.
720 721 722 |
# File 'lib/aerospike/client.rb', line 720 def query(statement, = nil) query_partitions(Aerospike::PartitionFilter.all, statement, ) end |
#query_execute(statement, operations = [], options = nil) ⇒ Aerospike::ExecuteTask
QueryExecute applies operations on records that match the statement filter. Records are not returned to the client. This asynchronous server call will return before the command is complete. The user can optionally wait for command completion by using the returned ExecuteTask instance.
This method is only supported by Aerospike 3+ servers. If the policy is nil, the default relevant policy will be used.
743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 |
# File 'lib/aerospike/client.rb', line 743 def query_execute(statement, operations = [], = nil) policy = create_policy(, WritePolicy, default_write_policy) if statement.nil? raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::INVALID_COMMAND, "Query failed of invalid statement.") end statement = statement.clone unless operations.empty? statement.operations = operations end task_id = statement.task_id nodes = @cluster.nodes if nodes.empty? raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::SERVER_NOT_AVAILABLE, "Query failed because cluster is empty.") end # Use a thread per node nodes.each do |node| Thread.new do Thread.current.abort_on_exception = true begin command = ServerCommand.new(@cluster, node, policy, statement, true, task_id) execute_command(command) rescue => e Aerospike.logger.error(e) raise e end end end ExecuteTask.new(@cluster, statement) end |
#query_partitions(partition_filter, statement, options = nil) ⇒ Object
Executes a query for specified partitions and returns a recordset. The query executor puts records on the queue from separate threads. The caller can concurrently pop records off the queue through the recordset.records API.
This method is only supported by Aerospike 4.9+ servers. If the policy is nil, the default relevant policy will be used.
694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 |
# File 'lib/aerospike/client.rb', line 694 def query_partitions(partition_filter, statement, = nil) policy = create_policy(, QueryPolicy, default_query_policy) nodes = @cluster.nodes if nodes.empty? raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::SERVER_NOT_AVAILABLE, "Query failed because cluster is empty.") end # result recordset recordset = Recordset.new(policy.record_queue_size, 1, :query) tracker = PartitionTracker.new(policy, nodes, partition_filter) Thread.new do Thread.current.abort_on_exception = true QueryExecutor.query_partitions(@cluster, policy, tracker, statement, recordset) end recordset end |
#query_role(role, options = nil) ⇒ Object
Retrieve privileges for a given role.
845 846 847 848 849 |
# File 'lib/aerospike/client.rb', line 845 def query_role(role, = nil) policy = create_policy(, AdminPolicy, default_admin_policy) command = AdminCommand.new command.query_role(@cluster, policy, role) end |
#query_roles(options = nil) ⇒ Object
Retrieve all roles and their privileges.
852 853 854 855 856 |
# File 'lib/aerospike/client.rb', line 852 def query_roles( = nil) policy = create_policy(, AdminPolicy, default_admin_policy) command = AdminCommand.new command.query_roles(@cluster, policy) end |
#query_user(user, options = nil) ⇒ Object
Retrieve roles for a given user.
831 832 833 834 835 |
# File 'lib/aerospike/client.rb', line 831 def query_user(user, = nil) policy = create_policy(, AdminPolicy, default_admin_policy) command = AdminCommand.new command.query_user(@cluster, policy, user) end |
#query_users(options = nil) ⇒ Object
Retrieve all users and their roles.
838 839 840 841 842 |
# File 'lib/aerospike/client.rb', line 838 def query_users( = nil) policy = create_policy(, AdminPolicy, default_admin_policy) command = AdminCommand.new command.query_users(@cluster, policy) end |
#register_udf(udf_body, server_path, language, options = nil) ⇒ Object
Register package containing user defined functions with server.
This asynchronous server call will return before command is complete.
The user can optionally wait for command completion by using the returned
RegisterTask instance.
This method is only supported by Aerospike 3 servers.
406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 |
# File 'lib/aerospike/client.rb', line 406 def register_udf(udf_body, server_path, language, = nil) policy = create_policy(, Policy, default_info_policy) content = Base64.strict_encode64(udf_body).force_encoding("binary") str_cmd = "udf-put:filename=#{server_path};content=#{content};" str_cmd << "content-len=#{content.length};udf-type=#{language};" # Send UDF to one node. That node will distribute the UDF to other nodes. response_map = @cluster.request_info(policy, str_cmd) res = {} response_map.each do |k, response| vals = response.to_s.split(";") vals.each do |pair| k, v = pair.split("=", 2) res[k] = v end end if res["error"] raise Aerospike::Exceptions::CommandRejected.new("Registration failed: #{res['error']}\nFile: #{res['file']}\nLine: #{res['line']}\nMessage: #{res['message']}") end UdfRegisterTask.new(@cluster, server_path) end |
#register_udf_from_file(client_path, server_path, language, options = nil) ⇒ Object
Register package containing user defined functions with server.
This asynchronous server call will return before command is complete.
The user can optionally wait for command completion by using the returned
RegisterTask instance.
This method is only supported by Aerospike 3 servers.
395 396 397 398 |
# File 'lib/aerospike/client.rb', line 395 def register_udf_from_file(client_path, server_path, language, = nil) udf_body = File.read(client_path) register_udf(udf_body, server_path, language, ) end |
#remove_udf(udf_name, options = nil) ⇒ Object
RemoveUDF removes a package containing user defined functions in the server.
This asynchronous server call will return before command is complete.
The user can optionally wait for command completion by using the returned
RemoveTask instance.
This method is only supported by Aerospike 3 servers.
438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 |
# File 'lib/aerospike/client.rb', line 438 def remove_udf(udf_name, = nil) policy = create_policy(, Policy, default_info_policy) str_cmd = "udf-remove:filename=#{udf_name};" # Send command to one node. That node will distribute it to other nodes. # Send UDF to one node. That node will distribute the UDF to other nodes. response_map = @cluster.request_info(policy, str_cmd) _, response = response_map.first if response == "ok" UdfRemoveTask.new(@cluster, udf_name) else raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::SERVER_ERROR, response) end end |
#request_info(*commands, policy: nil) ⇒ Object
609 610 611 612 |
# File 'lib/aerospike/client.rb', line 609 def request_info(*commands, policy: nil) policy = create_policy(policy, Policy, default_info_policy) @cluster.request_info(policy, *commands) end |
#revoke_privileges(role_name, privileges, options = nil) ⇒ Object
Revoke privileges from a user-defined role.
882 883 884 885 886 |
# File 'lib/aerospike/client.rb', line 882 def revoke_privileges(role_name, privileges, = nil) policy = create_policy(, AdminPolicy, default_admin_policy) command = AdminCommand.new command.revoke_privileges(@cluster, policy, role_name, privileges) end |
#revoke_roles(user, roles, options = nil) ⇒ Object
Remove roles from user’s list of roles.
824 825 826 827 828 |
# File 'lib/aerospike/client.rb', line 824 def revoke_roles(user, roles, = nil) policy = create_policy(, AdminPolicy, default_admin_policy) command = AdminCommand.new command.revoke_roles(@cluster, policy, user, roles) end |
#scan_all(namespace, set_name, bin_names = nil, options = nil) ⇒ Object
Reads all records in specified namespace and set from all nodes. If the policy’s concurrent_nodes is specified, each server node will be read in parallel. Otherwise, server nodes are read sequentially. If the policy is nil, the default relevant policy will be used.
673 674 675 |
# File 'lib/aerospike/client.rb', line 673 def scan_all(namespace, set_name, bin_names = nil, = nil) scan_partitions(Aerospike::PartitionFilter.all, namespace, set_name, bin_names, ) end |
#scan_node(node, namespace, set_name, bin_names = nil, options = nil) ⇒ Object
ScanNode reads all records in specified namespace and set, from one node only. The policy can be used to specify timeouts.
679 680 681 |
# File 'lib/aerospike/client.rb', line 679 def scan_node(node, namespace, set_name, bin_names = nil, = nil) scan_node_partitions(node, namespace, set_name, bin_names, ) end |
#scan_node_partitions(node, namespace, set_name, bin_names = nil, options = nil) ⇒ Object
Reads all records in specified namespace and set for one node only. If the policy is nil, the default relevant policy will be used.
648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 |
# File 'lib/aerospike/client.rb', line 648 def scan_node_partitions(node, namespace, set_name, bin_names = nil, = nil) policy = create_policy(, ScanPolicy, default_scan_policy) # Retry policy must be one-shot for scans. # copy on write for policy new_policy = policy.clone unless node.active? raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::SERVER_NOT_AVAILABLE, "Scan failed because cluster is empty.") end tracker = Aerospike::PartitionTracker.new(policy, [node]) recordset = Recordset.new(policy.record_queue_size, 1, :scan) Thread.new do Thread.current.abort_on_exception = true ScanExecutor.scan_partitions(policy, @cluster, tracker, namespace, set_name, recordset, bin_names) end recordset end |
#scan_partitions(partition_filter, namespace, set_name, bin_names = nil, options = nil) ⇒ Object
Reads records in specified namespace and set using partition filter. If the policy’s concurrent_nodes is specified, each server node will be read in parallel. Otherwise, server nodes are read sequentially. If partition_filter is nil, all partitions will be scanned. If the policy is nil, the default relevant policy will be used. This method is only supported by Aerospike 4.9+ servers.
624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 |
# File 'lib/aerospike/client.rb', line 624 def scan_partitions(partition_filter, namespace, set_name, bin_names = nil, = nil) policy = create_policy(, ScanPolicy, default_scan_policy) # Retry policy must be one-shot for scans. # copy on write for policy new_policy = policy.clone nodes = @cluster.nodes if nodes.empty? raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::SERVER_NOT_AVAILABLE, "Scan failed because cluster is empty.") end tracker = Aerospike::PartitionTracker.new(policy, nodes, partition_filter) recordset = Recordset.new(policy.record_queue_size, 1, :scan) Thread.new do Thread.current.abort_on_exception = true ScanExecutor.scan_partitions(policy, @cluster, tracker, namespace, set_name, recordset, bin_names) end recordset end |
#set_quotas(role_name, read_quota, write_quota, options = nil) ⇒ Object
Set or update quota for a role.
889 890 891 892 893 |
# File 'lib/aerospike/client.rb', line 889 def set_quotas(role_name, read_quota, write_quota, = nil) policy = create_policy(, AdminPolicy, default_admin_policy) command = AdminCommand.new command.set_quotas(@cluster, policy, role_name, read_quota, write_quota) end |
#supports_feature?(feature) ⇒ Boolean
88 89 90 |
# File 'lib/aerospike/client.rb', line 88 def supports_feature?(feature) @cluster.supports_feature?(feature) end |
#touch(key, options = nil) ⇒ Object
Examples:
client.touch key, :timeout => 0.001
259 260 261 262 263 |
# File 'lib/aerospike/client.rb', line 259 def touch(key, = nil) policy = create_policy(, WritePolicy, default_write_policy) command = TouchCommand.new(@cluster, policy, key) execute_command(command) end |
#truncate(namespace, set_name = nil, before_last_update = nil, options = {}) ⇒ Object
Removes records in the specified namespace/set efficiently.
This method is orders of magnitude faster than deleting records one at a time. It requires Aerospike Server version 3.12 or later. See www.aerospike.com/docs/reference/info#truncate for further information.
This asynchronous server call may return before the truncate is complete. The user can still write new records after the server call returns because new records will have last update times greater than the truncate cut-off (set at the time of the truncate call.)
If no policy options are provided, @default_info_policy will be used.
216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 |
# File 'lib/aerospike/client.rb', line 216 def truncate(namespace, set_name = nil, before_last_update = nil, = {}) policy = create_policy(, Policy, default_info_policy) node = @cluster.random_node if set_name && !set_name.to_s.strip.empty? str_cmd = "truncate:namespace=#{namespace}" str_cmd << ";set=#{set_name}" unless set_name.to_s.strip.empty? else str_cmd = if node.supports_feature?(Aerospike::Features::TRUNCATE_NAMESPACE) "truncate-namespace:namespace=#{namespace}" else "truncate:namespace=#{namespace}" end end if before_last_update lut_nanos = (before_last_update.to_f * 1_000_000_000.0).round str_cmd << ";lut=#{lut_nanos}" elsif supports_feature?(Aerospike::Features::LUT_NOW) # Servers >= 4.3.1.4 require lut argument str_cmd << ";lut=now" end response = send_info_command(policy, str_cmd, node).upcase return if response == "OK" raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::SERVER_ERROR, "Truncate failed: #{response}") end |