Class: Cql::Client::AsynchronousClient
- Defined in:
- lib/cql/client/asynchronous_client.rb
Instance Method Summary collapse
- #close ⇒ Object
- #connect ⇒ Object
- #connected? ⇒ Boolean
- #execute(cql, options_or_consistency = nil) ⇒ Object
- #initialize(options = {}) ⇒ AsynchronousClient (constructor): A new instance of AsynchronousClient.
- #keyspace ⇒ Object
- #prepare(cql) ⇒ Object
- #use(keyspace) ⇒ Object
Constructor Details
#initialize(options = {}) ⇒ AsynchronousClient
Returns a new instance of AsynchronousClient.
7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
# File 'lib/cql/client/asynchronous_client.rb', line 7
#
# Builds a client in the not-connected state and wires up its collaborators.
# No connection is attempted here.
#
# Keys read from +options+ (all optional):
#   :logger               - falls back to NullLogger.new
#   :io_reactor           - falls back to an Io::IoReactor built around
#                           Protocol::CqlProtocolHandler
#   :keyspace             - stored as the initial keyspace handed to the
#                           connection helper on connect
#   :port                 - falls back to DEFAULT_PORT
#   :credentials          - passed through to the connection helper as-is
#   :connections_per_node - falls back to 1
#   :connection_timeout   - falls back to DEFAULT_CONNECTION_TIMEOUT
#   :default_consistency  - falls back to DEFAULT_CONSISTENCY
# Host selection is delegated to extract_hosts(options), which is defined
# elsewhere in the file.
#
# NOTE(review): the extracted listing had every `options` identifier stripped
# (e.g. `def initialize(={})`, `[:logger]`, `extract_hosts()`); they are
# restored here from the documented signature `initialize(options = {})`.
#
# @param options [Hash] configuration, see above
# @return [AsynchronousClient] a new instance of AsynchronousClient
def initialize(options={})
  @logger = options[:logger] || NullLogger.new
  @io_reactor = options[:io_reactor] || Io::IoReactor.new(Protocol::CqlProtocolHandler)
  @hosts = extract_hosts(options)
  @initial_keyspace = options[:keyspace]
  @lock = Mutex.new
  # Lifecycle flags; all start out false.
  @connected = false
  @connecting = false
  @closing = false
  @request_runner = RequestRunner.new
  @keyspace_changer = KeyspaceChanger.new
  @connection_manager = ConnectionManager.new
  port = options[:port] || DEFAULT_PORT
  credentials = options[:credentials]
  connections_per_node = options[:connections_per_node] || 1
  connection_timeout = options[:connection_timeout] || DEFAULT_CONNECTION_TIMEOUT
  default_consistency = options[:default_consistency] || DEFAULT_CONSISTENCY
  @execute_options_decoder = ExecuteOptionsDecoder.new(default_consistency)
  @connection_helper = ConnectionHelper.new(@io_reactor, port, credentials, connections_per_node, connection_timeout, @logger)
end
Instance Method Details
#close ⇒ Object
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 |
# File 'lib/cql/client/asynchronous_client.rb', line 46 def close @lock.synchronize do return @closed_future if @closing @closing = true @closed_future = begin f = @io_reactor.stop if @connecting ff = @connected_future ff = f.flat_map { ff } ff = f.fallback { ff } else ff = f end ff.map { self } end end @closed_future.on_complete(&method(:closed)) @closed_future end |
#connect ⇒ Object
28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
# File 'lib/cql/client/asynchronous_client.rb', line 28
#
# Begins connecting to the configured hosts and returns a future that maps
# to this client (`self`).
#
# Under @lock: refuses to run on a closing/closed client, returns the
# existing @connected_future when can_execute? is true, and otherwise marks
# the client as connecting and asks the connection helper for connections.
# When that future yields a value, the connections are added to the
# connection manager and an event listener is registered on one of them.
#
# @raise [ClientError] if the client is closing or closed
# @return [Object] the future stored in @connected_future
def connect
  @lock.synchronize do
    if @closing || @closed
      raise ClientError, 'Cannot connect a closed client'
    end
    return @connected_future if can_execute?

    @connecting = true
    connections_future = @connection_helper.connect(@hosts, @initial_keyspace)
    connections_future.on_value do |connections|
      @connection_manager.add_connections(connections)
      register_event_listener(@connection_manager.random_connection)
    end
    @connected_future = connections_future.map { self }
  end
  # `connected` is a method defined elsewhere in the file.
  @connected_future.on_complete(&method(:connected))
  @connected_future
end
#connected? ⇒ Boolean
66 67 68 |
# File 'lib/cql/client/asynchronous_client.rb', line 66
#
# Reports the current value of the @connected flag.
#
# @return [Boolean] the value of @connected
def connected?
  @connected
end
#execute(cql, options_or_consistency = nil) ⇒ Object
86 87 88 89 90 91 |
# File 'lib/cql/client/asynchronous_client.rb', line 86
#
# Runs a CQL statement: decodes +options_or_consistency+ into a consistency
# and a timeout, wraps +cql+ and the consistency in a Protocol::QueryRequest
# and hands it to execute_request together with the timeout. The whole
# operation runs inside with_failure_handler (defined elsewhere in the file).
#
# NOTE(review): the extracted listing had the second parameter and the decoder
# call stripped (`def execute(cql, =nil)`, `@execute_options_decoder.()`).
# The parameter name is restored from the documented signature
# `execute(cql, options_or_consistency = nil)`. The decoder method name
# `decode_options` is a reconstruction that is not visible in this listing:
# confirm it against ExecuteOptionsDecoder before relying on it.
#
# @param cql the statement passed to Protocol::QueryRequest.new
# @param options_or_consistency value handed to the options decoder;
#   defaults to nil
# @return [Object] whatever with_failure_handler returns
def execute(cql, options_or_consistency=nil)
  with_failure_handler do
    consistency, timeout = @execute_options_decoder.decode_options(options_or_consistency)
    execute_request(Protocol::QueryRequest.new(cql, consistency), timeout)
  end
end
#keyspace ⇒ Object
70 71 72 |
# File 'lib/cql/client/asynchronous_client.rb', line 70
#
# Reads the keyspace from a connection picked by the connection manager's
# random_connection.
#
# @return [Object] the `keyspace` of that connection
def keyspace
  connection = @connection_manager.random_connection
  connection.keyspace
end
#prepare(cql) ⇒ Object
93 94 95 96 97 |
# File 'lib/cql/client/asynchronous_client.rb', line 93
#
# Prepares a CQL statement by delegating to
# AsynchronousPreparedStatement.prepare, passing along this client's options
# decoder, connection manager and logger. Runs inside with_failure_handler
# (defined elsewhere in the file).
#
# @param cql the statement to prepare
# @return [Object] whatever with_failure_handler returns
def prepare(cql)
  with_failure_handler do
    AsynchronousPreparedStatement.prepare(
      cql,
      @execute_options_decoder,
      @connection_manager,
      @logger
    )
  end
end
#use(keyspace) ⇒ Object
74 75 76 77 78 79 80 81 82 83 84 |
# File 'lib/cql/client/asynchronous_client.rb', line 74
#
# Switches connections to +keyspace+. Only connections whose current keyspace
# differs from +keyspace+ are asked to change; the combined future maps to
# nil. When no connection needs changing, Future.resolved is returned
# instead. Runs inside with_failure_handler (defined elsewhere in the file).
#
# @param keyspace the keyspace to switch to
# @return [Object] whatever with_failure_handler returns
def use(keyspace)
  with_failure_handler do
    stale_connections = @connection_manager.select { |connection| connection.keyspace != keyspace }
    # Nothing to change: hand back Future.resolved as the block's value.
    next Future.resolved unless stale_connections.any?

    switches = stale_connections.map do |connection|
      @keyspace_changer.use_keyspace(keyspace, connection)
    end
    Future.all(*switches).map { nil }
  end
end