Module: LogStash::PluginMixins::Kafka::Common
- Included in:
- Inputs::Kafka, Outputs::Kafka
- Defined in:
- lib/logstash/plugin_mixins/kafka/common.rb
Class Method Summary collapse
Instance Method Summary collapse
- #reassign_dns_lookup ⇒ Object
- #set_sasl_config(props) ⇒ Object
- #set_trustore_keystore_config(props) ⇒ Object
Class Method Details
.included(base) ⇒ Object
4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
# File 'lib/logstash/plugin_mixins/kafka/common.rb', line 4 def self.included(base) # COMMON CONFIGURATION SUPPORTED BY BOTH PRODUCER/CONSUMER # Close idle connections after the number of milliseconds specified by this config. base.config :connections_max_idle_ms, :validate => :number, :default => 540_000 # (9m) Kafka default # The period of time in milliseconds after which we force a refresh of metadata even if # we haven't seen any partition leadership changes to proactively discover any new brokers or partitions base.config :metadata_max_age_ms, :validate => :number, :default => 300_000 # (5m) Kafka default # The configuration controls the maximum amount of time the client will wait for the response of a request. # If the response is not received before the timeout elapses the client will resend the request if necessary # or fail the request if retries are exhausted. base.config :request_timeout_ms, :validate => :number, :default => 40_000 # Kafka default end |
Instance Method Details
#reassign_dns_lookup ⇒ Object
47 48 49 50 51 52 53 |
# File 'lib/logstash/plugin_mixins/kafka/common.rb', line 47 def reassign_dns_lookup if @client_dns_lookup == "default" @client_dns_lookup = "use_all_dns_ips" logger.warn("client_dns_lookup setting 'default' value is deprecated, forced to 'use_all_dns_ips', please update your configuration") deprecation_logger.deprecated("Deprecated value `default` for `client_dns_lookup` option; use `use_all_dns_ips` instead.") end end |
#set_sasl_config(props) ⇒ Object
33 34 35 36 37 38 39 40 41 42 43 44 45 |
# File 'lib/logstash/plugin_mixins/kafka/common.rb', line 33 def set_sasl_config(props) java.lang.System.setProperty("java.security.auth.login.config", jaas_path) unless jaas_path.nil? java.lang.System.setProperty("java.security.krb5.conf", kerberos_config) unless kerberos_config.nil? props.put("sasl.mechanism", sasl_mechanism) if sasl_mechanism == "GSSAPI" && sasl_kerberos_service_name.nil? raise LogStash::ConfigurationError, "sasl_kerberos_service_name must be specified when SASL mechanism is GSSAPI" end props.put("sasl.kerberos.service.name", sasl_kerberos_service_name) unless sasl_kerberos_service_name.nil? props.put("sasl.jaas.config", sasl_jaas_config) unless sasl_jaas_config.nil? props.put("sasl.client.callback.handler.class", sasl_client_callback_handler_class) unless sasl_client_callback_handler_class.nil? end |
#set_trustore_keystore_config(props) ⇒ Object
20 21 22 23 24 25 26 27 28 29 30 31 |
# File 'lib/logstash/plugin_mixins/kafka/common.rb', line 20 def set_trustore_keystore_config(props) props.put("ssl.truststore.type", ssl_truststore_type) unless ssl_truststore_type.nil? props.put("ssl.truststore.location", ssl_truststore_location) unless ssl_truststore_location.nil? props.put("ssl.truststore.password", ssl_truststore_password.value) unless ssl_truststore_password.nil? # Client auth stuff props.put("ssl.keystore.type", ssl_keystore_type) unless ssl_keystore_type.nil? props.put("ssl.key.password", ssl_key_password.value) unless ssl_key_password.nil? props.put("ssl.keystore.location", ssl_keystore_location) unless ssl_keystore_location.nil? props.put("ssl.keystore.password", ssl_keystore_password.value) unless ssl_keystore_password.nil? props.put("ssl.endpoint.identification.algorithm", ssl_endpoint_identification_algorithm) unless ssl_endpoint_identification_algorithm.nil? end |