Class: Cql::Client::AsynchronousClient

Inherits:
Client
  • Object
show all
Defined in:
lib/cql/client/asynchronous_client.rb

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ AsynchronousClient

Returns a new instance of AsynchronousClient.



7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# File 'lib/cql/client/asynchronous_client.rb', line 7

# Builds a client that is not yet connected; no connection is attempted here.
#
# @param options [Hash] configuration read by this constructor:
#   * :logger               - logger to use; a NullLogger is created when absent
#   * :io_reactor           - reactor to use; an Io::IoReactor is created when absent
#   * :keyspace             - stored as the initial keyspace handed to #connect
#   * :port                 - falls back to DEFAULT_PORT
#   * :credentials          - passed through to the connection helper as-is
#   * :connections_per_node - falls back to 1
#   * :connection_timeout   - falls back to DEFAULT_CONNECTION_TIMEOUT
#   * :default_consistency  - falls back to DEFAULT_CONSISTENCY
#   The same hash is also handed to extract_hosts, which decides the host list.
def initialize(options={})
  @logger = options[:logger] || NullLogger.new
  @io_reactor = options[:io_reactor] || Io::IoReactor.new(Protocol::CqlProtocolHandler)
  @hosts = extract_hosts(options)
  @initial_keyspace = options[:keyspace]
  @lock = Mutex.new

  # Lifecycle flags: a fresh client is neither connected, connecting nor closing.
  @connected = @connecting = @closing = false

  @request_runner = RequestRunner.new
  @keyspace_changer = KeyspaceChanger.new
  @connection_manager = ConnectionManager.new

  # Settings that are only needed to build the collaborators below.
  node_port = options[:port] || DEFAULT_PORT
  auth = options[:credentials]
  per_node = options[:connections_per_node] || 1
  timeout = options[:connection_timeout] || DEFAULT_CONNECTION_TIMEOUT
  consistency = options[:default_consistency] || DEFAULT_CONSISTENCY

  @execute_options_decoder = ExecuteOptionsDecoder.new(consistency)
  @connection_helper = ConnectionHelper.new(
    @io_reactor,
    node_port,
    auth,
    per_node,
    timeout,
    @logger
  )
end

Instance Method Details

#close ⇒ Object



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/cql/client/asynchronous_client.rb', line 46

# Shuts the client down by stopping the IO reactor.
#
# The first call sets @closing, builds @closed_future and registers the
# #closed callback on it. While @closing is set, later calls return the
# already-built @closed_future from inside the lock and register nothing new.
#
# @return [Object] the future stored in @closed_future; its final step is
#   `map { self }`
def close
  @lock.synchronize do
    # `return` inside this block leaves #close itself, so the reactor is not
    # stopped again and the callback below is not registered a second time.
    return @closed_future if @closing
    @closing = true
    @closed_future = begin
      f = @io_reactor.stop
      if @connecting
        # A #connect call has set @connecting, so the pending connection
        # future is chained in here as well.
        #
        # NOTE(review): Ruby blocks capture the variable `ff`, not the value it
        # held when the block was written. If the two blocks below run after
        # these assignments have finished, `ff` is the object returned by
        # `f.fallback`, not @connected_future. The result of `f.flat_map` is
        # also overwritten without being used anywhere else. Confirm this is
        # the intended chaining.
        ff = @connected_future
        ff = f.flat_map { ff }
        ff = f.fallback { ff }
      else
        ff = f
      end
      ff.map { self }
    end
  end
  # Registered outside the synchronized section.
  @closed_future.on_complete(&method(:closed))
  @closed_future
end

#connect ⇒ Object



28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/cql/client/asynchronous_client.rb', line 28

# Starts connecting to the configured hosts.
#
# When can_execute? is already true the existing @connected_future is returned
# unchanged. Otherwise @connecting is set, the connection helper is asked to
# connect to @hosts with the initial keyspace, and the resulting future is
# stored in @connected_future with the #connected callback registered on it.
#
# @return [Object] the future stored in @connected_future
# @raise [ClientError] when the client is closing or has been closed
def connect
  @lock.synchronize do
    if @closing || @closed
      raise ClientError, 'Cannot connect a closed client'
    end
    # `return` inside the block leaves #connect itself.
    return @connected_future if can_execute?

    @connecting = true
    pending = @connection_helper.connect(@hosts, @initial_keyspace)
    # Once connections are available, hand them to the manager and register
    # the event listener on one of them.
    pending.on_value do |connections|
      @connection_manager.add_connections(connections)
      register_event_listener(@connection_manager.random_connection)
    end
    @connected_future = pending.map { self }
  end
  # Registered outside the synchronized section.
  @connected_future.on_complete(&method(:connected))
  @connected_future
end

#connected? ⇒ Boolean

Returns:

  • (Boolean)


66
67
68
# File 'lib/cql/client/asynchronous_client.rb', line 66

# Reports the client's connected flag.
#
# @return [Boolean] the current value of @connected, which the constructor
#   initialises to false
def connected?
  @connected
end

#execute(cql, options_or_consistency = nil) ⇒ Object



86
87
88
89
90
91
# File 'lib/cql/client/asynchronous_client.rb', line 86

# Runs a CQL statement as a query request.
#
# @param cql [String] the statement text placed in the query request
# @param options_or_consistency [Object, nil] passed unchanged to the execute
#   options decoder, which yields the consistency and the timeout to use
# @return [Object] whatever with_failure_handler returns for the block, whose
#   value is the result of execute_request
def execute(cql, options_or_consistency=nil)
  with_failure_handler do
    decoded = @execute_options_decoder.decode_options(options_or_consistency)
    consistency, timeout = decoded
    request = Protocol::QueryRequest.new(cql, consistency)
    execute_request(request, timeout)
  end
end

#keyspace ⇒ Object



70
71
72
# File 'lib/cql/client/asynchronous_client.rb', line 70

# Reports the keyspace of one connection picked by the connection manager.
#
# @return [Object] the `keyspace` of the connection returned by
#   @connection_manager.random_connection
def keyspace
  connection = @connection_manager.random_connection
  connection.keyspace
end

#prepare(cql) ⇒ Object



93
94
95
96
97
# File 'lib/cql/client/asynchronous_client.rb', line 93

# Prepares a CQL statement by delegating to AsynchronousPreparedStatement.prepare.
#
# @param cql [String] the statement text to prepare
# @return [Object] whatever with_failure_handler returns for the block, whose
#   value is the result of AsynchronousPreparedStatement.prepare
def prepare(cql)
  with_failure_handler do
    AsynchronousPreparedStatement.prepare(
      cql,
      @execute_options_decoder,
      @connection_manager,
      @logger
    )
  end
end

#use(keyspace) ⇒ Object



74
75
76
77
78
79
80
81
82
83
84
# File 'lib/cql/client/asynchronous_client.rb', line 74

# Switches to the given keyspace on every connection not already using it.
#
# @param keyspace [Object] the keyspace compared against each connection's
#   `keyspace` and passed to the keyspace changer
# @return [Object] whatever with_failure_handler returns for the block: the
#   combined change future mapped to nil, or Future.resolved when no
#   connection needs changing
def use(keyspace)
  with_failure_handler do
    outdated = @connection_manager.select { |candidate| candidate.keyspace != keyspace }
    # Nothing to change: every connection already reports this keyspace.
    next Future.resolved unless outdated.any?

    changes = outdated.map do |connection|
      @keyspace_changer.use_keyspace(keyspace, connection)
    end
    Future.all(*changes).map { nil }
  end
end