Class: Tarantool::EMDB
Direct Known Subclasses
Defined Under Namespace
Classes: Concatter, ConcatterReplace, Curry1, FeedResponse, OneReplica, OneShardRead, OneShardWrite
Constant Summary
- IPROTO_CONNECTION_TYPE = :em_callback
- INITIAL = Object.new.freeze
Instance Attribute Summary
Attributes inherited from DB
#closed, #connections, #previous_shards_count
Instance Method Summary
- #_send_request(shard_numbers, read_write, response) ⇒ Object
- #_send_to_one_shard(shard_number, read_write, response, feed) ⇒ Object
- #_send_to_several_shards(shard_numbers, read_write, response, feed) ⇒ Object
Methods inherited from DB
#_shard, #close, #close_connection, #initialize, #insert_with_shards_count, #method_missing, #primary_interface, #query, #shards_count, #space, #space_array, #space_hash
Constructor Details
This class inherits a constructor from Tarantool::DB
Dynamic Method Handling
This class handles dynamic methods through the method_missing method in the class Tarantool::DB
Instance Method Details
#_send_request(shard_numbers, read_write, response) ⇒ Object
    # File 'lib/tarantool/em_db.rb', line 23
    def _send_request(shard_numbers, read_write, response)
      if @closed
        exc = ::IProto::Disconnected.new("Tarantool is closed")
        if EM.reactor_running?
          EM.next_tick Curry1.new(response.cb, exc)
        else
          response.cb.call exc
        end
      else
        feed = FeedResponse.new(response)
        shard_numbers = shard_numbers[0] if Array === shard_numbers && shard_numbers.size == 1
        if Array === shard_numbers
          _send_to_several_shards(shard_numbers, read_write, response, feed)
        else
          _send_to_one_shard(shard_numbers, read_write, response, feed)
        end
      end
    end
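The branching in #_send_request can be hard to follow from the source alone, so here is a minimal sketch of just the routing decision. It does not touch EventMachine or Tarantool: dispatch_path and its closed: keyword are hypothetical names introduced for illustration, and the method returns a symbol naming the path rather than sending anything.

```ruby
# Hypothetical stand-in for the routing step of _send_request.
# It only reports which path would be taken; nothing is sent.
def dispatch_path(shard_numbers, closed: false)
  # A closed database reports an error through the callback instead of sending.
  return :closed_error if closed

  # A one-element array is unwrapped and treated like a plain shard number.
  shard_numbers = shard_numbers[0] if Array === shard_numbers && shard_numbers.size == 1

  Array === shard_numbers ? :several_shards : :one_shard
end

p dispatch_path(0)                    # => :one_shard
p dispatch_path([2])                  # => :one_shard
p dispatch_path([0, 1])               # => :several_shards
p dispatch_path([0, 1], closed: true) # => :closed_error
```

The point to notice is that [2] and 2 take the same path, so the multi-shard machinery is only involved when at least two shards are addressed.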
#_send_to_one_shard(shard_number, read_write, response, feed) ⇒ Object
    # File 'lib/tarantool/em_db.rb', line 42
    def _send_to_one_shard(shard_number, read_write, response, feed)
      if (replicas = _shard(shard_number)).size == 1
        replicas[0].send_request(response.request_type, response.body, OneReplica.new(response, feed))
      elsif read_write == :read
        case @replica_strategy
        when :round_robin
          replicas = replicas.shuffle
        when :prefer_slave
          replicas = replicas[1..-1].shuffle << replicas[0]
        end
        EM.next_tick OneShardRead.new(replicas, response, feed)
      else
        EM.next_tick OneShardWrite.new(replicas, response, feed)
      end
    end
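The replica ordering used for reads can be tried in isolation. The sketch below copies the two case branches from the source into a standalone helper; order_replicas is a hypothetical name, the replica labels are plain strings, and treating the first entry as the master is an assumption inferred from the :prefer_slave branch moving replicas[0] to the end.

```ruby
# Hypothetical helper mirroring the read-ordering branches of _send_to_one_shard.
# Assumption: replicas[0] is the master and the rest are slaves.
def order_replicas(replicas, strategy)
  case strategy
  when :round_robin
    # Despite the name, the source shuffles, so every replica may come first.
    replicas.shuffle
  when :prefer_slave
    # Slaves in random order, then the master as the last candidate.
    replicas[1..-1].shuffle << replicas[0]
  else
    # Any other strategy leaves the configured order untouched.
    replicas
  end
end

replicas = %w[master slave1 slave2]
p order_replicas(replicas, :prefer_slave).last # => "master"
p order_replicas(replicas, :other)             # => ["master", "slave1", "slave2"]
```

Both shuffling branches build a new array, so the list returned by _shard is not mutated.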
#_send_to_several_shards(shard_numbers, read_write, response, feed) ⇒ Object
    # File 'lib/tarantool/em_db.rb', line 204
    def _send_to_several_shards(shard_numbers, read_write, response, feed)
      concat = read_write != :replace ? Concatter.new(shard_numbers.size, feed) :
                                        ConcatterReplace.new(shard_numbers.size, feed)
      for shard in shard_numbers
        _send_to_one_shard(shard, read_write, response, concat)
      end
    end
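The source of Concatter and ConcatterReplace is not shown on this page. From the call site, each takes a shard count and a feed, and then stands in for the feed on every per-shard request. One plausible reading is a collector that waits for that many results before forwarding them. The sketch below illustrates that pattern only: CountingCollector and its call method are hypothetical and may differ from the real classes in interface, error handling, and how results are merged.

```ruby
# Hypothetical fan-in collector: forwards to the feed once `count` results arrive.
# This illustrates the pattern only; it is not the library's Concatter.
class CountingCollector
  def initialize(count, feed)
    @remaining = count
    @results = []
    @feed = feed
  end

  # Called once per shard with that shard's result.
  def call(result)
    return if @remaining <= 0 # ignore anything after completion

    @results << result
    @remaining -= 1
    @feed.call(@results) if @remaining.zero?
  end
end

delivered = nil
collector = CountingCollector.new(3, ->(results) { delivered = results })
collector.call([:a])
collector.call([:b])
p delivered # => nil (still waiting for the third shard)
collector.call([:c])
p delivered # => [[:a], [:b], [:c]]
```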