Class: BPS::Publisher::Kafka

Inherits:
Abstract
  • Object
Defined in:
lib/bps/publisher/kafka.rb

Direct Known Subclasses

KafkaAsync

Defined Under Namespace

Classes: Topic

Constant Summary

CLIENT_OPTS =
{
  client_id: :string,
  connect_timeout: :float,
  socket_timeout: :float,
  ssl_ca_cert_file_path: :string,
  ssl_ca_cert: :string,
  ssl_client_cert: :string,
  ssl_client_cert_key: :string,
  ssl_client_cert_key_password: :string,
  ssl_client_cert_chain: :string,
  sasl_gssapi_principal: :string,
  sasl_gssapi_keytab: :string,
  sasl_plain_authzid: :string,
  sasl_plain_username: :string,
  sasl_plain_password: :string,
  sasl_scram_username: :string,
  sasl_scram_password: :string,
  sasl_scram_mechanism: :string,
  sasl_over_ssl: :bool,
  ssl_ca_certs_from_system: :bool,
  ssl_verify_hostname: :bool,
}.freeze
PRODUCER_OPTS =
{
  # standard
  retry_backoff: :float,
  compression_codec: :symbol,
  compression_threshold: :int,
  ack_timeout: :float,
  required_acks: :symbol,
  max_retries: :int,
  max_buffer_size: :int,
  max_buffer_bytesize: :int,
  idempotent: :bool,
  transactional: :bool,
  transactional_id: :string,
  transactional_timeout: :bool,
  # async
  delivery_interval: :float,
  delivery_threshold: :int,
  max_queue_size: :int,
}.freeze

Constructor Details

#initialize(broker_addrs, **opts) ⇒ Kafka

Returns a new instance of Kafka.

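Parameters:

  • broker_addrs (Array<String>, URI)

    the seed broker addresses. A URI is converted with parse_url before use.

  • opts (Hash)

    the client and producer options. Only keys listed in CLIENT_OPTS are passed to the Kafka client, and only keys listed in PRODUCER_OPTS are passed to the producer.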

# File 'lib/bps/publisher/kafka.rb', line 82

def initialize(broker_addrs, **opts)
  super()

  broker_addrs = parse_url(broker_addrs) if broker_addrs.is_a?(URI)
  @topics = {}
  @client = ::Kafka.new(broker_addrs, **opts.slice(*CLIENT_OPTS.keys))
  @producer = init_producer(**opts.slice(*PRODUCER_OPTS.keys))
end
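
The constructor splits a single options hash between the client and the producer with Hash#slice. A minimal sketch of that split is below. The key lists are trimmed-down, hypothetical stand-ins for CLIENT_OPTS and PRODUCER_OPTS, and split_opts is an illustrative helper that is not part of BPS.

```ruby
# Hypothetical, shortened stand-ins for CLIENT_OPTS.keys and PRODUCER_OPTS.keys.
SKETCH_CLIENT_KEYS   = %i[client_id connect_timeout socket_timeout].freeze
SKETCH_PRODUCER_KEYS = %i[max_retries required_acks delivery_interval].freeze

# Illustrative helper (not part of BPS): returns the client and producer
# subsets of opts, mirroring the two Hash#slice calls in #initialize.
def split_opts(**opts)
  [opts.slice(*SKETCH_CLIENT_KEYS), opts.slice(*SKETCH_PRODUCER_KEYS)]
end

client_opts, producer_opts = split_opts(
  client_id: 'my-app',
  max_retries: 3,
  unknown_key: true # appears in neither list, so both slices drop it
)

p client_opts
p producer_opts
```

Because Hash#slice ignores keys it is not asked for, an option outside both lists is dropped without raising an error.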

Class Method Details

.coercer ⇒ BPS::Coercer

Returns the options coercer.

Returns:

  • (BPS::Coercer)

    the options coercer.



# File 'lib/bps/publisher/kafka.rb', line 75

def self.coercer
  @coercer ||= BPS::Coercer.new(CLIENT_OPTS.merge(PRODUCER_OPTS)).freeze
end
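
The coercer is built from the merged type maps, which suggests it converts raw option values (for example, strings from a URL query) to the declared types. The sketch below illustrates that idea only. It is not BPS::Coercer, whose actual behavior is not shown on this page; coerce_opts, SKETCH_TYPES, the accepted boolean spellings, and the dropping of unknown keys are all assumptions.

```ruby
# Hypothetical type map in the style of CLIENT_OPTS and PRODUCER_OPTS.
SKETCH_TYPES = {
  client_id: :string,
  connect_timeout: :float,
  max_retries: :int,
  idempotent: :bool,
  required_acks: :symbol
}.freeze

# Illustrative only, NOT BPS::Coercer: converts each known value to its
# declared type and skips keys that the type map does not mention.
def coerce_opts(types, raw)
  raw.each_with_object({}) do |(key, value), out|
    name = key.to_sym
    case types[name]
    when :string then out[name] = value.to_s
    when :float  then out[name] = Float(value)
    when :int    then out[name] = Integer(value)
    when :bool   then out[name] = %w[1 true t yes].include?(value.to_s.downcase)
    when :symbol then out[name] = value.to_sym
    end
  end
end

coerced = coerce_opts(
  SKETCH_TYPES,
  'connect_timeout' => '2.5',
  'max_retries' => '3',
  'idempotent' => 'true',
  'other' => 'x' # not in the type map, so it is skipped
)

p coerced
```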

Instance Method Details

#close ⇒ Object

Calls super, shuts down the producer, then closes the Kafka client.



# File 'lib/bps/publisher/kafka.rb', line 95

def close
  super

  @producer.shutdown
  @client.close
end

#topic(name) ⇒ Object

Returns the Topic for name, creating it on first use and caching it for later calls.



# File 'lib/bps/publisher/kafka.rb', line 91

def topic(name)
  @topics[name] ||= self.class::Topic.new(@producer, name)
end
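
The `||=` in #topic memoizes one Topic per name, and `self.class::Topic` resolves the constant through the instance's own class. The sketch below shows both effects with stand-in classes. SketchPublisher and SketchAsyncPublisher are hypothetical, and whether KafkaAsync actually defines its own Topic is not stated on this page.

```ruby
# Hypothetical stand-in for the publisher; Topic is a simple Struct here.
class SketchPublisher
  Topic = Struct.new(:producer, :name)

  def initialize(producer)
    @producer = producer
    @topics = {}
  end

  # Same shape as #topic above: build once per name, then reuse.
  def topic(name)
    @topics[name] ||= self.class::Topic.new(@producer, name)
  end
end

# Hypothetical subclass with its own Topic constant.
class SketchAsyncPublisher < SketchPublisher
  Topic = Struct.new(:producer, :name)
end

pub   = SketchPublisher.new(:producer_stub)
first = pub.topic('events')
again = pub.topic('events') # cached: the same object as first
other = pub.topic('audit')  # different name, different object

async_topic = SketchAsyncPublisher.new(:producer_stub).topic('events')
```

With this lookup, a subclass can swap in its own topic class without overriding #topic.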