Class: Riak::Multiget
- Includes:
- Util::Translation
- Defined in:
- lib/riak/multiget.rb
Overview
Coordinates a parallel fetch operation for multiple values.
Instance Attribute Summary
-
#client ⇒ Riak::Client
readonly
The associated client.
- #fetch_list ⇒ Array<Bucket, String> readonly
-
#finished ⇒ Boolean
readonly
True if the fetch operation has completed.
-
#result_hash ⇒ Hash<fetch_list_entry, RObject>
A Hash mapping Bucket and String key pairs to RObject instances.
-
#thread_count ⇒ Integer
The number of threads to use.
Class Method Summary
-
.get_all(client, fetch_list) ⇒ Hash<fetch_list_entry, RObject>
Perform a Riak Multiget operation.
Instance Method Summary
-
#fetch ⇒ Object
Starts the parallelized fetch operation.
- #finished? ⇒ Boolean
-
#initialize(client, fetch_list) ⇒ Multiget
constructor
Create a Riak Multiget operation.
- #results ⇒ Object
- #wait_for_finish ⇒ Object
Methods included from Util::Translation
Constructor Details
#initialize(client, fetch_list) ⇒ Multiget
Create a Riak Multiget operation.
# File 'lib/riak/multiget.rb', line 37
def initialize(client, fetch_list)
  raise ArgumentError, t('client_type', :client => client.inspect) unless client.is_a? Riak::Client
  raise ArgumentError, t('array_type', :array => fetch_list.inspect) unless fetch_list.is_a? Array
  validate_fetch_list fetch_list
  @client, @fetch_list = client, fetch_list.uniq
  self.result_hash = Hash.new
  @finished = false
  self.thread_count = client.multiget_threads
end
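The constructor stores `fetch_list.uniq`, so a repeated pair is fetched only once, and the result hash is keyed by the same pairs. The following is a minimal sketch of that shape using plain strings as stand-ins. The bucket names, keys, and placeholder values are hypothetical; in real use the first element of each pair would be a Bucket and each value an RObject.

```ruby
# Stand-in fetch list: each entry is a [bucket, key] pair.
# Plain strings replace real Bucket objects here (assumption for illustration).
fetch_list = [
  ["users", "alice"],
  ["users", "bob"],
  ["users", "alice"] # duplicate pair
]

# Mirrors the constructor's use of Array#uniq: duplicates collapse to one entry.
unique_pairs = fetch_list.uniq

# A result hash of the documented shape: each pair maps to its fetched value.
# The string values are placeholders for RObject instances.
result_hash = {}
unique_pairs.each do |bucket, key|
  result_hash[[bucket, key]] = "value of #{bucket}/#{key}"
end

# Look up a result with the same pair that appeared in the fetch list.
alice = result_hash[["users", "alice"]]
```

Because arrays with equal elements hash equally in Ruby, a freshly built `["users", "alice"]` finds the entry stored under the original pair.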
Instance Attribute Details
#client ⇒ Riak::Client (readonly)
Returns the associated client.
# File 'lib/riak/multiget.rb', line 10
def client
  @client
end
#fetch_list ⇒ Array<Bucket, String> (readonly)
# File 'lib/riak/multiget.rb', line 13
def fetch_list
  @fetch_list
end
#finished ⇒ Boolean (readonly)
Returns true if the fetch operation has completed.
# File 'lib/riak/multiget.rb', line 19
def finished
  @finished
end
#result_hash ⇒ Hash<fetch_list_entry, RObject>
Returns a Hash mapping Bucket and String key pairs to RObject instances.
# File 'lib/riak/multiget.rb', line 16
def result_hash
  @result_hash
end
#thread_count ⇒ Integer
Returns the number of threads to use.
# File 'lib/riak/multiget.rb', line 22
def thread_count
  @thread_count
end
Class Method Details
.get_all(client, fetch_list) ⇒ Hash<fetch_list_entry, RObject>
Perform a Riak Multiget operation. Returns a Hash mapping Bucket and String key pairs to RObject instances.
# File 'lib/riak/multiget.rb', line 28
def self.get_all(client, fetch_list)
  multi = new client, fetch_list
  multi.fetch
  multi.results
end
Instance Method Details
#fetch ⇒ Object
Starts the parallelized fetch operation.
# File 'lib/riak/multiget.rb', line 50
def fetch
  queue = fetch_list.dup
  queue_mutex = Mutex.new
  result_mutex = Mutex.new

  unless thread_count.is_a?(Integer) && thread_count > 0
    raise ArgumentError, t("invalid_multiget_thread_count")
  end

  @threads = 1.upto(thread_count).map do |_node|
    Thread.new do
      loop do
        pair = queue_mutex.synchronize do
          queue.shift
        end

        break if pair.nil?

        found = attempt_fetch(*pair)
        result_mutex.synchronize do
          result_hash[pair] = found
        end
      end
    end
  end
end
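The worker pattern in #fetch can be tried in isolation with only the standard library. The sketch below is not part of the library: `parallel_map_to_hash` is a hypothetical helper, and the block stands in for the per-pair fetch. Unlike #fetch, which returns as soon as the threads are started, this sketch joins the threads itself before returning.

```ruby
# Hypothetical helper (not part of Riak::Multiget): runs the given block over
# every item using thread_count threads and returns a Hash of item => result.
def parallel_map_to_hash(items, thread_count, &worker)
  unless thread_count.is_a?(Integer) && thread_count > 0
    raise ArgumentError, "thread_count must be a positive Integer"
  end

  queue = items.dup          # private copy, so the caller's array is untouched
  queue_mutex = Mutex.new    # guards the shared work queue
  result_mutex = Mutex.new   # guards the shared result hash
  results = {}

  threads = Array.new(thread_count) do
    Thread.new do
      loop do
        # Take the next item; only one thread touches the queue at a time.
        item = queue_mutex.synchronize { queue.shift }
        break if item.nil? # queue drained, this worker is done

        value = worker.call(item)

        # Serialize writes to the shared result hash.
        result_mutex.synchronize { results[item] = value }
      end
    end
  end

  threads.each(&:join) # block until every worker has exited
  results
end

squares = parallel_map_to_hash([1, 2, 3, 4], 2) { |n| n * n }
```

Two mutexes keep queue access and result writes from contending with each other. Since an empty queue is detected by `shift` returning `nil`, this approach assumes the items themselves are never `nil`.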
#finished? ⇒ Boolean
# File 'lib/riak/multiget.rb', line 82
def finished?
  set_finished_for_thread_liveness
  finished
end
#results ⇒ Object
# File 'lib/riak/multiget.rb', line 77
def results
  wait_for_finish
  result_hash
end
#wait_for_finish ⇒ Object
# File 'lib/riak/multiget.rb', line 87
def wait_for_finish
  return if finished?
  @threads.each {|t| t.join }
  @finished = true
end
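The interplay between #finished? and #wait_for_finish can be sketched with a small stand-in class. `BackgroundJob` is hypothetical, and its `finished?` rests on an assumption: the body of `set_finished_for_thread_liveness` is not shown on this page, so the sketch guesses that it marks the operation finished once no worker thread is still alive.

```ruby
# Hypothetical stand-in for an operation that runs on background threads.
class BackgroundJob
  def initialize(thread_count)
    @finished = false
    # Each worker just sleeps briefly in place of real work.
    @threads = Array.new(thread_count) { Thread.new { sleep 0.01 } }
  end

  # Assumption: finished once no worker thread is still alive.
  # Non-blocking, so it is safe to poll.
  def finished?
    @finished ||= @threads.none?(&:alive?)
  end

  # Blocks until every worker has exited, then records completion.
  def wait_for_finish
    return if finished?
    @threads.each(&:join)
    @finished = true
  end
end

job = BackgroundJob.new(3)
job.wait_for_finish
done = job.finished?
```

Polling `finished?` suits callers that have other work to do in the meantime, while `wait_for_finish` suits callers that need the results before continuing.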