Class: Tarantool::EMDB
- Inherits:
-
DB
- Object
- DB
- Tarantool::EMDB
show all
- Defined in:
- lib/tarantool/em_db.rb
Defined Under Namespace
Classes: Concatter, ConcatterReplace, Curry1, FeedResponse, OneReplica, OneShardRead, OneShardWrite
Constant Summary
collapse
- IPROTO_CONNECTION_TYPE =
:em_callback
- INITIAL =
Object.new.freeze
Instance Attribute Summary
Attributes inherited from DB
#closed, #connections, #previous_shards_count
Instance Method Summary
collapse
Methods inherited from DB
#_shard, #close, #close_connection, #each_connection, #initialize, #insert_with_shards_count, #method_missing, #primary_interface, #query, #shards_count, #space, #space_array, #space_hash
Constructor Details
This class inherits a constructor from Tarantool::DB
Dynamic Method Handling
This class handles dynamic methods through the method_missing method
in the class Tarantool::DB
Instance Method Details
#_send_request(shard_numbers, read_write, response) ⇒ Object
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
|
# File 'lib/tarantool/em_db.rb', line 39
def _send_request(shard_numbers, read_write, response)
if @closed
exc = ::IProto::Disconnected.new("Tarantool is closed")
if EM.reactor_running?
EM.next_tick Curry1.new(response.cb, exc)
else
response.cb.call exc
end
else
feed = FeedResponse.new(response)
shard_numbers = shard_numbers[0] if Array === shard_numbers && shard_numbers.size == 1
if Array === shard_numbers
_send_to_several_shards(shard_numbers, read_write, response, feed)
else
_send_to_one_shard(shard_numbers, read_write, response, feed)
end
end
end
|
#_send_to_one_shard(shard_number, read_write, response, feed) ⇒ Object
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
|
# File 'lib/tarantool/em_db.rb', line 58
def _send_to_one_shard(shard_number, read_write, response, feed)
if (replicas = _shard(shard_number)).size == 1
replicas[0].send_request(response.request_type, response.body, OneReplica.new(response, feed))
elsif read_write == :read
case @replica_strategy
when :round_robin
replicas = replicas.shuffle.sort_by!{|con| con.waiting_requests_size/5}
when :prefer_slave
master = replicas[0]
replicas = replicas[1..-1].shuffle.sort_by!{|con| con.waiting_requests_size/5}
replicas << master
end
EM.next_tick OneShardRead.new(replicas, response, feed)
else
EM.next_tick OneShardWrite.new(replicas, response, feed)
end
end
|
#_send_to_several_shards(shard_numbers, read_write, response, feed) ⇒ Object
222
223
224
225
226
227
228
|
# File 'lib/tarantool/em_db.rb', line 222
def _send_to_several_shards(shard_numbers, read_write, response, feed)
concat = read_write != :replace ? Concatter.new(shard_numbers.size, feed) :
ConcatterReplace.new(shard_numbers.size, feed)
for shard in shard_numbers
_send_to_one_shard(shard, read_write, response, concat)
end
end
|
#_tune_new_connection(con) ⇒ Object
18
19
20
21
|
# File 'lib/tarantool/em_db.rb', line 18
def _tune_new_connection(con)
super
con.comm_inactivity_timeout = inactivity_timeout
end
|
#inactivity_timeout ⇒ Object
7
8
9
|
# File 'lib/tarantool/em_db.rb', line 7
def inactivity_timeout
@inactivity_timeout ||= 0
end
|
#inactivity_timeout=(v) ⇒ Object
11
12
13
14
15
16
|
# File 'lib/tarantool/em_db.rb', line 11
def inactivity_timeout=(v)
@inactivity_timeout = v || 0
each_connection do |c|
c.comm_inactivity_timeout = @inactivity_timeout
end
end
|