Class: Riak::Multi
- Includes: Util::Translation
- Defined in: lib/riak/multi.rb
Overview
Coordinates a parallel operation for multiple keys.
Direct Known Subclasses
Instance Attribute Summary
- #client ⇒ Riak::Client (readonly)
  The associated client.
- #keys ⇒ Array<Bucket, String> (readonly)
- #result_hash ⇒ Hash<fetch_list_entry, RObject>
  A Hash of Bucket and String key pairs to RObject instances.
- #thread_count ⇒ Integer
  The number of threads to use.
Class Method Summary
- .perform(client, keys) ⇒ Hash<key, RObject>
  Perform a Riak Multi operation. Returns a Hash of Bucket and String key pairs to RObject instances.
Instance Method Summary
- #finished? ⇒ Boolean (also: #finished)
- #initialize(client, keys) ⇒ Multi (constructor)
  Create a Riak Multi operation.
- #perform ⇒ Object
  Starts the parallelized operation.
- #results ⇒ Object
- #wait_for_finish ⇒ Object
Methods included from Util::Translation
Constructor Details
#initialize(client, keys) ⇒ Multi
Create a Riak Multi operation.
# File 'lib/riak/multi.rb', line 49
def initialize(client, keys)
  raise ArgumentError, t('client_type', :client => client.inspect) unless client.is_a? Riak::Client
  raise ArgumentError, t('array_type', :array => keys.inspect) unless keys.is_a? Array
  self.thread_count = client.multi_threads
  validate_keys keys
  @client = client
  @keys = keys.uniq
  self.result_hash = {}
  @finished = false
end
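The constructor's array check and de-duplication step can be tried without a Riak cluster. The following is a minimal sketch, assuming plain strings stand in for Bucket objects; `normalize_keys` is a hypothetical helper, not part of riak-client, and it mirrors only the `is_a? Array` guard and the `keys.uniq` call from the listing above.

```ruby
# Hypothetical helper, not part of riak-client: mirrors the Array check
# and the de-duplication step from the constructor listing.
def normalize_keys(keys)
  raise ArgumentError, "expected an Array, got #{keys.inspect}" unless keys.is_a?(Array)

  # Array#uniq compares elements by value, so identical
  # [bucket, key] pairs collapse to a single entry.
  keys.uniq
end

# Strings stand in for Riak::Bucket instances in this sketch.
pairs = [["users", "alice"], ["users", "bob"], ["users", "alice"]]
deduped = normalize_keys(pairs)
puts deduped.inspect
```

Because duplicates are dropped up front, each pair would be handed to a worker at most once.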
Instance Attribute Details
#client ⇒ Riak::Client (readonly)
Returns the associated client.
# File 'lib/riak/multi.rb', line 24
def client
  @client
end
#keys ⇒ Array<Bucket, String> (readonly)
# File 'lib/riak/multi.rb', line 27
def keys
  @keys
end
#result_hash ⇒ Hash<fetch_list_entry, RObject>
Returns a Hash of Bucket and String key pairs to RObject instances.
# File 'lib/riak/multi.rb', line 30
def result_hash
  @result_hash
end
#thread_count ⇒ Integer
Returns the number of threads to use.
# File 'lib/riak/multi.rb', line 33
def thread_count
  @thread_count
end
Class Method Details
Instance Method Details
#finished? ⇒ Boolean (also known as: #finished)
# File 'lib/riak/multi.rb', line 90
def finished?
  @finished ||= @threads && @threads.none?(&:alive?)
end
#perform ⇒ Object
Starts the parallelized operation.
# File 'lib/riak/multi.rb', line 62
def perform
  queue = keys.dup
  queue_mutex = Mutex.new
  result_mutex = Mutex.new
  @threads = 1.upto(thread_count).map do |_node|
    Thread.new do
      loop do
        pair = queue_mutex.synchronize do
          queue.shift
        end
        break if pair.nil?
        found = work(*pair)
        result_mutex.synchronize do
          result_hash[pair] = found
        end
      end
    end
  end
end
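The pattern in #perform is a fixed pool of threads draining one shared queue, with one mutex guarding the queue and another guarding the result hash. The sketch below reproduces that pattern in isolation. The `work` lambda is a hypothetical stand-in for the `work` method that #perform calls (it is not shown on this page), and the sample keys and thread count are made up for illustration.

```ruby
# Hypothetical stand-in for the #work method that #perform calls.
work = ->(bucket, key) { "#{bucket}/#{key}".upcase }

# Illustrative inputs; strings stand in for Riak::Bucket instances.
keys = [["users", "alice"], ["users", "bob"], ["logs", "2020"]]
thread_count = 2

queue = keys.dup
queue_mutex = Mutex.new
result_mutex = Mutex.new
result_hash = {}

threads = Array.new(thread_count) do
  Thread.new do
    loop do
      # Only one thread at a time may take a pair off the shared queue.
      pair = queue_mutex.synchronize { queue.shift }
      break if pair.nil? # queue drained, so this worker exits

      found = work.call(*pair)
      # Writes to the shared Hash are serialized as well.
      result_mutex.synchronize { result_hash[pair] = found }
    end
  end
end

threads.each(&:join)
puts result_hash.size
```

Using two separate mutexes means a thread storing a result does not block another thread from taking the next pair off the queue.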
#results ⇒ Object
# File 'lib/riak/multi.rb', line 85
def results
  wait_for_finish
  result_hash
end
#wait_for_finish ⇒ Object
# File 'lib/riak/multi.rb', line 95
def wait_for_finish
  return if finished?
  @threads.each(&:join)
  @finished = true
end
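Taken together, #perform, #finished?, #wait_for_finish and #results imply a call order: start the threads, then ask for results, which blocks until every thread has joined. The sketch below runs that lifecycle in isolation. `SketchMulti` is a hypothetical stand-in, not Riak::Multi itself; it keeps only the thread bookkeeping from the listings above and replaces the Riak work with a trivial doubling job.

```ruby
# Hypothetical stand-in, not Riak::Multi itself: it keeps only the
# thread bookkeeping so the call order can be run without a cluster.
class SketchMulti
  def initialize(jobs)
    @jobs = jobs
    @result_hash = {}
    @finished = false
  end

  # Starts one thread per job and returns without waiting for them.
  def perform
    mutex = Mutex.new
    @threads = @jobs.map do |job|
      Thread.new do
        value = job * 2 # placeholder for the real per-key work
        mutex.synchronize { @result_hash[job] = value }
      end
    end
  end

  # Memoized: once true, the threads are not inspected again.
  def finished?
    @finished ||= @threads && @threads.none?(&:alive?)
  end

  def wait_for_finish
    return if finished?

    @threads.each(&:join) # blocks until every worker has exited
    @finished = true
  end

  def results
    wait_for_finish
    @result_hash
  end
end

multi = SketchMulti.new([1, 2, 3])
multi.perform           # work continues in background threads
doubled = multi.results # blocks until all threads have joined
puts doubled.inspect
```

In this sketch, as in the listings above, calling `results` before `perform` fails with a NoMethodError, because `@threads` is still nil when `wait_for_finish` tries to join it.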