Class: Dionysus::Producer

Inherits:
Object
  • Object
show all
Defined in:
lib/dionysus/producer.rb

Defined Under Namespace

Classes: BaseResponder, Config, DeletedRecordSerializer, Genesis, KarafkaResponderGenerator, Key, ModelSerializer, Outbox, PartitionKey, Registry, Serializer, Suppressor

Class Method Summary collapse

Class Method Details

.configurationObject



4
5
6
# File 'lib/dionysus/producer.rb', line 4

def self.configuration
  @configuration ||= Dionysus::Producer::Config.new
end

.configure {|configuration| ... } ⇒ Object

Yields:



8
9
10
# File 'lib/dionysus/producer.rb', line 8

def self.configure
  yield configuration
end

.declare(&config) ⇒ Object



16
17
18
19
20
21
22
23
# File 'lib/dionysus/producer.rb', line 16

def self.declare(&config)
  registry = Dionysus::Producer::Registry.new

  registry.instance_eval(&config)
  configure do |configuration|
    configuration.registry = registry
  end
end

.observers_with_responders_for(resource, changeset) ⇒ Object



101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# File 'lib/dionysus/producer.rb', line 101

def self.observers_with_responders_for(resource, changeset)
  return [] if registry.nil?

  registry.registrations.values.each.with_object([]) do |registration, accum|
    registration.topics.each do |topic|
      topic
        .models
        .select { |model_registration| model_registration.observes?(resource, changeset) }
        .each do |model_registration|
          association_name = model_registration.association_name_for_observable(resource, changeset)
          methods_chain = association_name.to_s.split(".")
          association_or_associations = methods_chain.inject(resource) do |record, method_name|
            record.public_send(method_name)
          end

          accum << [Array.wrap(association_or_associations).compact, topic.producer]
        end
    end
  end
end

.outboxObject



25
26
27
# File 'lib/dionysus/producer.rb', line 25

def self.outbox
  Dionysus::Producer::Outbox.new(configuration.outbox_model, config: configuration)
end

.outbox_publisherObject



29
30
31
# File 'lib/dionysus/producer.rb', line 29

def self.outbox_publisher
  Dionysus::Producer::Outbox::Publisher.new(config: configuration)
end

.registryObject



12
13
14
# File 'lib/dionysus/producer.rb', line 12

def self.registry
  configuration.registry
end

.reset!Object



33
34
35
36
37
38
39
40
# File 'lib/dionysus/producer.rb', line 33

def self.reset!
  return if registry.nil?

  registry.registrations.values.flat_map(&:producers).each do |producer_class|
    Dionysus.send(:remove_const, producer_class.name.demodulize.to_sym) if producer_class.name
  end
  @configuration = Dionysus::Producer::Config.new
end

.responders_for(model_klass) ⇒ Object



42
43
44
45
46
47
48
49
50
# File 'lib/dionysus/producer.rb', line 42

def self.responders_for(model_klass)
  return [] if registry.nil?

  registry.registrations.each.with_object([]) do |(_, registration), responders|
    registration.producers.select { |producer| producer.publisher_of?(model_klass) }.each do |producer|
      responders << producer
    end
  end
end

.responders_for_dependency_parent(model_klass) ⇒ Object



56
57
58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/dionysus/producer.rb', line 56

def self.responders_for_dependency_parent(model_klass)
  return [] if registry.nil?

  registry.registrations.values.each.with_object([]) do |registration, accum|
    registration.topics.each do |topic|
      topic
        .models
        .select { |model_registration| model_registration.options[:with].to_a.include?(model_klass) }
        .each do |model_registration|
        accum << [model_registration.model_klass, topic.producer]
      end
    end
  end
end

.responders_for_dependency_parent_for_topic(model_klass, topic) ⇒ Object



71
72
73
74
75
# File 'lib/dionysus/producer.rb', line 71

def self.responders_for_dependency_parent_for_topic(model_klass, topic)
  responders_for_dependency_parent(model_klass).select do |_model, responder|
    responder.publisher_for_topic?(topic)
  end
end

.responders_for_model_for_topic(model_klass, topic) ⇒ Object



52
53
54
# File 'lib/dionysus/producer.rb', line 52

def self.responders_for_model_for_topic(model_klass, topic)
  responders_for(model_klass).select { |responder| responder.publisher_of_model_for_topic?(model_klass, topic) }
end

.start_outbox_worker(threads_number:) ⇒ Object



77
78
79
80
81
82
83
84
# File 'lib/dionysus/producer.rb', line 77

def self.start_outbox_worker(threads_number:)
  runners = (1..threads_number).map do
    Dionysus::Producer::Outbox::Runner.new(config: configuration)
  end
  executor = Sigurd::Executor.new(runners, sleep_seconds: 5, logger: Dionysus.logger)
  signal_handler = Sigurd::SignalHandler.new(executor)
  signal_handler.run!
end

.topics_models_mappingObject



86
87
88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/dionysus/producer.rb', line 86

def self.topics_models_mapping
  return {} if registry.nil?

  registry
    .registrations
    .values
    .flat_map(&:topics)
    .to_h do |topic|
      [
        topic.to_s,
        topic.models.to_h { |registration| [registration.model_klass, registration.options.fetch(:with, [])] }
      ]
    end
end