Module: NSConnector::ChunkedSearching
Included in: Resource
Defined in: lib/ns_connector/chunked_searching.rb
Overview
Provides threaded and non-threaded chunked searching.
Instance Method Summary
- #grab_chunk(filters, chunk) ⇒ Object
  Retrieves a single chunk; this makes one HTTP connection. Raises NSConnector::Errors::EndChunking when there are no more chunks. Returns Resource objects.
- #normal_search_by_chunks(filters) ⇒ Object
  Keeps grabbing incremental chunks until told to stop.
- #search_by_chunks(filters) ⇒ Object
  Searches by requesting chunks.
- #threaded_search_by_chunks(filters) ⇒ Object
  Given four threads there are four workers, which keep consuming chunks of data specified by the master.
Instance Method Details
#grab_chunk(filters, chunk) ⇒ Object
Retrieves a single chunk. This makes one HTTP connection.
Raises:
- NSConnector::Errors::EndChunking when there are no more chunks
Returns:
- Resource objects
# File 'lib/ns_connector/chunked_searching.rb', line 6

def grab_chunk(filters, chunk)
  NSConnector::Restlet.execute!(
    :action => 'search',
    :type_id => type_id,
    :fields => fields,
    :data => {
      :filters => filters,
      :chunk => chunk,
    }
  ).map do |upstream_store|
    self.new(upstream_store)
  end
end
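The contract described for grab_chunk (one request per chunk, each raw row wrapped in an object, an error once the chunks run out) can be illustrated without a NetSuite connection. The following is only a sketch: EndChunking, Record, RAW_ROWS, CHUNK_SIZE and fake_grab_chunk are hypothetical stand-ins, the chunk size of two is invented, and it assumes the error is raised for the first chunk index past the data.

```ruby
# Hypothetical stand-ins; none of these names are part of NSConnector.
class EndChunking < StandardError; end

Record = Struct.new(:store)

# Pretend backend: three raw rows, served two per chunk (assumed size).
RAW_ROWS = [{ 'id' => '1' }, { 'id' => '2' }, { 'id' => '3' }].freeze
CHUNK_SIZE = 2

# Return the wrapped rows for a zero-based chunk index, or raise
# EndChunking when the index points past the last row.
def fake_grab_chunk(chunk)
  rows = RAW_ROWS[chunk * CHUNK_SIZE, CHUNK_SIZE]
  raise EndChunking if rows.nil? || rows.empty?
  rows.map { |row| Record.new(row) }
end

first = fake_grab_chunk(0)
```

Here chunk 0 and chunk 1 return wrapped rows, and asking for chunk 2 raises EndChunking, which is the signal the search loops rely on to stop.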
#normal_search_by_chunks(filters) ⇒ Object
Keeps grabbing successive chunks, starting at chunk 0, until grab_chunk raises NSConnector::Errors::EndChunking, then returns the accumulated results.
# File 'lib/ns_connector/chunked_searching.rb', line 91

def normal_search_by_chunks(filters)
  results = []
  chunk = 0
  while true
    begin
      results += grab_chunk(filters, chunk)
      chunk += 1
    end
  end
rescue NSConnector::Errors::EndChunking
  return results
end
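The same sequential loop can be tried in isolation. This is a minimal sketch, not the library's code: EndChunking and collect_chunks are hypothetical names, and the block passed in stands in for grab_chunk.

```ruby
# Hypothetical stand-in for NSConnector::Errors::EndChunking.
class EndChunking < StandardError; end

# Call the block with chunk numbers 0, 1, 2, ... and concatenate what it
# returns. The method-level rescue ends the loop on EndChunking.
def collect_chunks
  results = []
  chunk = 0
  loop do
    results.concat(yield(chunk))
    chunk += 1
  end
rescue EndChunking
  results
end

# Fake data source: three chunks, then EndChunking for any later index.
pages = [[1, 2], [3, 4], [5]]
all = collect_chunks { |chunk| pages.fetch(chunk) { raise EndChunking } }
```

Because the exception is the only exit from the loop, every search costs one extra request that returns nothing but the stop signal.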
#search_by_chunks(filters) ⇒ Object
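Searches by requesting chunks, using threaded_search_by_chunks when NSConnector::Config[:use_threads] is set and normal_search_by_chunks otherwise.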
# File 'lib/ns_connector/chunked_searching.rb', line 105

def search_by_chunks filters
  if NSConnector::Config[:use_threads] then
    return threaded_search_by_chunks(filters)
  else
    return normal_search_by_chunks(filters)
  end
end
#threaded_search_by_chunks(filters) ⇒ Object
The basic logic here is: given four threads, we have four workers, and those workers keep consuming chunks of data specified by the master. When a worker receives an EndChunking error, it flags done as true and everyone wraps up their work. Pretty simple.
# File 'lib/ns_connector/chunked_searching.rb', line 24

def threaded_search_by_chunks(filters)
  require 'thread'

  threads = NSConnector::Config[:no_threads].to_i
  if threads < 1 then
    raise NSConnector::Config::ArgumentError,
      "Need more than #{threads} threads"
  end

  # We bother pre-populating the queue here because locking is
  # super expensive, on my build of ruby at least.
  queue = Queue.new
  (threads - 1).times do |i|
    queue << i
  end

  mutex = Mutex.new

  workers = []
  results = []
  current_chunk = threads - 1
  done = false

  # Workers
  threads.times do
    workers << Thread.new do
      until done
        begin
          # Avoid a deadlock by popping
          # off -1 to exit
          chunk = queue.pop
          break if chunk == -1

          result = grab_chunk(
            filters,
            chunk
          )
        rescue NSConnector::Errors::EndChunking
          done = true
          break
        end

        mutex.synchronize do
          results += result
        end
      end
    end
  end

  # Master
  until done
    if queue.empty? then
      queue << current_chunk
      current_chunk += 1
    end
  end

  threads.times do
    queue << -1
  end

  workers.each do |worker|
    worker.join
  end

  return results
end
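The master/worker arrangement can be exercised on its own with a fake data source. This is a sketch under stated assumptions rather than the library's implementation: EndChunking and threaded_collect are hypothetical names, the block stands in for grab_chunk, the queue is not pre-populated, and the master sleeps briefly instead of spinning while the queue is non-empty.

```ruby
require 'thread'

# Hypothetical stand-in for NSConnector::Errors::EndChunking.
class EndChunking < StandardError; end

# Fetch chunks with thread_count workers. The block receives a chunk
# number and returns an array, or raises EndChunking past the last chunk.
def threaded_collect(thread_count, &fetch)
  queue = Queue.new
  mutex = Mutex.new
  results = []
  done = false

  workers = Array.new(thread_count) do
    Thread.new do
      until done
        chunk = queue.pop
        break if chunk == -1 # sentinel pushed by the master to end a worker
        begin
          rows = fetch.call(chunk)
        rescue EndChunking
          done = true
          break
        end
        mutex.synchronize { results.concat(rows) }
      end
    end
  end

  # Master: hand out the next chunk number whenever the queue runs dry.
  next_chunk = 0
  until done
    if queue.empty?
      queue << next_chunk
      next_chunk += 1
    else
      sleep 0.001 # assumption: yield briefly rather than busy-wait
    end
  end

  # Wake any worker still blocked in queue.pop, then wait for all of them.
  thread_count.times { queue << -1 }
  workers.each(&:join)
  results
end

pages = [[1, 2], [3, 4], [5, 6], [7]]
found = threaded_collect(4) { |chunk| pages.fetch(chunk) { raise EndChunking } }
```

Chunks are handed out in increasing order, so by the time any worker sees EndChunking every valid chunk has already been claimed by some worker and will be appended before join returns. The order of the combined results depends on thread scheduling, so callers that need a stable order should sort afterwards.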