Module: Deimos

Includes:
FigTree
Defined in:
lib/deimos.rb,
lib/deimos/logging.rb,
lib/deimos/message.rb,
lib/deimos/version.rb,
lib/deimos/consumer.rb,
lib/deimos/producer.rb,
lib/deimos/poll_info.rb,
lib/deimos/exceptions.rb,
lib/deimos/transcoder.rb,
lib/deimos/kafka_source.rb,
lib/deimos/metrics/mock.rb,
lib/deimos/test_helpers.rb,
lib/deimos/tracing/mock.rb,
lib/deimos/backends/base.rb,
lib/deimos/backends/test.rb,
lib/deimos/kafka_message.rb,
lib/deimos/shared_config.rb,
lib/deimos/backends/kafka.rb,
lib/deimos/backends/outbox.rb,
lib/deimos/metrics/datadog.rb,
lib/deimos/tracing/datadog.rb,
lib/deimos/utils/db_poller.rb,
lib/deimos/ext/schema_route.rb,
lib/deimos/kafka_topic_info.rb,
lib/deimos/metrics/provider.rb,
lib/deimos/tracing/provider.rb,
lib/deimos/schema_class/base.rb,
lib/deimos/schema_class/enum.rb,
lib/deimos/ext/consumer_route.rb,
lib/deimos/ext/producer_route.rb,
lib/deimos/utils/schema_class.rb,
lib/deimos/schema_class/record.rb,
lib/deimos/backends/kafka_async.rb,
lib/deimos/config/configuration.rb,
lib/deimos/schema_backends/base.rb,
lib/deimos/schema_backends/mock.rb,
lib/deimos/utils/db_poller/base.rb,
lib/deimos/utils/deadlock_retry.rb,
lib/deimos/schema_backends/plain.rb,
lib/deimos/utils/outbox_producer.rb,
lib/deimos/active_record_consumer.rb,
lib/deimos/active_record_producer.rb,
lib/deimos/ext/producer_middleware.rb,
lib/generators/deimos/v2_generator.rb,
lib/deimos/consume/batch_consumption.rb,
lib/deimos/schema_backends/avro_base.rb,
lib/deimos/schema_backends/avro_local.rb,
lib/deimos/utils/db_poller/time_based.rb,
lib/deimos/consume/message_consumption.rb,
lib/deimos/utils/db_poller/state_based.rb,
lib/generators/deimos/db_poller_generator.rb,
lib/deimos/schema_backends/avro_validation.rb,
lib/generators/deimos/schema_class_generator.rb,
lib/deimos/active_record_consume/batch_record.rb,
lib/deimos/active_record_consume/batch_slicer.rb,
lib/deimos/active_record_consume/mass_updater.rb,
lib/generators/deimos/active_record_generator.rb,
lib/deimos/schema_backends/avro_schema_coercer.rb,
lib/generators/deimos/bulk_import_id_generator.rb,
lib/generators/deimos/outbox_backend_generator.rb,
lib/deimos/schema_backends/avro_schema_registry.rb,
lib/deimos/active_record_consume/batch_consumption.rb,
lib/deimos/active_record_consume/batch_record_list.rb,
lib/deimos/active_record_consume/message_consumption.rb,
lib/deimos/active_record_consume/schema_model_converter.rb

Overview

Generates a migration for bulk import ID in consumer.

Defined Under Namespace

Modules: ActiveRecordConsume, Backends, Consume, Generators, KafkaSource, Logging, Metrics, ProducerMiddleware, SchemaBackends, SchemaClass, SharedConfig, TestHelpers, Tracing, Utils

Classes: ActiveRecordConsumer, ActiveRecordProducer, AvroSchemaCoercer, Consumer, ConsumerRoute, KafkaMessage, KafkaTopicInfo, Message, MissingImplementationError, PollInfo, Producer, ProducerRoute, Railtie, SchemaField, SchemaRoute, Transcoder

Constant Summary collapse

# Notification event names used by Deimos. Each entry is registered on the
# Karafka monitor's notifications bus by +setup_karafka+.
# Frozen so the shared list cannot be mutated at runtime.
EVENT_TYPES = %w(
  deimos.ar_consumer.consume_batch
  deimos.encode_message
  deimos.batch_consumption.invalid_records
  deimos.batch_consumption.valid_records
  deimos.outbox.produce
).freeze
# Version string of the library.
VERSION = '2.0.4'

Class Method Summary collapse

Class Method Details

.decode(schema:, namespace:, payload:) ⇒ Hash?



99
100
101
# File 'lib/deimos.rb', line 99

# Decode a payload with the schema backend built for the given schema and
# namespace.
# @param schema forwarded to +schema_backend+
# @param namespace forwarded to +schema_backend+
# @param payload the encoded data handed to the backend's +decode+
# @return [Hash, nil] whatever the backend's +decode+ returns
def decode(schema:, namespace:, payload:)
  backend = schema_backend(schema: schema, namespace: namespace)
  backend.decode(payload)
end

.decode_message(message) ⇒ Object



104
105
106
107
108
109
110
111
112
113
114
# File 'lib/deimos.rb', line 104

# Decode the payload (and, when possible, the key) of a message hash in place,
# using the deserializers of the Karafka topic config that matches the
# message's topic.
#
# If a producer topic prefix is configured it is stripped from the topic name
# before the config lookup. Only a leading occurrence is removed.
# @param message [Hash] hash read via +:topic+, +:payload+ and +:key+;
#   +:payload+ (and possibly +:key+) are overwritten with decoded values
# @return [Object] not meaningful; read the mutated +message+ instead
def decode_message(message)
  topic = message[:topic]
  prefix = Deimos.config.producers.topic_prefix
  # delete_prefix only strips at the start of the name. String#sub would also
  # remove the first occurrence found in the middle of an unprefixed topic.
  topic = topic.delete_prefix(prefix) if prefix

  config = karafka_config_for(topic: topic)
  deserializers = config.deserializers
  message[:payload] = deserializers[:payload].decode_message_hash(message[:payload])
  # Key deserializers that cannot decode hashes are skipped.
  if message[:key] && deserializers[:key].respond_to?(:decode_message_hash)
    message[:key] = deserializers[:key].decode_message_hash(message[:key])
  end
end

.disable_producers(*producer_classes, &block) ⇒ void

This method returns an undefined value.

Run a block without allowing any messages to be produced to Kafka. Optionally add a list of producer classes to limit the disabling to those classes.



16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# File 'lib/deimos/producer.rb', line 16

# Run a block without allowing any messages to be produced to Kafka.
# Optionally pass producer classes to limit the disabling to those classes.
# @param producer_classes [Array] when non-empty, the work is delegated to
#   +_disable_producer_classes+ and the global flag is left untouched
# @yield the code to run while producing is disabled
# @return [void]
def disable_producers(*producer_classes, &block)
  if producer_classes.any?
    _disable_producer_classes(producer_classes, &block)
    return
  end

  flag = :frk_disable_all_producers

  # Already inside a disabling block: just run the block and leave the flag
  # for the outermost call to reset.
  if Thread.current[flag]
    yield
    return
  end

  Thread.current[flag] = true
  begin
    yield
  ensure
    # Reset the thread-local flag even when the block raises.
    Thread.current[flag] = false
  end
end

.encode(schema:, namespace:, payload:, subject: nil) ⇒ String



90
91
92
93
# File 'lib/deimos.rb', line 90

# Encode a payload with the schema backend built for the given schema and
# namespace.
# @param schema forwarded to +schema_backend+; also used in the default topic
# @param namespace forwarded to +schema_backend+; also used in the default topic
# @param payload the data handed to the backend's +encode+
# @param subject passed to the backend as +topic:+; when nil,
#   "<namespace>.<schema>" is used instead
# @return [String] whatever the backend's +encode+ returns
def encode(schema:, namespace:, payload:, subject: nil)
  backend = schema_backend(schema: schema, namespace: namespace)
  topic = subject || "#{namespace}.#{schema}"
  backend.encode(payload, topic: topic)
end

.generate_key_schemas ⇒ Object



23
24
25
26
27
28
29
30
31
32
33
# File 'lib/deimos/config/configuration.rb', line 23

# For every Karafka topic config whose key deserializer exposes a non-nil
# +key_field+, attach a schema backend to that deserializer and ask the
# backend to generate the key schema for the field.
# @return [Object] the collection returned by +Deimos.karafka_configs+
def generate_key_schemas
  Deimos.karafka_configs.each do |topic_config|
    key_transcoder = topic_config.deserializers[:key]
    # Skip deserializers without a key_field accessor or without a value.
    next unless key_transcoder.respond_to?(:key_field) && key_transcoder.key_field

    key_transcoder.backend = Deimos.schema_backend(
      schema: topic_config.schema,
      namespace: topic_config.namespace
    )
    key_transcoder.backend.generate_key_schema(key_transcoder.key_field)
  end
end

.karafka_config_for(topic: nil, producer: nil) ⇒ Karafka::Routing::Topic?



164
165
166
167
168
169
170
# File 'lib/deimos.rb', line 164

# Find the Karafka topic config for a topic name or for a producer class.
# A given +topic+ takes precedence over +producer+.
# @param topic matched against each config's +name+
# @param producer matched against each config's +producer_classes+
# @return [Karafka::Routing::Topic, nil] the first match, or nil when nothing
#   matches or neither argument is given
def karafka_config_for(topic: nil, producer: nil)
  return karafka_configs.find { |route| route.name == topic } if topic
  return unless producer

  karafka_configs.find { |route| route.producer_classes.include?(producer) }
end

.karafka_configs ⇒ Array<Karafka::Routing::Topic>



158
159
160
# File 'lib/deimos.rb', line 158

# Collect every topic from all entries of +Karafka::App.routes+ into one flat
# list.
# @return [Array<Karafka::Routing::Topic>]
def karafka_configs
  topic_collections = Karafka::App.routes.flat_map { |route| route.topics }
  topic_collections.flat_map { |topics| topics.to_a }
end

.load_generated_schema_classes ⇒ void

This method returns an undefined value.

Loads generated classes



37
38
39
40
41
42
43
44
45
# File 'lib/deimos/config/configuration.rb', line 37

# Loads generated classes: requires every +.rb+ file found below
# +schema.generated_class_path+ (resolved relative to the current working
# directory), in sorted path order.
# @raise [RuntimeError] if no generated class path is configured, or if one
#   of the files fails to load with a LoadError
# @return [void]
def load_generated_schema_classes
  class_path = Deimos.config.schema.generated_class_path
  raise 'Cannot use schema classes without schema.generated_class_path. Please provide a directory.' if class_path.nil?

  generated_files = Dir["./#{class_path}/**/*.rb"].sort
  generated_files.each { |file| require file }
rescue LoadError
  raise 'Cannot load schema classes. Please regenerate classes with rake deimos:generate_schema_models.'
end

.producers_disabled?(producer_class = nil) ⇒ Boolean

Are producers disabled? If a class is passed in, check only that class. Otherwise check if the global disable flag is set.



52
53
54
55
56
57
# File 'lib/deimos/producer.rb', line 52

# Are producers disabled? If a class is passed in, check only that class.
# Otherwise check if the global disable flag is set.
# @param producer_class looked up in the thread-local list of disabled
#   producer classes
# @return [Boolean] truthy when producing is disabled by configuration, by the
#   thread-local global flag, or for the given class
def producers_disabled?(producer_class = nil)
  return true if Deimos.config.producers.disabled

  all_disabled = Thread.current[:frk_disable_all_producers]
  all_disabled || Thread.current[:frk_disabled_producers]&.include?(producer_class)
end

.schema_backend(schema:, namespace:) ⇒ Deimos::SchemaBackends::Base



68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# File 'lib/deimos.rb', line 68

# Build a schema backend instance for the given schema and namespace.
#
# When +config.schema.use_schema_classes+ is enabled and a schema class is
# found for the pair, that class is instantiated and the instance's own
# +schema+ and +namespace+ are used instead, so that an overriding schema
# class applies its inherited values.
# @param schema schema name to look up and to pass to the backend
# @param namespace namespace to look up and to pass to the backend
# @return [Deimos::SchemaBackends::Base]
def schema_backend(schema:, namespace:)
  # Stays nil when schema classes are not in use.
  schema_class = Utils::SchemaClass.klass(schema, namespace) if config.schema.use_schema_classes
  return schema_backend_class.new(schema: schema, namespace: namespace) if schema_class.nil?

  instance = schema_class.new
  schema_backend_class.new(schema: instance.schema, namespace: instance.namespace)
end

.schema_backend_class ⇒ Class<Deimos::SchemaBackends::Base>



57
58
59
60
61
62
63
# File 'lib/deimos.rb', line 57

# Resolve the schema backend class named by +Deimos.config.schema.backend+.
# The matching file under +deimos/schema_backends/+ is required on demand,
# then the class name is built with +classify+ and looked up with
# +constantize+ (presumably the ActiveSupport string extensions).
# @return [Class<Deimos::SchemaBackends::Base>]
def schema_backend_class
  backend_name = Deimos.config.schema.backend.to_s
  require "deimos/schema_backends/#{backend_name}"

  class_name = "Deimos::SchemaBackends::#{backend_name.classify}"
  class_name.constantize
end

.setup_karafka ⇒ Object



140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
# File 'lib/deimos.rb', line 140

# Wire Deimos into Karafka: append the Deimos producer middleware, refresh
# the producer's kafka settings, register the Deimos event types on the
# monitor's notifications bus, and log producer errors that carry messages.
# @return [Object] the result of the +subscribe+ call
def setup_karafka
  Karafka.producer.middleware.append(Deimos::ProducerMiddleware)

  # for multiple setup calls: rebuild the producer's kafka settings from a
  # copy of the Karafka setup config.
  kafka_settings = Karafka::Setup::Config.config.kafka.dup
  Karafka.producer.config.kafka = Karafka::Setup::AttributesMap.producer(kafka_settings)

  EVENT_TYPES.each do |event_type|
    Karafka.monitor.notifications_bus.register_event(event_type)
  end

  Karafka.producer.monitor.subscribe('error.occurred') do |event|
    # Only errors whose payload carries the failed messages are logged here.
    next unless event.payload.key?(:messages)

    failed_messages = event[:messages]
    # The first message's topic selects the config that supplies payload_log.
    topic_config = Deimos.karafka_config_for(topic: failed_messages.first[:topic])
    log_text = Deimos::Logging.messages_log_text(topic_config&.payload_log, failed_messages)
    Karafka.logger.error("Error producing messages: #{event[:error].message} #{log_text.to_json}")
  end
end

.start_outbox_backend!(thread_count: 1) ⇒ void

This method returns an undefined value.

Start the DB producers to send Kafka messages.



119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# File 'lib/deimos.rb', line 119

# Start the DB producers to send Kafka messages.
#
# Builds +thread_count+ outbox producers, wraps them in a Sigurd executor and
# runs it under a Sigurd signal handler.
# @param thread_count [Integer] number of outbox producers to start; must be
#   a positive number
# @raise [RuntimeError] if the producer backend is not +:outbox+, or if
#   +thread_count+ is nil, zero or negative
# @return [void]
def start_outbox_backend!(thread_count: 1)
  Sigurd.exit_on_signal = true
  if self.config.producers.backend != :outbox
    raise('Publish backend is not set to :outbox, exiting')
  end

  # A negative count used to slip through and start an executor with no
  # producers at all, so every non-positive value is rejected.
  if thread_count.nil? || !thread_count.positive?
    raise('Thread count is not given or set to zero, exiting')
  end

  producers = (1..thread_count).map do
    # Use the outbox logger when one is configured, else the Karafka logger.
    Deimos::Utils::OutboxProducer.
      new(self.config.outbox.logger || Karafka.logger)
  end
  executor = Sigurd::Executor.new(producers,
                                  sleep_seconds: 5,
                                  logger: Karafka.logger)
  signal_handler = Sigurd::SignalHandler.new(executor)
  signal_handler.run!
end

.topic_for_consumer(handler_class) ⇒ String?



174
175
176
177
178
179
180
181
# File 'lib/deimos.rb', line 174

# Look up the name of the first Karafka topic config whose consumer is the
# given handler class.
# @param handler_class compared with each topic config's +consumer+
# @return [String, nil] the topic name, or nil when no config uses the class
def topic_for_consumer(handler_class)
  matching_topic = Deimos.karafka_configs.find { |topic| topic.consumer == handler_class }
  matching_topic&.name
end