Module: Deimos
- Includes:
- Instrumentation, FigTree
- Defined in:
- lib/deimos.rb,
lib/deimos/message.rb,
lib/deimos/version.rb,
lib/deimos/consumer.rb,
lib/deimos/producer.rb,
lib/deimos/poll_info.rb,
lib/deimos/exceptions.rb,
lib/deimos/backends/db.rb,
lib/deimos/kafka_source.rb,
lib/deimos/metrics/mock.rb,
lib/deimos/test_helpers.rb,
lib/deimos/tracing/mock.rb,
lib/deimos/backends/base.rb,
lib/deimos/backends/test.rb,
lib/deimos/kafka_message.rb,
lib/deimos/shared_config.rb,
lib/deimos/backends/kafka.rb,
lib/deimos/batch_consumer.rb,
lib/deimos/instrumentation.rb,
lib/deimos/metrics/datadog.rb,
lib/deimos/tracing/datadog.rb,
lib/deimos/utils/db_poller.rb,
lib/deimos/kafka_topic_info.rb,
lib/deimos/metrics/provider.rb,
lib/deimos/tracing/provider.rb,
lib/deimos/schema_class/base.rb,
lib/deimos/schema_class/enum.rb,
lib/deimos/utils/db_producer.rb,
lib/deimos/utils/lag_reporter.rb,
lib/deimos/utils/schema_class.rb,
lib/deimos/schema_class/record.rb,
lib/deimos/backends/kafka_async.rb,
lib/deimos/config/configuration.rb,
lib/deimos/config/phobos_config.rb,
lib/deimos/schema_backends/base.rb,
lib/deimos/schema_backends/mock.rb,
lib/deimos/utils/db_poller/base.rb,
lib/deimos/utils/deadlock_retry.rb,
lib/deimos/utils/inline_consumer.rb,
lib/deimos/active_record_consumer.rb,
lib/deimos/active_record_producer.rb,
lib/deimos/consume/batch_consumption.rb,
lib/deimos/schema_backends/avro_base.rb,
lib/deimos/schema_backends/avro_local.rb,
lib/deimos/utils/db_poller/time_based.rb,
lib/deimos/consume/message_consumption.rb,
lib/deimos/utils/db_poller/state_based.rb,
lib/deimos/utils/schema_controller_mixin.rb,
lib/generators/deimos/db_poller_generator.rb,
lib/deimos/schema_backends/avro_validation.rb,
lib/generators/deimos/db_backend_generator.rb,
lib/generators/deimos/schema_class_generator.rb,
lib/deimos/active_record_consume/batch_record.rb,
lib/deimos/active_record_consume/batch_slicer.rb,
lib/deimos/active_record_consume/mass_updater.rb,
lib/generators/deimos/active_record_generator.rb,
lib/deimos/schema_backends/avro_schema_coercer.rb,
lib/generators/deimos/bulk_import_id_generator.rb,
lib/deimos/schema_backends/avro_schema_registry.rb,
lib/deimos/active_record_consume/batch_consumption.rb,
lib/deimos/active_record_consume/batch_record_list.rb,
lib/deimos/active_record_consume/message_consumption.rb,
lib/deimos/active_record_consume/schema_model_converter.rb
Overview
Generates a migration for bulk import ID in consumer.
Defined Under Namespace
Modules: ActiveRecordConsume, Backends, Consume, Generators, Instrumentation, KafkaListener, KafkaSource, Metrics, PhobosConfig, SchemaBackends, SchemaClass, SharedConfig, TestHelpers, Tracing, Utils Classes: ActiveRecordConsumer, ActiveRecordProducer, AvroSchemaCoercer, BatchConsumer, Consumer, KafkaMessage, KafkaTopicInfo, Message, MissingImplementationError, PollInfo, Producer, Railtie, SchemaField
Constant Summary collapse
- VERSION =
'1.23.2'
Constants included from Instrumentation
Class Method Summary collapse
- .decode(schema:, namespace:, payload:) ⇒ Hash?
-
.disable_producers(*producer_classes, &block) ⇒ void
Run a block without allowing any messages to be produced to Kafka.
- .encode(schema:, namespace:, payload:, subject: nil) ⇒ String
-
.load_generated_schema_classes ⇒ void
Loads generated classes.
-
.producers_disabled?(producer_class = nil) ⇒ Boolean
Are producers disabled? If a class is passed in, check only that class.
- .schema_backend(schema:, namespace:) ⇒ Deimos::SchemaBackends::Base
- .schema_backend_class ⇒ Class<Deimos::SchemaBackends::Base>
-
.start_db_backend!(thread_count: 1) ⇒ void
Start the DB producers to send Kafka messages.
Class Method Details
.decode(schema:, namespace:, payload:) ⇒ Hash?
90 91 92 |
# File 'lib/deimos.rb', line 90 def decode(schema:, namespace:, payload:) self.schema_backend(schema: schema, namespace: namespace).decode(payload) end |
.disable_producers(*producer_classes, &block) ⇒ void
This method returns an undefined value.
Run a block without allowing any messages to be produced to Kafka. Optionally add a list of producer classes to limit the disabling to those classes.
16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
# File 'lib/deimos/producer.rb', line 16 def disable_producers(*producer_classes, &block) if producer_classes.any? _disable_producer_classes(producer_classes, &block) return end if Thread.current[:frk_disable_all_producers] # nested disable block yield return end begin Thread.current[:frk_disable_all_producers] = true yield ensure Thread.current[:frk_disable_all_producers] = false end end |
.encode(schema:, namespace:, payload:, subject: nil) ⇒ String
81 82 83 84 |
# File 'lib/deimos.rb', line 81 def encode(schema:, namespace:, payload:, subject: nil) self.schema_backend(schema: schema, namespace: namespace). encode(payload, topic: subject || "#{namespace}.#{schema}" ) end |
.load_generated_schema_classes ⇒ void
This method returns an undefined value.
Loads generated classes
36 37 38 39 40 41 42 43 44 |
# File 'lib/deimos/config/configuration.rb', line 36 def self.load_generated_schema_classes if Deimos.config.schema.generated_class_path.nil? raise 'Cannot use schema classes without schema.generated_class_path. Please provide a directory.' end Dir["./#{Deimos.config.schema.generated_class_path}/**/*.rb"].sort.each { |f| require f } rescue LoadError raise 'Cannot load schema classes. Please regenerate classes with rake deimos:generate_schema_models.' end |
.producers_disabled?(producer_class = nil) ⇒ Boolean
Are producers disabled? If a class is passed in, check only that class. Otherwise check if the global disable flag is set.
52 53 54 55 |
# File 'lib/deimos/producer.rb', line 52 def producers_disabled?(producer_class=nil) Thread.current[:frk_disable_all_producers] || Thread.current[:frk_disabled_producers]&.include?(producer_class) end |
.schema_backend(schema:, namespace:) ⇒ Deimos::SchemaBackends::Base
59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 |
# File 'lib/deimos.rb', line 59 def schema_backend(schema:, namespace:) if Utils::SchemaClass.use?(config.to_h) # Initialize an instance of the provided schema # in the event the schema class is an override, the inherited # schema and namespace will be applied schema_class = Utils::SchemaClass.klass(schema, namespace) if schema_class.nil? schema_backend_class.new(schema: schema, namespace: namespace) else schema_instance = schema_class.new schema_backend_class.new(schema: schema_instance.schema, namespace: schema_instance.namespace) end else schema_backend_class.new(schema: schema, namespace: namespace) end end |
.schema_backend_class ⇒ Class<Deimos::SchemaBackends::Base>
48 49 50 51 52 53 54 |
# File 'lib/deimos.rb', line 48 def schema_backend_class backend = Deimos.config.schema.backend.to_s require "deimos/schema_backends/#{backend}" "Deimos::SchemaBackends::#{backend.classify}".constantize end |
.start_db_backend!(thread_count: 1) ⇒ void
This method returns an undefined value.
Start the DB producers to send Kafka messages.
97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 |
# File 'lib/deimos.rb', line 97 def start_db_backend!(thread_count: 1) Sigurd.exit_on_signal = true if self.config.producers.backend != :db raise('Publish backend is not set to :db, exiting') end if thread_count.nil? || thread_count.zero? raise('Thread count is not given or set to zero, exiting') end producers = (1..thread_count).map do Deimos::Utils::DbProducer. new(self.config.db_producer.logger || self.config.logger) end executor = Sigurd::Executor.new(producers, sleep_seconds: 5, logger: self.config.logger) signal_handler = Sigurd::SignalHandler.new(executor) signal_handler.run! end |