Module: Cassandra

Defined in:
lib/cassandra/udt.rb,
lib/cassandra.rb,
lib/cassandra.rb,
lib/cassandra/auth.rb,
lib/cassandra/host.rb,
lib/cassandra/util.rb,
lib/cassandra/uuid.rb,
lib/cassandra/retry.rb,
lib/cassandra/table.rb,
lib/cassandra/tuple.rb,
lib/cassandra/types.rb,
lib/cassandra/column.rb,
lib/cassandra/driver.rb,
lib/cassandra/errors.rb,
lib/cassandra/future.rb,
lib/cassandra/result.rb,
lib/cassandra/cluster.rb,
lib/cassandra/session.rb,
lib/cassandra/version.rb,
lib/cassandra/keyspace.rb,
lib/cassandra/listener.rb,
lib/cassandra/protocol.rb,
lib/cassandra/executors.rb,
lib/cassandra/statement.rb,
lib/cassandra/time_uuid.rb,
lib/cassandra/compression.rb,
lib/cassandra/null_logger.rb,
lib/cassandra/protocol/v1.rb,
lib/cassandra/protocol/v3.rb,
lib/cassandra/reconnection.rb,
lib/cassandra/cluster/client.rb,
lib/cassandra/cluster/schema.rb,
lib/cassandra/execution/info.rb,
lib/cassandra/load_balancing.rb,
lib/cassandra/protocol/coder.rb,
lib/cassandra/uuid/generator.rb,
lib/cassandra/cluster/options.rb,
lib/cassandra/execution/trace.rb,
lib/cassandra/statements/void.rb,
lib/cassandra/cluster/metadata.rb,
lib/cassandra/cluster/registry.rb,
lib/cassandra/protocol/request.rb,
lib/cassandra/statements/batch.rb,
lib/cassandra/statements/bound.rb,
lib/cassandra/cluster/connector.rb,
lib/cassandra/execution/options.rb,
lib/cassandra/protocol/response.rb,
lib/cassandra/statements/simple.rb,
lib/cassandra/address_resolution.rb,
lib/cassandra/statements/prepared.rb,
lib/cassandra/retry/policies/default.rb,
lib/cassandra/auth/providers/password.rb,
lib/cassandra/cluster/connection_pool.rb,
lib/cassandra/protocol/cql_byte_buffer.rb,
lib/cassandra/cluster/failed_connection.rb,
lib/cassandra/cluster/control_connection.rb,
lib/cassandra/cluster/schema/type_parser.rb,
lib/cassandra/retry/policies/fallthrough.rb,
lib/cassandra/compression/compressors/lz4.rb,
lib/cassandra/protocol/cql_protocol_handler.rb,
lib/cassandra/compression/compressors/snappy.rb,
lib/cassandra/reconnection/policies/constant.rb,
lib/cassandra/protocol/requests/batch_request.rb,
lib/cassandra/protocol/requests/query_request.rb,
lib/cassandra/address_resolution/policies/none.rb,
lib/cassandra/protocol/requests/execute_request.rb,
lib/cassandra/protocol/requests/options_request.rb,
lib/cassandra/protocol/requests/prepare_request.rb,
lib/cassandra/protocol/requests/startup_request.rb,
lib/cassandra/protocol/responses/error_response.rb,
lib/cassandra/protocol/responses/event_response.rb,
lib/cassandra/protocol/responses/ready_response.rb,
lib/cassandra/reconnection/policies/exponential.rb,
lib/cassandra/cluster/schema/partitioners/random.rb,
lib/cassandra/load_balancing/policies/white_list.rb,
lib/cassandra/protocol/requests/register_request.rb,
lib/cassandra/protocol/responses/result_response.rb,
lib/cassandra/cluster/schema/partitioners/murmur3.rb,
lib/cassandra/cluster/schema/partitioners/ordered.rb,
lib/cassandra/load_balancing/policies/round_robin.rb,
lib/cassandra/load_balancing/policies/token_aware.rb,
lib/cassandra/protocol/requests/void_query_request.rb,
lib/cassandra/protocol/requests/credentials_request.rb,
lib/cassandra/protocol/responses/supported_response.rb,
lib/cassandra/retry/policies/downgrading_consistency.rb,
lib/cassandra/protocol/requests/auth_response_request.rb,
lib/cassandra/protocol/responses/rows_result_response.rb,
lib/cassandra/protocol/responses/void_result_response.rb,
lib/cassandra/protocol/responses/auth_success_response.rb,
lib/cassandra/protocol/responses/authenticate_response.rb,
lib/cassandra/cluster/schema/replication_strategies/none.rb,
lib/cassandra/protocol/responses/auth_challenge_response.rb,
lib/cassandra/protocol/responses/prepared_result_response.rb,
lib/cassandra/protocol/responses/raw_rows_result_response.rb,
lib/cassandra/address_resolution/policies/ec2_multi_region.rb,
lib/cassandra/cluster/schema/replication_strategies/simple.rb,
lib/cassandra/load_balancing/policies/dc_aware_round_robin.rb,
lib/cassandra/protocol/responses/unprepared_error_response.rb,
lib/cassandra/protocol/responses/unavailable_error_response.rb,
lib/cassandra/protocol/responses/read_timeout_error_response.rb,
lib/cassandra/protocol/responses/schema_change_event_response.rb,
lib/cassandra/protocol/responses/set_keyspace_result_response.rb,
lib/cassandra/protocol/responses/status_change_event_response.rb,
lib/cassandra/protocol/responses/write_timeout_error_response.rb,
lib/cassandra/protocol/responses/already_exists_error_response.rb,
lib/cassandra/protocol/responses/schema_change_result_response.rb,
lib/cassandra/protocol/responses/topology_change_event_response.rb,
lib/cassandra/cluster/schema/replication_strategies/network_topology.rb

Overview

-- Copyright 2013-2015 DataStax, Inc.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. ++

Defined Under Namespace

Modules: AddressResolution, Auth, Compression, Error, Errors, Execution, LoadBalancing, Reconnection, Retry, Statement, Statements, Types Classes: Cluster, Column, Future, Host, Keyspace, Listener, Result, Session, Table, TimeUuid, Tuple, Type, UDT, Uuid

Constant Summary collapse

CONSISTENCIES =

A list of all supported request consistencies

[ :any, :one, :two, :three, :quorum, :all, :local_quorum,
:each_quorum, :serial, :local_serial, :local_one ].freeze
SERIAL_CONSISTENCIES =

A list of all supported serial consistencies

[:serial, :local_serial].freeze
WRITE_TYPES =

A list of all possible write types that a Errors::WriteTimeoutError can have.

[:simple, :batch, :unlogged_batch, :counter, :batch_log].freeze
VERSION =
'2.1.3'.freeze

Class Method Summary collapse

Class Method Details

.cluster(options = {}) ⇒ Cassandra::Cluster

Creates a Cluster instance.

Examples:

Connecting to localhost

cluster = Cassandra.cluster

Configuring Cluster

cluster = Cassandra.cluster(
            username: username,
            password: password,
            hosts: ['10.0.1.1', '10.0.1.2', '10.0.1.3']
          )

Parameters:

  • options (Hash) (defaults to: {})

    a customizable set of options

Options Hash (options):

  • :hosts (Array<String, IPAddr>) — default: ['127.0.0.1']

    a list of initial addresses. Note that the entire list of cluster members will be discovered automatically once a connection to any hosts from the original list is successful.

  • :port (Integer) — default: 9042

    cassandra native protocol port.

  • :datacenter (String) — default: nil

    name of current datacenter. First datacenter found will be assumed current by default. Note that you can skip this option if you specify only hosts from the local datacenter in :hosts option.

  • :shuffle_replicas (Boolean) — default: true

    whether replicas list found by the default Token-Aware Load Balancing Policy should be shuffled. See Token-Aware Load Balancing Policy.

  • :connect_timeout (Numeric) — default: 10

    connection timeout in seconds. Setting value to nil will reset it to 5 seconds.

  • :timeout (Numeric) — default: 10

    request execution timeout in seconds. Setting value to nil will remove request timeout.

  • :heartbeat_interval (Numeric) — default: 30

    how often should a heartbeat be sent to determine if a connection is alive. Several things to note about this option. Only one heartbeat request will ever be outstanding on a given connection. Each heatbeat will be sent in at least :heartbeat_interval seconds after the last request has been sent on a given connection. Setting value to nil will remove connection timeout.

  • :idle_timeout (Numeric) — default: 60

    period of inactivity after which a connection is considered dead. Note that this value should be at least a few times larger than :heartbeat_interval. Setting value to nil will remove automatic connection termination.

  • :username (String) — default: none

    username to use for authentication to cassandra. Note that you must also specify :password.

  • :password (String) — default: none

    password to use for authentication to cassandra. Note that you must also specify :username.

  • :ssl (Boolean, OpenSSL::SSL::SSLContext) — default: false

    enable default ssl authentication if true (not recommended). Also accepts an initialized OpenSSL::SSL::SSLContext. Note that this option should be ignored if :server_cert, :client_cert, :private_key or :passphrase are given.

  • :server_cert (String) — default: none

    path to server certificate or certificate authority file.

  • :client_cert (String) — default: none

    path to client certificate file. Note that this option is only required when encryption is configured to require client authentication.

  • :private_key (String) — default: none

    path to client private key. Note that this option is only required when encryption is configured to require client authentication.

  • :passphrase (String) — default: none

    passphrase for private key.

  • :compression (Symbol) — default: none

    compression to use. Must be either :snappy or :lz4. Also note, that in order for compression to work, you must install 'snappy' or 'lz4-ruby' gems.

  • :load_balancing_policy (Cassandra::LoadBalancing::Policy)

    default: token aware data center aware round robin.

  • :address_resolution (Symbol) — default: :none

    a pre-configured address resolver to use. Must be one of :none or :ec2_multi_region.

  • :synchronize_schema (Boolean) — default: true

    whether the driver should automatically keep schema metadata synchronized. When enabled, the driver updates schema metadata after receiving schema change notifications from Cassandra. Setting this setting to false disables automatic schema updates. Schema metadata is used by the driver to determine cluster partitioners as well as to find partition keys and replicas of prepared statements, this information makes token aware load balancing possible. One can still refresh schema manually.

  • :schema_refresh_delay (Numeric) — default: 1

    the driver will wait for :schema_refresh_delay before fetching metadata after receiving a schema change event. This timer is restarted every time a new schema change event is received. Finally, when the timer expires or a maximum wait time of :schema_refresh_timeout has been reached, a schema refresh attempt will be made and the timeout is reset.

  • :schema_refresh_timeout (Numeric) — default: 10

    the maximum delay before automatically refreshing schema. Such delay can occur whenever multiple schema change events are continuously arriving within :schema_refresh_delay interval.

  • :reconnection_policy (Cassandra::Reconnection::Policy)

    default: Exponential. Note that the default policy is configured with (0.5, 30, 2).

  • :retry_policy (Cassandra::Retry::Policy)
  • :logger (Logger) — default: none

    logger. a Logger instance from the standard library or any object responding to standard log methods (#debug, #info, #warn, #error and #fatal).

  • :listeners (Enumerable<Cassandra::Listener>) — default: none

    initial listeners. A list of initial cluster state listeners. Note that a :load_balancing policy is automatically registered with the cluster.

  • :consistency (Symbol) — default: :one

    default consistency to use for all requests. Must be one of CONSISTENCIES.

  • :trace (Boolean) — default: false

    whether or not to trace all requests by default.

  • :page_size (Integer) — default: 10000

    default page size for all select queries. Set this value to nil to disable paging.

  • :credentials (Hash{String => String}) — default: none

    a hash of credentials - to be used with credentials authentication in cassandra 1.2. Note that if you specified :username and :password options, those credentials are configured automatically.

  • :auth_provider (Cassandra::Auth::Provider) — default: none

    a custom auth provider to be used with SASL authentication in cassandra 2.0. Note that if you have specified :username and :password, then a Password Provider will be used automatically.

  • :compressor (Cassandra::Compression::Compressor) — default: none

    a custom compressor. Note that if you have specified :compression, an appropriate compressor will be provided automatically.

  • :address_resolution_policy (Cassandra::AddressResolution::Policy)

    default: No Resolution Policy a custom address resolution policy. Note that if you have specified :address_resolution, an appropriate address resolution policy will be provided automatically.

  • :futures_factory (Object<#all, #error, #value, #promise>)

    default: Future a futures factory to assist with integration into existing futures library. Note that promises returned by this object must conform to Promise api, which is not yet public. Things may change, use at your own risk.

Returns:



201
202
203
# File 'lib/cassandra.rb', line 201

def self.cluster(options = {})
  cluster_async(options).get
end

.cluster_async(options = {}) ⇒ Cassandra::Future<Cassandra::Cluster>

Creates a Cluster instance.

Returns:

See Also:



211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
# File 'lib/cassandra.rb', line 211

def self.cluster_async(options = {})
  options = options.select do |key, value|
    [ :credentials, :auth_provider, :compression, :hosts, :logger, :port,
      :load_balancing_policy, :reconnection_policy, :retry_policy, :listeners,
      :consistency, :trace, :page_size, :compressor, :username, :password,
      :ssl, :server_cert, :client_cert, :private_key, :passphrase,
      :connect_timeout, :futures_factory, :datacenter, :address_resolution,
      :address_resolution_policy, :idle_timeout, :heartbeat_interval, :timeout,
      :synchronize_schema, :schema_refresh_delay, :schema_refresh_timeout,
      :shuffle_replicas
    ].include?(key)
  end

  has_username = options.has_key?(:username)
  has_password = options.has_key?(:password)
  if has_username || has_password
    if has_username && !has_password
      raise ::ArgumentError, "both :username and :password options must be specified, but only :username given"
    end

    if !has_username && has_password
      raise ::ArgumentError, "both :username and :password options must be specified, but only :password given"
    end

    username = options.delete(:username)
    password = options.delete(:password)

    Util.assert_instance_of(::String, username) { ":username must be a String, #{username.inspect} given" }
    Util.assert_instance_of(::String, password) { ":password must be a String, #{password.inspect} given" }
    Util.assert_not_empty(username) { ":username cannot be empty" }
    Util.assert_not_empty(password) { ":password cannot be empty" }

    options[:credentials]   = {:username => username, :password => password}
    options[:auth_provider] = Auth::Providers::Password.new(username, password)
  end

  if options.has_key?(:credentials)
    credentials = options[:credentials]

    Util.assert_instance_of(::Hash, credentials) { ":credentials must be a hash, #{credentials.inspect} given" }
  end

  if options.has_key?(:auth_provider)
    auth_provider = options[:auth_provider]

    Util.assert_responds_to(:create_authenticator, auth_provider) { ":auth_provider #{auth_provider.inspect} must respond to :create_authenticator, but doesn't" }
  end

  has_client_cert = options.has_key?(:client_cert)
  has_private_key = options.has_key?(:private_key)

  if has_client_cert || has_private_key
    if has_client_cert && !has_private_key
      raise ::ArgumentError, "both :client_cert and :private_key options must be specified, but only :client_cert given"
    end

    if !has_client_cert && has_private_key
      raise ::ArgumentError, "both :client_cert and :private_key options must be specified, but only :private_key given"
    end

    client_cert = ::File.expand_path(options[:client_cert])
    private_key = ::File.expand_path(options[:private_key])

    Util.assert_file_exists(client_cert) { ":client_cert #{client_cert.inspect} doesn't exist" }
    Util.assert_file_exists(private_key) { ":private_key #{private_key.inspect} doesn't exist" }
  end

  has_server_cert = options.has_key?(:server_cert)

  if has_server_cert
    server_cert = ::File.expand_path(options[:server_cert])

    Util.assert_file_exists(server_cert) { ":server_cert #{server_cert.inspect} doesn't exist" }
  end

  if has_client_cert || has_server_cert
    context = ::OpenSSL::SSL::SSLContext.new

    if has_server_cert
      context.ca_file     = server_cert
      context.verify_mode = ::OpenSSL::SSL::VERIFY_PEER
    end

    if has_client_cert
      context.cert = ::OpenSSL::X509::Certificate.new(File.read(client_cert))

      if options.has_key?(:passphrase)
        context.key = ::OpenSSL::PKey::RSA.new(File.read(private_key), options[:passphrase])
      else
        context.key = ::OpenSSL::PKey::RSA.new(File.read(private_key))
      end
    end

    options[:ssl] = context
  end

  if options.has_key?(:ssl)
    ssl = options[:ssl]

    Util.assert_instance_of_one_of([::TrueClass, ::FalseClass, ::OpenSSL::SSL::SSLContext], ssl) { ":ssl must be a boolean or an OpenSSL::SSL::SSLContext, #{ssl.inspect} given" }
  end

  if options.has_key?(:compression)
    compression = options.delete(:compression)

    case compression
    when :snappy
      require 'cassandra/compression/compressors/snappy'
      options[:compressor] = Compression::Compressors::Snappy.new
    when :lz4
      require 'cassandra/compression/compressors/lz4'
      options[:compressor] = Compression::Compressors::Lz4.new
    else
      raise ::ArgumentError, ":compression must be either :snappy or :lz4, #{compression.inspect} given"
    end
  end

  if options.has_key?(:compressor)
    compressor = options[:compressor]
    methods    = [:algorithm, :compress?, :compress, :decompress]

    Util.assert_responds_to_all(methods, compressor) { ":compressor #{compressor.inspect} must respond to #{methods.inspect}, but doesn't" }
  end

  if options.has_key?(:logger)
    logger  = options[:logger]
    methods = [:debug, :info, :warn, :error, :fatal]

    Util.assert_responds_to_all(methods, logger) { ":logger #{logger.inspect} must respond to #{methods.inspect}, but doesn't" }
  end

  if options.has_key?(:port)
    port = options[:port] = Integer(options[:port])

    Util.assert_one_of(0..65536, port) { ":port must be a valid ip port, #{port} given" }
  end

  if options.has_key?(:datacenter)
    options[:datacenter] = String(options[:datacenter])
  end

  if options.has_key?(:connect_timeout)
    timeout = options[:connect_timeout]

    unless timeout.nil?
      Util.assert_instance_of(::Numeric, timeout) { ":connect_timeout must be a number of seconds, #{timeout} given" }
      Util.assert(timeout > 0) { ":connect_timeout must be greater than 0, #{timeout} given" }
    end
  end

  if options.has_key?(:timeout)
    timeout = options[:timeout]

    unless timeout.nil?
      Util.assert_instance_of(::Numeric, timeout) { ":timeout must be a number of seconds, #{timeout} given" }
      Util.assert(timeout > 0) { ":timeout must be greater than 0, #{timeout} given" }
    end
  end

  if options.has_key?(:heartbeat_interval)
    timeout = options[:heartbeat_interval]

    unless timeout.nil?
      Util.assert_instance_of(::Numeric, timeout) { ":heartbeat_interval must be a number of seconds, #{timeout} given" }
      Util.assert(timeout > 0) { ":heartbeat_interval must be greater than 0, #{timeout} given" }
    end
  end

  if options.has_key?(:idle_timeout)
    timeout = options[:idle_timeout]

    unless timeout.nil?
      Util.assert_instance_of(::Numeric, timeout) { ":idle_timeout must be a number of seconds, #{timeout} given" }
      Util.assert(timeout > 0) { ":idle_timeout must be greater than 0, #{timeout} given" }
    end
  end

  if options.has_key?(:schema_refresh_delay)
    timeout = options[:schema_refresh_delay]

    Util.assert_instance_of(::Numeric, timeout) { ":schema_refresh_delay must be a number of seconds, #{timeout} given" }
    Util.assert(timeout > 0) { ":schema_refresh_delay must be greater than 0, #{timeout} given" }
  end

  if options.has_key?(:schema_refresh_timeout)
    timeout = options[:schema_refresh_timeout]

    Util.assert_instance_of(::Numeric, timeout) { ":schema_refresh_timeout must be a number of seconds, #{timeout} given" }
    Util.assert(timeout > 0) { ":schema_refresh_timeout must be greater than 0, #{timeout} given" }
  end

  if options.has_key?(:load_balancing_policy)
    load_balancing_policy = options[:load_balancing_policy]
    methods = [:host_up, :host_down, :host_found, :host_lost, :setup, :teardown, :distance, :plan]

    Util.assert_responds_to_all(methods, load_balancing_policy) { ":load_balancing_policy #{load_balancing_policy.inspect} must respond to #{methods.inspect}, but doesn't" }
  end

  if options.has_key?(:reconnection_policy)
    reconnection_policy = options[:reconnection_policy]

    Util.assert_responds_to(:schedule, reconnection_policy) { ":reconnection_policy #{reconnection_policy.inspect} must respond to :schedule, but doesn't" }
  end

  if options.has_key?(:retry_policy)
    retry_policy = options[:retry_policy]
    methods = [:read_timeout, :write_timeout, :unavailable]

    Util.assert_responds_to_all(methods, retry_policy) { ":retry_policy #{retry_policy.inspect} must respond to #{methods.inspect}, but doesn't" }
  end

  if options.has_key?(:listeners)
    options[:listeners] = Array(options[:listeners])
  end

  if options.has_key?(:consistency)
    consistency = options[:consistency]

    Util.assert_one_of(CONSISTENCIES, consistency) { ":consistency must be one of #{CONSISTENCIES.inspect}, #{consistency.inspect} given" }
  end

  if options.has_key?(:trace)
    options[:trace] = !!options[:trace]
  end

  if options.has_key?(:shuffle_replicas)
    options[:shuffle_replicas] = !!options[:shuffle_replicas]
  end

  if options.has_key?(:page_size)
    page_size = options[:page_size]

    unless page_size.nil?
      page_size = options[:page_size] = Integer(page_size)
      Util.assert(page_size > 0) { ":page_size must be a positive integer, #{page_size.inspect} given" }
    end
  end

  if options.has_key?(:futures_factory)
    futures_factory = options[:futures_factory]
    methods = [:error, :value, :promise, :all]

    Util.assert_responds_to_all(methods, futures_factory) { ":futures_factory #{futures_factory.inspect} must respond to #{methods.inspect}, but doesn't" }
  end

  if options.has_key?(:address_resolution)
    address_resolution = options.delete(:address_resolution)

    case address_resolution
    when :none
      # do nothing
    when :ec2_multi_region
      options[:address_resolution_policy] = AddressResolution::Policies::EC2MultiRegion.new
    else
      raise ::ArgumentError, ":address_resolution must be either :none or :ec2_multi_region, #{address_resolution.inspect} given"
    end
  end

  if options.has_key?(:address_resolution_policy)
    address_resolver = options[:address_resolution_policy]

    Util.assert_responds_to(:resolve, address_resolver) { ":address_resolution_policy must respond to :resolve, #{address_resolver.inspect} but doesn't" }
  end

  if options.has_key?(:synchronize_schema)
    options[:synchronize_schema] = !!options[:synchronize_schema]
  end

  hosts = []

  Array(options.fetch(:hosts, '127.0.0.1')).each do |host|
    case host
    when ::IPAddr
      hosts << host
    when ::String # ip address or hostname
      Resolv.each_address(host) do |ip|
        hosts << ::IPAddr.new(ip)
      end
    else
      raise ::ArgumentError, ":hosts must be String or IPAddr, #{host.inspect} given"
    end
  end

  if hosts.empty?
    raise ::ArgumentError, ":hosts #{options[:hosts].inspect} could not be resolved to any ip address"
  end

  hosts.shuffle!
rescue => e
  futures = options.fetch(:futures_factory) { Driver.new.futures_factory }
  futures.error(e)
else
  driver = Driver.new(options)
  driver.connect(hosts)
end