Class: Aerospike::QueryExecutor
- Inherits:
-
Object
- Object
- Aerospike::QueryExecutor
- Defined in:
- lib/aerospike/query/query_executor.rb
Overview
:nodoc:
Class Method Summary collapse
Class Method Details
permalink .query_partitions(cluster, policy, tracker, statement, recordset) ⇒ Object
[View source]
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 |
# File 'lib/aerospike/query/query_executor.rb', line 20 def self.query_partitions(cluster, policy, tracker, statement, recordset) interval = policy.sleep_between_retries should_retry = false loop do # reset last_expn @last_expn = nil list = tracker.assign_partitions_to_nodes(cluster, statement.namespace) if policy.concurrent_nodes threads = [] # Use a thread per node list.each do |node_partition| threads << Thread.new do Thread.current.abort_on_exception = true command = QueryPartitionCommand.new(cluster, node_partition.node, tracker, policy, statement, recordset, node_partition) begin command.execute rescue => e @last_expn = e unless e == QUERY_TERMINATED_EXCEPTION should_retry ||= command.should_retry(e) Aerospike.logger.error(e.backtrace.join("\n")) unless e == QUERY_TERMINATED_EXCEPTION end end end threads.each(&:join) else # Use a single thread for all nodes for all node list.each do |node_partition| command = QueryPartitionCommand.new(cluster, node_partition.node, tracker, policy, statement, recordset, node_partition) begin command.execute rescue => e @last_expn = e unless e == QUERY_TERMINATED_EXCEPTION should_retry ||= command.should_retry(e) Aerospike.logger.error(e.backtrace.join("\n")) unless e == QUERY_TERMINATED_EXCEPTION end end end if tracker.complete?(@cluster, policy) || !should_retry recordset.thread_finished(@last_expn) return end sleep(interval) if policy.sleep_between_retries > 0 statement.reset_task_id end end |