Method: Kafka::Client#initialize
- Defined in:
- lib/kafka/client.rb
#initialize(seed_brokers:, client_id: "ruby-kafka", logger: nil, connect_timeout: nil, socket_timeout: nil, ssl_ca_cert_file_path: nil, ssl_ca_cert: nil, ssl_client_cert: nil, ssl_client_cert_key: nil, ssl_client_cert_key_password: nil, ssl_client_cert_chain: nil, sasl_gssapi_principal: nil, sasl_gssapi_keytab: nil, sasl_plain_authzid: '', sasl_plain_username: nil, sasl_plain_password: nil, sasl_scram_username: nil, sasl_scram_password: nil, sasl_scram_mechanism: nil, sasl_aws_msk_iam_access_key_id: nil, sasl_aws_msk_iam_secret_key_id: nil, sasl_aws_msk_iam_aws_region: nil, sasl_aws_msk_iam_session_token: nil, sasl_over_ssl: true, ssl_ca_certs_from_system: false, partitioner: nil, sasl_oauth_token_provider: nil, ssl_verify_hostname: true, resolve_seed_brokers: false) ⇒ Client
Initializes a new Kafka client.
83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 |
# File 'lib/kafka/client.rb', line 83 def initialize(seed_brokers:, client_id: "ruby-kafka", logger: nil, connect_timeout: nil, socket_timeout: nil, ssl_ca_cert_file_path: nil, ssl_ca_cert: nil, ssl_client_cert: nil, ssl_client_cert_key: nil, ssl_client_cert_key_password: nil, ssl_client_cert_chain: nil, sasl_gssapi_principal: nil, sasl_gssapi_keytab: nil, sasl_plain_authzid: '', sasl_plain_username: nil, sasl_plain_password: nil, sasl_scram_username: nil, sasl_scram_password: nil, sasl_scram_mechanism: nil, sasl_aws_msk_iam_access_key_id: nil, sasl_aws_msk_iam_secret_key_id: nil, sasl_aws_msk_iam_aws_region: nil, sasl_aws_msk_iam_session_token: nil, sasl_over_ssl: true, ssl_ca_certs_from_system: false, partitioner: nil, sasl_oauth_token_provider: nil, ssl_verify_hostname: true, resolve_seed_brokers: false) @logger = TaggedLogger.new(logger) @instrumenter = Instrumenter.new(client_id: client_id) @seed_brokers = normalize_seed_brokers(seed_brokers) @resolve_seed_brokers = resolve_seed_brokers ssl_context = SslContext.build( ca_cert_file_path: ssl_ca_cert_file_path, ca_cert: ssl_ca_cert, client_cert: ssl_client_cert, client_cert_key: ssl_client_cert_key, client_cert_key_password: ssl_client_cert_key_password, client_cert_chain: ssl_client_cert_chain, ca_certs_from_system: ssl_ca_certs_from_system, verify_hostname: ssl_verify_hostname ) sasl_authenticator = SaslAuthenticator.new( sasl_gssapi_principal: sasl_gssapi_principal, sasl_gssapi_keytab: sasl_gssapi_keytab, sasl_plain_authzid: sasl_plain_authzid, sasl_plain_username: sasl_plain_username, sasl_plain_password: sasl_plain_password, sasl_scram_username: sasl_scram_username, sasl_scram_password: sasl_scram_password, sasl_scram_mechanism: sasl_scram_mechanism, sasl_aws_msk_iam_access_key_id: sasl_aws_msk_iam_access_key_id, sasl_aws_msk_iam_secret_key_id: sasl_aws_msk_iam_secret_key_id, sasl_aws_msk_iam_aws_region: sasl_aws_msk_iam_aws_region, sasl_aws_msk_iam_session_token: sasl_aws_msk_iam_session_token, sasl_oauth_token_provider: sasl_oauth_token_provider, logger: @logger ) if sasl_authenticator.enabled? && sasl_over_ssl && ssl_context.nil? raise ArgumentError, "SASL authentication requires that SSL is configured" end @connection_builder = ConnectionBuilder.new( client_id: client_id, connect_timeout: connect_timeout, socket_timeout: socket_timeout, ssl_context: ssl_context, logger: @logger, instrumenter: @instrumenter, sasl_authenticator: sasl_authenticator ) @cluster = initialize_cluster @partitioner = partitioner || Partitioner.new end |