Class: Cql::Client::ConnectionHelper
- Inherits:
-
Object
- Object
- Cql::Client::ConnectionHelper
- Defined in:
- lib/cql/client/connection_helper.rb
Defined Under Namespace
Classes: FailedConnection
Instance Method Summary collapse
- #connect(hosts, initial_keyspace) ⇒ Object
- #discover_peers(seed_connections, initial_keyspace) ⇒ Object
-
#initialize(io_reactor, port, credentials, connections_per_node, connection_timeout, logger) ⇒ ConnectionHelper
constructor
A new instance of ConnectionHelper.
Constructor Details
#initialize(io_reactor, port, credentials, connections_per_node, connection_timeout, logger) ⇒ ConnectionHelper
Returns a new instance of ConnectionHelper.
7 8 9 10 11 12 13 14 15 16 |
# File 'lib/cql/client/connection_helper.rb', line 7 def initialize(io_reactor, port, credentials, connections_per_node, connection_timeout, logger) @io_reactor = io_reactor @port = port @credentials = credentials @connections_per_node = connections_per_node @connection_timeout = connection_timeout @logger = logger @request_runner = RequestRunner.new @keyspace_changer = KeyspaceChanger.new end |
Instance Method Details
#connect(hosts, initial_keyspace) ⇒ Object
18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
# File 'lib/cql/client/connection_helper.rb', line 18 def connect(hosts, initial_keyspace) f = @io_reactor.start.flat_map do connect_to_hosts(hosts, initial_keyspace, true) end f = f.map do |connections| connected_connections = connections.select(&:connected?) if connected_connections.empty? e = connections.first.error if e.is_a?(Cql::QueryError) && e.code == 0x100 e = AuthenticationError.new(e.) end raise e end connected_connections end f end |
#discover_peers(seed_connections, initial_keyspace) ⇒ Object
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 |
# File 'lib/cql/client/connection_helper.rb', line 36 def discover_peers(seed_connections, initial_keyspace) @logger.debug('Looking for additional nodes') connection = seed_connections.sample return Future.resolved([]) unless connection request = Protocol::QueryRequest.new('SELECT peer, data_center, host_id, rpc_address FROM system.peers', :one) peer_info = @request_runner.execute(connection, request) peer_info.flat_map do |result| seed_dcs = seed_connections.map { |c| c[:data_center] }.uniq unconnected_peers = result.select do |row| seed_dcs.include?(row['data_center']) && seed_connections.none? { |c| c[:host_id] == row['host_id'] } end if unconnected_peers.empty? @logger.debug('No additional nodes found') else @logger.debug('%d additional nodes found' % unconnected_peers.size) end node_addresses = unconnected_peers.map do |row| rpc_address = row['rpc_address'].to_s if rpc_address == '0.0.0.0' row['peer'].to_s else rpc_address end end if node_addresses.any? connect_to_hosts(node_addresses, initial_keyspace, false) else Future.resolved([]) end end end |