Module: Deimos::PhobosConfig
- Extended by:
- ActiveSupport::Concern
- Included in:
- FigTree::ConfigStruct
- Defined in:
- lib/deimos/config/phobos_config.rb
Overview
Module to handle phobos.yml as well as outputting the configuration to save to Phobos itself.
Instance Method Summary collapse
-
#phobos_config ⇒ Hash
Create a hash representing the config that Phobos expects.
- #reset! ⇒ void
- #ssl_var_contents(key) ⇒ String
- #to_h ⇒ Hash
Instance Method Details
#phobos_config ⇒ Hash
Create a hash representing the config that Phobos expects.
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 |
# File 'lib/deimos/config/phobos_config.rb', line 31 def phobos_config p_config = { logger: Logger.new(STDOUT), custom_logger: self.phobos_logger, custom_kafka_logger: self.kafka.logger, kafka: { client_id: self.kafka.client_id, connect_timeout: self.kafka.connect_timeout, socket_timeout: self.kafka.socket_timeout, ssl_verify_hostname: self.kafka.ssl.verify_hostname, ssl_ca_certs_from_system: self.kafka.ssl.ca_certs_from_system, seed_brokers: Array.wrap(self.kafka.seed_brokers) }, producer: { ack_timeout: self.producers.ack_timeout, required_acks: self.producers.required_acks, max_retries: self.producers.max_retries, retry_backoff: self.producers.retry_backoff, max_buffer_size: self.producers.max_buffer_size, max_buffer_bytesize: self.producers.max_buffer_bytesize, compression_codec: self.producers.compression_codec, compression_threshold: self.producers.compression_threshold, max_queue_size: self.producers.max_queue_size, delivery_threshold: self.producers.delivery_threshold, delivery_interval: self.producers.delivery_interval }, consumer: { session_timeout: self.consumers.session_timeout, offset_commit_interval: self.consumers.offset_commit_interval, offset_commit_threshold: self.consumers.offset_commit_threshold, heartbeat_interval: self.consumers.heartbeat_interval }, backoff: _backoff(self.consumers.backoff.to_a) } p_config[:listeners] = self.consumer_objects.map do |consumer| next nil if consumer.disabled hash = consumer.to_h.reject do |k, _| %i(class_name schema namespace key_config backoff disabled replace_associations bulk_import_id_column).include?(k) end hash = hash.map { |k, v| [k, v.is_a?(Symbol) ? v.to_s : v] }.to_h hash[:handler] = consumer.class_name if consumer.backoff hash[:backoff] = _backoff(consumer.backoff.to_a) end hash end p_config[:listeners].compact! if self.kafka.ssl.enabled %w(ca_cert client_cert client_cert_key).each do |key| next if self.kafka.ssl.send(key).blank? p_config[:kafka]["ssl_#{key}".to_sym] = ssl_var_contents(self.kafka.ssl.send(key)) end end if self.kafka.sasl.enabled p_config[:kafka][:sasl_over_ssl] = self.kafka.sasl.enforce_ssl %w( gssapi_principal gssapi_keytab plain_authzid plain_username plain_password scram_username scram_password scram_mechanism oauth_token_provider ).each do |key| value = self.kafka.sasl.send(key) next if value.blank? p_config[:kafka]["sasl_#{key}".to_sym] = value end end p_config end |
#reset! ⇒ void
This method returns an undefined value.
24 25 26 27 |
# File 'lib/deimos/config/phobos_config.rb', line 24 def reset! super Phobos.configure(self.phobos_config) end |
#ssl_var_contents(key) ⇒ String
114 115 116 |
# File 'lib/deimos/config/phobos_config.rb', line 114 def ssl_var_contents(key) File.exist?(key) ? File.read(key) : key end |
#to_h ⇒ Hash
12 13 14 15 16 17 18 19 20 21 |
# File 'lib/deimos/config/phobos_config.rb', line 12 def to_h (FIELDS + [:handler]).map { |f| val = self.send(f) if f == :backoff && val [:backoff, _backoff(val)] elsif val.present? [f, val] end }.to_h end |