Module: NSConnector::ChunkedSearching

Included in:
Resource
Defined in:
lib/ns_connector/chunked_searching.rb

Overview

Provide threaded and non-threaded chunked searching

Instance Method Details

#grab_chunk(filters, chunk) ⇒ Object

Retrieve a single chunk. Each call makes one HTTP connection.

Raises

NSConnector::Errors::EndChunking when there are no more chunks

Returns

Resource objects



# File 'lib/ns_connector/chunked_searching.rb', line 6

def grab_chunk(filters, chunk)
	NSConnector::Restlet.execute!(
		:action => 'search',
		:type_id => type_id,
		:fields => fields,
		:data => {
			:filters => filters,
			:chunk => chunk,
		}
	).map do |upstream_store|
		self.new(upstream_store)
	end
end

#normal_search_by_chunks(filters) ⇒ Object

Keep requesting successive chunks, starting at chunk 0, until grab_chunk raises NSConnector::Errors::EndChunking, then return everything collected so far.



# File 'lib/ns_connector/chunked_searching.rb', line 93

def normal_search_by_chunks(filters)
	results = []
	chunk = 0
	while true
		begin
			results += grab_chunk(filters, chunk)
			chunk += 1
		end
	end
rescue NSConnector::Errors::EndChunking
	return results
end
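
The loop above can be tried without a NetSuite connection. The following is a minimal sketch, assuming a hypothetical in-memory stand-in for grab_chunk that serves slices of an array and raises a stand-in error once the data runs out. DemoEndChunking, demo_grab_chunk and demo_search_by_chunks are illustrative names and are not part of NSConnector.

```ruby
# Stand-in for NSConnector::Errors::EndChunking (hypothetical, illustration only).
class DemoEndChunking < StandardError; end

# Hypothetical replacement for grab_chunk: returns one slice of a fixed
# seven-element data set, three items per chunk, and raises DemoEndChunking
# once the requested chunk lies past the end of the data.
def demo_grab_chunk(_filters, chunk)
  data = (1..7).to_a
  chunk_size = 3
  slice = data[chunk * chunk_size, chunk_size]
  raise DemoEndChunking if slice.nil? || slice.empty?
  slice
end

# Same shape as normal_search_by_chunks: request chunk 0, 1, 2, ... and
# return the accumulated results when the end-of-data error is raised.
def demo_search_by_chunks(filters)
  results = []
  chunk = 0
  loop do
    results += demo_grab_chunk(filters, chunk)
    chunk += 1
  end
rescue DemoEndChunking
  results
end

p demo_search_by_chunks([])
```

Because the chunks are requested strictly in order, the results come back in the order the server produced them.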

#search_by_chunks(filters) ⇒ Object

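Search by requesting chunks. Delegates to threaded_search_by_chunks when NSConnector::Config[:use_threads] is set, and to normal_search_by_chunks otherwise.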



# File 'lib/ns_connector/chunked_searching.rb', line 107

def search_by_chunks filters
	if NSConnector::Config[:use_threads] then
		return threaded_search_by_chunks(filters)
	else
		return normal_search_by_chunks(filters)
	end
end
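
The dispatch is driven entirely by configuration, so callers never pick a strategy directly. A minimal sketch of that decision, assuming a plain Hash in place of NSConnector::Config and two hypothetical stand-in strategies that only report which branch was taken:

```ruby
# Hypothetical stand-ins for the two search strategies.
def demo_threaded_search(filters)
  [:threaded, filters]
end

def demo_normal_search(filters)
  [:normal, filters]
end

# Same decision as search_by_chunks, with the configuration passed in as a
# plain Hash (an assumption; the real code reads NSConnector::Config).
def demo_dispatch(config, filters)
  if config[:use_threads]
    demo_threaded_search(filters)
  else
    demo_normal_search(filters)
  end
end

p demo_dispatch({ :use_threads => true }, []).first
p demo_dispatch({ :use_threads => false }, []).first
```

In this sketch a missing :use_threads key reads as nil and falls through to the non-threaded path.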

#threaded_search_by_chunks(filters) ⇒ Object

The basic logic here is: given four threads we have four workers, and those workers keep consuming the chunk numbers handed out by the master. When a worker receives an EndChunking error, it sets done to true and everyone wraps up their work. Pretty simple.



# File 'lib/ns_connector/chunked_searching.rb', line 24

def threaded_search_by_chunks(filters)
	require 'thread'
	threads = NSConnector::Config[:no_threads].to_i
	if threads < 1 then
		raise NSConnector::Config::ArgumentError, 
			"Need more than #{threads} threads"
	end

	# We bother pre-populating the queue here because locking is
	# super expensive, on my build of ruby at least.
	queue = Queue.new
	(threads - 1).times do |i|
		queue << i
	end

	mutex = Mutex.new

	workers = []
	results = []
	current_chunk = threads - 1
	done = false

	# Workers
	threads.times do
		workers << Thread.new do
			until done
				begin
					# Avoid a deadlock by popping
					# off -1 to exit
					chunk = queue.pop 
					break if chunk == -1

					result = grab_chunk(
						filters, chunk
					)
				rescue NSConnector::Errors::EndChunking
					done = true
					break
				rescue Timeout::Error, Errno::ECONNRESET
					retry
				end

				mutex.synchronize do
					results += result
				end
			end
		end
	end

	# Master
	until done
		if queue.empty? then
			queue << current_chunk
			current_chunk += 1
		end
	end

	threads.times do 
		queue << -1
	end

	workers.each do |worker|
		worker.join
	end

	return results
end
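
The master/worker pattern above can be reduced to a self-contained sketch. This is a simplified illustration rather than the library's code: it assumes a hypothetical in-memory demo_grab_chunk, does not pre-populate the queue, and leaves out the retry on Timeout::Error and Errno::ECONNRESET. All demo_* names and DemoEndChunking are made up for the example.

```ruby
# Stand-in for NSConnector::Errors::EndChunking (hypothetical).
class DemoEndChunking < StandardError; end

# Hypothetical chunk source: ten items, three per chunk.
def demo_grab_chunk(chunk)
  data = (1..10).to_a
  slice = data[chunk * 3, 3]
  raise DemoEndChunking if slice.nil? || slice.empty?
  slice
end

def demo_threaded_search(thread_count)
  queue = Queue.new
  mutex = Mutex.new
  results = []
  done = false

  # Workers: pop a chunk number, fetch it, append under the mutex.
  workers = Array.new(thread_count) do
    Thread.new do
      until done
        chunk = queue.pop
        break if chunk == -1 # sentinel pushed by the master at shutdown
        begin
          result = demo_grab_chunk(chunk)
        rescue DemoEndChunking
          done = true
          break
        end
        mutex.synchronize { results.concat(result) }
      end
    end
  end

  # Master: hand out the next chunk number whenever the queue runs dry.
  next_chunk = 0
  until done
    if queue.empty?
      queue << next_chunk
      next_chunk += 1
    end
    Thread.pass
  end

  # One sentinel per worker so none stays blocked in queue.pop.
  thread_count.times { queue << -1 }
  workers.each(&:join)
  results
end

p demo_threaded_search(4).sort
```

Chunks finish in whatever order the workers complete them, so the combined results are not guaranteed to be in chunk order. Sort them afterwards if order matters.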