Class: Riak::Multi
- Includes: Util::Translation
- Defined in: lib/riak/multi.rb
Overview
Coordinates a parallel operation for multiple keys.
Direct Known Subclasses
Instance Attribute Summary
- #client ⇒ Riak::Client (readonly)
  The associated client.
- #keys ⇒ Array<Bucket, String> (readonly)
- #result_hash ⇒ Hash<fetch_list_entry, RObject>
  A Hash of Bucket and String key pairs to RObject instances.
- #thread_count ⇒ Integer
  The number of threads to use.
Class Method Summary
- .perform(client, keys) ⇒ Hash<key, RObject>
  Perform a Riak Multi operation. Returns a Hash of Bucket and String key pairs to RObject instances.
Instance Method Summary
- #finished? ⇒ Boolean (also: #finished)
- #initialize(client, keys) ⇒ Multi (constructor)
  Create a Riak Multi operation.
- #perform ⇒ Object
  Starts the parallelized operation.
- #results ⇒ Object
- #wait_for_finish ⇒ Object
Methods included from Util::Translation
Constructor Details
#initialize(client, keys) ⇒ Multi
Create a Riak Multi operation.
    # File 'lib/riak/multi.rb', line 35
    def initialize(client, keys)
      raise ArgumentError, t('client_type', :client => client.inspect) unless client.is_a? Riak::Client
      raise ArgumentError, t('array_type', :array => keys.inspect) unless keys.is_a? Array

      self.thread_count = client.multi_threads
      validate_keys keys
      @client = client
      @keys = keys.uniq
      self.result_hash = {}
      @finished = false
    end
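The constructor's checks and key de-duplication can be tried in isolation. The following is a minimal sketch, not the library's code: `FakeClient` and `MultiSketch` are hypothetical stand-ins so the example runs without a Riak connection, and the translated messages from `t(...)` and the `validate_keys` step are replaced by plain strings or omitted.

```ruby
# Hypothetical stand-in for Riak::Client; only the multi_threads reader is modelled.
FakeClient = Struct.new(:multi_threads)

# Hypothetical class mirroring the argument checks shown in the listing above.
class MultiSketch
  attr_reader :client, :keys, :thread_count

  def initialize(client, keys)
    raise ArgumentError, "expected a FakeClient, got #{client.inspect}" unless client.is_a?(FakeClient)
    raise ArgumentError, "expected an Array, got #{keys.inspect}" unless keys.is_a?(Array)

    @thread_count = client.multi_threads
    @client = client
    @keys = keys.uniq # duplicate [bucket, key] pairs collapse to one entry
  end
end

sketch = MultiSketch.new(
  FakeClient.new(4),
  [["bucket", "a"], ["bucket", "a"], ["bucket", "b"]]
)
```

Here `sketch.keys` holds two pairs rather than three, so each distinct pair would be worked on only once.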
Instance Attribute Details
#client ⇒ Riak::Client (readonly)
Returns the associated client.
    # File 'lib/riak/multi.rb', line 10
    def client
      @client
    end
#keys ⇒ Array<Bucket, String> (readonly)
    # File 'lib/riak/multi.rb', line 13
    def keys
      @keys
    end
#result_hash ⇒ Hash<fetch_list_entry, RObject>
Returns a Hash of Bucket and String key pairs to RObject instances.
    # File 'lib/riak/multi.rb', line 16
    def result_hash
      @result_hash
    end
#thread_count ⇒ Integer
Returns the number of threads to use.
    # File 'lib/riak/multi.rb', line 19
    def thread_count
      @thread_count
    end
Class Method Details
Instance Method Details
#finished? ⇒ Boolean Also known as: finished
    # File 'lib/riak/multi.rb', line 76
    def finished?
      @finished ||= @threads && @threads.none?(&:alive?)
    end
#perform ⇒ Object
Starts the parallelized operation.
    # File 'lib/riak/multi.rb', line 48
    def perform
      queue = keys.dup
      queue_mutex = Mutex.new
      result_mutex = Mutex.new

      @threads = 1.upto(thread_count).map do |_node|
        Thread.new do
          loop do
            pair = queue_mutex.synchronize do
              queue.shift
            end

            break if pair.nil?

            found = work(*pair)
            result_mutex.synchronize do
              result_hash[pair] = found
            end
          end
        end
      end
    end
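The pattern in `#perform` is a fixed pool of threads draining a shared queue, with one mutex guarding the queue and another guarding the result hash. A self-contained sketch of that pattern follows; `parallel_map_to_hash` is a hypothetical helper written for illustration, and the caller's block stands in for the `work` method, which is not shown on this page.

```ruby
# Hypothetical helper illustrating the queue-draining pattern used by #perform.
# The block plays the role of `work`; it is called once per item.
def parallel_map_to_hash(items, thread_count, &block)
  queue = items.dup
  queue_mutex = Mutex.new
  result_mutex = Mutex.new
  results = {}

  threads = Array.new(thread_count) do
    Thread.new do
      loop do
        # Take the next item under the queue lock; nil means the queue is empty.
        item = queue_mutex.synchronize { queue.shift }
        break if item.nil?

        found = block.call(item)
        # Store the result under a separate lock so workers never write concurrently.
        result_mutex.synchronize { results[item] = found }
      end
    end
  end

  threads.each(&:join)
  results
end

squares = parallel_map_to_hash([1, 2, 3, 4], 2) { |n| n * n }
```

Unlike `#perform`, this sketch joins its threads before returning, which folds the waiting step into the same call. One consequence of using `nil` as the "queue empty" signal is that a `nil` item in the queue would also stop a worker early.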
#results ⇒ Object
    # File 'lib/riak/multi.rb', line 71
    def results
      wait_for_finish
      result_hash
    end
#wait_for_finish ⇒ Object
    # File 'lib/riak/multi.rb', line 81
    def wait_for_finish
      return if finished?
      @threads.each(&:join)
      @finished = true
    end
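How `#finished?` and `#wait_for_finish` interact can be seen in a small stand-alone sketch. `FinishTracker` is a hypothetical class that copies the two method bodies from the listings above; its `start` method is an assumption standing in for `#perform` and simply spawns short-lived threads.

```ruby
# Hypothetical class reproducing the finished?/wait_for_finish logic in isolation.
class FinishTracker
  def initialize
    @threads = nil
    @finished = false
  end

  # Stand-in for #perform: spawns `count` threads that do a little work.
  def start(count)
    @threads = Array.new(count) { Thread.new { sleep 0.01 } }
    self
  end

  # Falsy until threads exist and none is alive; memoized once truthy.
  def finished?
    @finished ||= @threads && @threads.none?(&:alive?)
  end

  def wait_for_finish
    return if finished?
    @threads.each(&:join)
    @finished = true
  end
end

tracker = FinishTracker.new
before_start = tracker.finished? # falsy: no threads have been created yet
tracker.start(3)
tracker.wait_for_finish
after_join = tracker.finished?
```

Because `@threads` is only assigned once work has started, calling `wait_for_finish` before that point would call `each` on `nil` in this sketch. The listings above share that shape, so the safe order appears to be `perform` first, then `results` or `wait_for_finish`.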