Class: Dionysus::Producer
- Inherits:
-
Object
- Object
- Dionysus::Producer
show all
- Defined in:
- lib/dionysus/producer.rb
Defined Under Namespace
Classes: BaseResponder, Config, DeletedRecordSerializer, Genesis, KarafkaResponderGenerator, Key, ModelSerializer, Outbox, PartitionKey, Registry, Serializer, Suppressor
Class Method Summary
collapse
Class Method Details
.configuration ⇒ Object
4
5
6
|
# File 'lib/dionysus/producer.rb', line 4
# Returns the memoized producer configuration, building a fresh
# Dionysus::Producer::Config the first time it is requested.
#
# @return [Dionysus::Producer::Config]
def self.configuration
  return @configuration if @configuration

  @configuration = Dionysus::Producer::Config.new
end
|
.configure ⇒ Object
8
9
10
|
# File 'lib/dionysus/producer.rb', line 8
# Yields the current configuration object to the given block so the caller
# can mutate it. The block's result is returned.
#
# @yieldparam settings the object returned by {.configuration}
# @return [Object] whatever the block returns
def self.configure
  settings = configuration
  yield settings
end
|
.declare(&config) ⇒ Object
16
17
18
19
20
21
22
23
|
# File 'lib/dionysus/producer.rb', line 16
# Builds a new Dionysus::Producer::Registry, evaluates the given block in the
# context of that registry (via instance_eval), and stores the registry on the
# configuration through {.configure}.
#
# @param config [Proc] block evaluated with the registry as +self+
# @return [Object] the result of the {.configure} call
def self.declare(&config)
  new_registry = Dionysus::Producer::Registry.new.tap do |built|
    built.instance_eval(&config)
  end

  configure { |settings| settings.registry = new_registry }
end
|
.observers_with_responders_for(resource, changeset) ⇒ Object
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
|
# File 'lib/dionysus/producer.rb', line 101
# Collects, for every registered model that observes the given resource and
# changeset, the records reached from the resource together with the topic's
# producer.
#
# The association name reported by the model registration is treated as a
# dot-separated chain of method names that is sent, link by link, starting
# from +resource+. The result is wrapped into an array and nils are dropped.
#
# @param resource [Object] the object the method chain is sent to
# @param changeset [Object] passed through to the model registration checks
# @return [Array<Array>] pairs of +[records, producer]+; empty when there is
#   no registry
def self.observers_with_responders_for(resource, changeset)
  return [] if registry.nil?

  registry.registrations.values.flat_map do |registration|
    registration.topics.flat_map do |topic|
      observing = topic.models.select { |candidate| candidate.observes?(resource, changeset) }

      observing.map do |candidate|
        path = candidate.association_name_for_observable(resource, changeset).to_s.split(".")
        target = path.reduce(resource) { |receiver, message| receiver.public_send(message) }

        [Array.wrap(target).compact, topic.producer]
      end
    end
  end
end
|
.outbox ⇒ Object
25
26
27
|
# File 'lib/dionysus/producer.rb', line 25
# Builds a new Dionysus::Producer::Outbox for the configured outbox model.
# A new instance is created on every call.
#
# @return [Dionysus::Producer::Outbox]
def self.outbox
  settings = configuration
  Dionysus::Producer::Outbox.new(settings.outbox_model, config: settings)
end
|
.outbox_publisher ⇒ Object
.registry ⇒ Object
12
13
14
|
# File 'lib/dionysus/producer.rb', line 12
# Returns the registry currently stored on the configuration.
#
# @return [Object, nil] the configuration's registry (other methods here
#   treat +nil+ as "nothing declared")
def self.registry
  settings = configuration
  settings.registry
end
|
.reset! ⇒ Object
33
34
35
36
37
38
39
40
|
# File 'lib/dionysus/producer.rb', line 33
# Resets the producer to a pristine state.
#
# When a registry is present, every named producer class it generated is
# removed from the Dionysus namespace (anonymous classes, whose +name+ is nil,
# are skipped). The configuration is then replaced with a fresh
# Dionysus::Producer::Config.
#
# The configuration is replaced even when no registry has been declared, so
# settings applied through {.configure} alone do not survive a reset.
#
# @return [Dionysus::Producer::Config] the fresh configuration
def self.reset!
  # Only the constant cleanup depends on a registry being present.
  unless registry.nil?
    registry.registrations.values.flat_map(&:producers).each do |producer_class|
      Dionysus.send(:remove_const, producer_class.name.demodulize.to_sym) if producer_class.name
    end
  end

  @configuration = Dionysus::Producer::Config.new
end
|
.responders_for(model_klass) ⇒ Object
42
43
44
45
46
47
48
49
50
|
# File 'lib/dionysus/producer.rb', line 42
# Lists every registered producer that reports itself as a publisher of the
# given model class.
#
# @param model_klass [Object] the model class passed to +publisher_of?+
# @return [Array] matching producers, in registration order; empty when
#   there is no registry
def self.responders_for(model_klass)
  return [] if registry.nil?

  registry.registrations.values.flat_map do |registration|
    registration.producers.select { |candidate| candidate.publisher_of?(model_klass) }
  end
end
|
.responders_for_dependency_parent(model_klass) ⇒ Object
56
57
58
59
60
61
62
63
64
65
66
67
68
69
|
# File 'lib/dionysus/producer.rb', line 56
# Finds the registered models that list the given class in their +:with+
# option, paired with the producer of the topic they are registered under.
#
# @param model_klass [Object] the class looked up in each +options[:with]+
# @return [Array<Array>] pairs of +[model_klass, producer]+; empty when there
#   is no registry
def self.responders_for_dependency_parent(model_klass)
  return [] if registry.nil?

  registry.registrations.values.flat_map do |registration|
    registration.topics.flat_map do |topic|
      dependents = topic.models.select do |candidate|
        # A missing :with option is nil, which to_a turns into an empty list.
        candidate.options[:with].to_a.include?(model_klass)
      end

      dependents.map { |dependent| [dependent.model_klass, topic.producer] }
    end
  end
end
|
.responders_for_dependency_parent_for_topic(model_klass, topic) ⇒ Object
71
72
73
74
75
|
# File 'lib/dionysus/producer.rb', line 71
# Narrows {.responders_for_dependency_parent} down to the pairs whose
# responder reports itself as a publisher for the given topic.
#
# @param model_klass [Object] forwarded to {.responders_for_dependency_parent}
# @param topic [Object] passed to each responder's +publisher_for_topic?+
# @return [Array<Array>] the matching +[model, responder]+ pairs
def self.responders_for_dependency_parent_for_topic(model_klass, topic)
  pairs = responders_for_dependency_parent(model_klass)

  pairs.select do |(_dependent_model, responder)|
    responder.publisher_for_topic?(topic)
  end
end
|
.responders_for_model_for_topic(model_klass, topic) ⇒ Object
52
53
54
|
# File 'lib/dionysus/producer.rb', line 52
# Narrows {.responders_for} down to the responders that publish the given
# model on the given topic.
#
# @param model_klass [Object] the model class to look up
# @param topic [Object] passed to +publisher_of_model_for_topic?+
# @return [Array] the matching responders
def self.responders_for_model_for_topic(model_klass, topic)
  candidates = responders_for(model_klass)

  candidates.select do |candidate|
    candidate.publisher_of_model_for_topic?(model_klass, topic)
  end
end
|
.start_outbox_worker(threads_number:) ⇒ Object
77
78
79
80
81
82
83
84
|
# File 'lib/dionysus/producer.rb', line 77
# Starts the outbox worker: builds one Dionysus::Producer::Outbox::Runner per
# requested thread, hands them to a Sigurd::Executor (5 second sleep, Dionysus
# logger) and runs a Sigurd::SignalHandler around that executor.
#
# @param threads_number [Integer] how many runners to create
# @return [Object] whatever +Sigurd::SignalHandler#run!+ returns
def self.start_outbox_worker(threads_number:)
  runners = (1..threads_number).map { Dionysus::Producer::Outbox::Runner.new(config: configuration) }
  executor = Sigurd::Executor.new(runners, sleep_seconds: 5, logger: Dionysus.logger)

  Sigurd::SignalHandler.new(executor).run!
end
|
.topics_models_mapping ⇒ Object
86
87
88
89
90
91
92
93
94
95
96
97
98
99
|
# File 'lib/dionysus/producer.rb', line 86
# Builds a lookup of every registered topic to its models and their declared
# dependencies.
#
# @return [Hash{String => Hash}] topic name (+topic.to_s+) mapped to a hash of
#   model class => the model's +:with+ option (an empty array when absent);
#   an empty hash when there is no registry
def self.topics_models_mapping
  return {} if registry.nil?

  all_topics = registry.registrations.values.flat_map(&:topics)

  all_topics.each_with_object({}) do |topic, mapping|
    topic_name = topic.to_s
    mapping[topic_name] = topic.models.each_with_object({}) do |registration, dependencies|
      dependencies[registration.model_klass] = registration.options.fetch(:with, [])
    end
  end
end
|