Module: NSConnector::ChunkedSearching
- Included in:
- Resource
- Defined in:
- lib/ns_connector/chunked_searching.rb
Overview
Provide threaded and non-threaded chunked searching
Instance Method Summary collapse
-
#grab_chunk(filters, chunk) ⇒ Object
- Retrieve a single chunk, this makes one HTTP connection Raises
- NSConnector::Errors::EndChunking when there’s no more chunks Returns
-
Resource objects.
-
#normal_search_by_chunks(filters) ⇒ Object
Just keep grabbing incremental chunks till we’re told to stop.
-
#search_by_chunks(filters) ⇒ Object
Search by requesting chunks.
-
#threaded_search_by_chunks(filters) ⇒ Object
The basic logic here is, given four threads we have four workers, those workers keep eating chunks of data specified by the master.
Instance Method Details
#grab_chunk(filters, chunk) ⇒ Object
Retrieve a single chunk, this makes one HTTP connection
- Raises
-
NSConnector::Errors::EndChunking when there’s no more chunks
- Returns
-
Resource objects
6 7 8 9 10 11 12 13 14 15 16 17 18 |
# File 'lib/ns_connector/chunked_searching.rb', line 6 def grab_chunk(filters, chunk) NSConnector::Restlet.execute!( :action => 'search', :type_id => type_id, :fields => fields, :data => { :filters => filters, :chunk => chunk, } ).map do |upstream_store| self.new(upstream_store) end end |
#normal_search_by_chunks(filters) ⇒ Object
Just keep grabbing incremental chunks till we’re told to stop.
93 94 95 96 97 98 99 100 101 102 103 104 |
# File 'lib/ns_connector/chunked_searching.rb', line 93 def normal_search_by_chunks(filters) results = [] chunk = 0 while true begin results += grab_chunk(filters, chunk) chunk += 1 end end rescue NSConnector::Errors::EndChunking return results end |
#search_by_chunks(filters) ⇒ Object
Search by requesting chunks
107 108 109 110 111 112 113 |
# File 'lib/ns_connector/chunked_searching.rb', line 107 def search_by_chunks filters if NSConnector::Config[:use_threads] then return threaded_search_by_chunks(filters) else return normal_search_by_chunks(filters) end end |
#threaded_search_by_chunks(filters) ⇒ Object
The basic logic here is, given four threads we have four workers, those workers keep eating chunks of data specified by the master. When a worker recieves a EndChunking error, it flags done as true and everyone wraps up thier work. Pretty simple.
24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 |
# File 'lib/ns_connector/chunked_searching.rb', line 24 def threaded_search_by_chunks(filters) require 'thread' threads = NSConnector::Config[:no_threads].to_i if threads < 1 then raise NSConnector::Config::ArgumentError, "Need more than #{threads} threads" end # We bother pre-populating the queue here because locking is # super expensive, on my build of ruby at least. queue = Queue.new (threads - 1).times do |i| queue << i end mutex = Mutex.new workers = [] results = [] current_chunk = threads - 1 done = false # Workers threads.times do workers << Thread.new do until done begin # Avoid a deadlock by popping # off -1 to exit chunk = queue.pop break if chunk == -1 result = grab_chunk( filters, chunk ) rescue NSConnector::Errors::EndChunking done = true break rescue Timeout::Error, Errno::ECONNRESET retry end mutex.synchronize do results += result end end end end # Master until done if queue.empty? then queue << current_chunk current_chunk += 1 end end threads.times do queue << -1 end workers.each do |worker| worker.join end return results end |