20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
|
# File 'lib/aerospike/query/query_executor.rb', line 20
# Executes a partition query round by round: each round asks the tracker
# which node owns which partitions, runs one QueryPartitionCommand per
# node-partition entry, and then either finishes the recordset or retries.
#
# @param cluster    passed to the tracker and to every command
# @param policy     read for +sleep_between_retries+ and +concurrent_nodes+
# @param tracker    supplies the per-round node/partition list and decides
#                   (via +complete?+) whether the query is done
# @param statement  read for +namespace+; +reset_task_id+ is called on it
#                   before every retry round
# @param recordset  passed to every command; +thread_finished+ is called on
#                   it exactly once, with the last non-termination error of
#                   the final round (or nil)
# @return [nil]
def self.query_partitions(cluster, policy, tracker, statement, recordset)
  interval = policy.sleep_between_retries
  # NOTE(review): never reset between rounds, so one retryable failure keeps
  # the loop going until the tracker reports completion -- confirm intended.
  should_retry = false
  # Kept in a local (not an ivar on the receiver) so that simultaneous
  # queries cannot overwrite each other's error.
  last_error = nil

  # Runs the command for one node-partition entry. Failures are recorded
  # rather than raised so that the remaining entries still get executed.
  run_command = lambda do |node_partition|
    command = QueryPartitionCommand.new(cluster, node_partition.node, tracker, policy, statement, recordset, node_partition)
    begin
      command.execute
    rescue => e
      # QUERY_TERMINATED_EXCEPTION is neither reported nor logged.
      terminated = e == QUERY_TERMINATED_EXCEPTION
      last_error = e unless terminated
      should_retry ||= command.should_retry(e)
      Aerospike.logger.error(e.backtrace.join("\n")) unless terminated
    end
  end

  loop do
    # Only errors raised during the current round are reported.
    last_error = nil

    node_partitions = tracker.assign_partitions_to_nodes(cluster, statement.namespace)

    if policy.concurrent_nodes
      # One thread per entry; wait for all of them before judging the round.
      workers = node_partitions.map do |node_partition|
        Thread.new do
          Thread.current.abort_on_exception = true
          run_command.call(node_partition)
        end
      end
      workers.each(&:join)
    else
      node_partitions.each { |node_partition| run_command.call(node_partition) }
    end

    # The tracker must be given the cluster this method was called with.
    if tracker.complete?(cluster, policy) || !should_retry
      recordset.thread_finished(last_error)
      return
    end

    sleep(interval) if interval > 0
    statement.reset_task_id
  end
end
|