Class: Tarantool::EMDB

Inherits:
DB
  • Object
show all
Defined in:
lib/tarantool/em_db.rb

Direct Known Subclasses

CallbackDB, FiberDB

Defined Under Namespace

Classes: Concatter, ConcatterReplace, Curry1, FeedResponse, OneReplica, OneShardRead, OneShardWrite

Constant Summary collapse

IPROTO_CONNECTION_TYPE =
:em_callback
INITIAL =
Object.new.freeze

Instance Attribute Summary

Attributes inherited from DB

#closed, #connections, #previous_shards_count

Instance Method Summary collapse

Methods inherited from DB

#_shard, #close, #close_connection, #each_connection, #initialize, #insert_with_shards_count, #method_missing, #primary_interface, #query, #shards_count, #space, #space_array, #space_hash

Constructor Details

This class inherits a constructor from Tarantool::DB

Dynamic Method Handling

This class handles dynamic methods through the method_missing method in the class Tarantool::DB

Instance Method Details

#_send_request(shard_numbers, read_write, response) ⇒ Object



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/tarantool/em_db.rb', line 39

def _send_request(shard_numbers, read_write, response)
  if @closed
    exc =  ::IProto::Disconnected.new("Tarantool is closed")
    if EM.reactor_running?
      EM.next_tick Curry1.new(response.cb, exc)
    else
      response.cb.call exc
    end
  else
    feed = FeedResponse.new(response)
    shard_numbers = shard_numbers[0]  if Array === shard_numbers && shard_numbers.size == 1
    if Array === shard_numbers
      _send_to_several_shards(shard_numbers, read_write, response, feed)
    else
      _send_to_one_shard(shard_numbers, read_write, response, feed)
    end
  end
end

#_send_to_one_shard(shard_number, read_write, response, feed) ⇒ Object



58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/tarantool/em_db.rb', line 58

def _send_to_one_shard(shard_number, read_write, response, feed)
  if (replicas = _shard(shard_number)).size == 1
    replicas[0].send_request(response.request_type, response.body, OneReplica.new(response, feed))
  elsif read_write == :read
    case @replica_strategy
    when :round_robin
      replicas = replicas.shuffle
    when :prefer_slave
      replicas = replicas[1..-1].shuffle << replicas[0]
    end
    EM.next_tick OneShardRead.new(replicas, response, feed)
  else
    EM.next_tick OneShardWrite.new(replicas, response, feed)
  end
end

#_send_to_several_shards(shard_numbers, read_write, response, feed) ⇒ Object



220
221
222
223
224
225
226
# File 'lib/tarantool/em_db.rb', line 220

def _send_to_several_shards(shard_numbers, read_write, response, feed)
  concat = read_write != :replace ? Concatter.new(shard_numbers.size, feed) :
                                    ConcatterReplace.new(shard_numbers.size, feed)
  for shard in shard_numbers
    _send_to_one_shard(shard, read_write, response, concat)
  end
end

#_tune_new_connection(con) ⇒ Object



18
19
20
21
# File 'lib/tarantool/em_db.rb', line 18

def _tune_new_connection(con)
  super
  con.comm_inactivity_timeout = inactivity_timeout
end

#inactivity_timeoutObject



7
8
9
# File 'lib/tarantool/em_db.rb', line 7

def inactivity_timeout
  @inactivity_timeout ||= 0
end

#inactivity_timeout=(v) ⇒ Object



11
12
13
14
15
16
# File 'lib/tarantool/em_db.rb', line 11

def inactivity_timeout=(v)
  @inactivity_timeout = v || 0
  each_connection do |c|
    c.comm_inactivity_timeout = @inactivity_timeout
  end
end