Class: Aerospike::ScanExecutor

Inherits:
Object
  • Object
show all
Defined in:
lib/aerospike/query/scan_executor.rb

Overview

:nodoc:

Class Method Summary collapse

Class Method Details

.scan_partitions(policy, cluster, tracker, namespace, set_name, recordset, bin_names = nil) ⇒ Object

[View source]

20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/aerospike/query/scan_executor.rb', line 20

# Runs a partition scan, repeating passes until the tracker reports completion
# or no failed command asks for a retry.
#
# On every pass the tracker is asked to assign partitions to nodes and one
# ScanPartitionCommand is executed per assignment: in a dedicated thread per
# assignment when +policy.concurrent_nodes+ is truthy, sequentially otherwise.
# Errors raised by a command are rescued; unless the error equals
# SCAN_TERMINATED_EXCEPTION it is logged and remembered, and the command is
# asked whether the failure warrants another pass.
#
# When the method finishes, +recordset.thread_finished+ is called with the
# last remembered error of the final pass (or +nil+ if there was none).
#
# @param policy [Object] must respond to +sleep_between_retries+ and
#   +concurrent_nodes+
# @param cluster [Object] forwarded to the tracker and to every command
# @param tracker [Object] must respond to +assign_partitions_to_nodes+ and
#   +complete?+
# @param namespace [Object] forwarded to the tracker and to every command
# @param set_name [Object] forwarded to every command
# @param recordset [Object] forwarded to every command; receives
#   +thread_finished+ once at the end
# @param bin_names [Object, nil] forwarded to every command
# @return [nil]
def self.scan_partitions(policy, cluster, tracker, namespace, set_name, recordset, bin_names = nil)
  interval = policy.sleep_between_retries

  # Kept in method-local variables (not class-level instance variables) so
  # that scans running concurrently do not overwrite each other's state.
  last_expn = nil
  should_retry = false

  # Executes the command for one node/partition assignment and records the
  # outcome. Shared by the threaded and the sequential code paths.
  run_command = lambda do |node_partition|
    command = ScanPartitionCommand.new(cluster, policy, tracker, node_partition, namespace, set_name, bin_names, recordset)
    begin
      command.execute
    rescue => e
      terminated = e == SCAN_TERMINATED_EXCEPTION
      last_expn = e unless terminated
      should_retry ||= command.should_retry(e)
      Aerospike.logger.error(e.backtrace.join("\n")) unless terminated
    end
  end

  loop do
    # Only errors from the current pass are reported.
    last_expn = nil

    list = tracker.assign_partitions_to_nodes(cluster, namespace)

    if policy.concurrent_nodes
      # Use one thread per node assignment and wait for all of them.
      threads = list.map do |node_partition|
        Thread.new do
          Thread.current.abort_on_exception = true
          run_command.call(node_partition)
        end
      end
      threads.each(&:join)
    else
      # Use the calling thread for every node, one after another.
      list.each { |node_partition| run_command.call(node_partition) }
    end

    # Pass the +cluster+ argument: a class method has no @cluster of its own,
    # so the instance variable would always be nil here.
    if tracker.complete?(cluster, policy) || !should_retry
      recordset.thread_finished(last_expn)
      return
    end

    # No task id reset happens here: this method has no statement object, and
    # referencing one would raise NameError on every retry pass.
    sleep(interval) if interval > 0
  end
end