Module: Dionysus

Defined in:
lib/dionysus.rb,
lib/dionysus/checks.rb,
lib/dionysus/version.rb

Defined Under Namespace

Modules: Rb, Utils, Version
Classes: Checks, Consumer, Monitor, Producer, Railtie, TopicName

Constant Summary collapse

# Default value of the `consumer_group_prefix:` argument of initialize_application!.
CONSUMER_GROUP_PREFIX = "dionysus_consumer_group_for"
# Version string of the library.
VERSION = "0.5.0"

Class Method Summary collapse

Class Method Details

.consumer_registry ⇒ Object



101
102
103
# File 'lib/dionysus.rb', line 101

class << self
  # Consumer registry most recently stored through inject_routing!.
  #
  # @return [Object, nil] the stored registry, or nil when none has been injected
  attr_reader :consumer_registry
end

.enable_outbox_worker_healthcheck ⇒ Object



87
88
89
90
91
# File 'lib/dionysus.rb', line 87

# Wires the outbox worker health check to the outbox producer events published
# on Dionysus.monitor:
#
# * "outbox_producer.started"   -> register_heartbeat
# * "outbox_producer.stopped"   -> worker_stopped
# * "outbox_producer.heartbeat" -> register_heartbeat
#
# The health check is looked up inside each listener, i.e. when the event fires.
#
# @return [Object] whatever the last `subscribe` call returns
def self.enable_outbox_worker_healthcheck
  events = monitor

  events.subscribe("outbox_producer.started") do
    outbox_worker_health_check.register_heartbeat
  end
  events.subscribe("outbox_producer.stopped") do
    outbox_worker_health_check.worker_stopped
  end
  events.subscribe("outbox_producer.heartbeat") do
    outbox_worker_health_check.register_heartbeat
  end
end

.health_check ⇒ Object



79
80
81
# File 'lib/dionysus.rb', line 79

class << self
  # Health check object most recently assigned through `health_check=`.
  #
  # @return [Object, nil] the assigned health check, or nil when none was set
  attr_reader :health_check
end

.health_check=(health_check) ⇒ Object



51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/dionysus.rb', line 51

# Stores +health_check+ and makes sure it is driven by Karafka's lifecycle events:
#
# * "app.initialized"    -> app_initialized!
# * "statistics.emitted" -> register_heartbeat
# * "consumer.consumed"  -> register_heartbeat
# * "app.stopped"        -> app_stopped!
#
# The listeners read Dionysus.health_check when the event fires, so they always
# act on the most recently assigned object and do nothing while it is nil.
#
# @param health_check [Object, nil] object responding to app_initialized!,
#   register_heartbeat and app_stopped!
def self.health_check=(health_check)
  @health_check = health_check

  karafka_monitor = Karafka.monitor
  # One set of listeners per monitor instance is enough, because they resolve the
  # health check lazily. Subscribing again on every assignment would stack
  # duplicate listeners that are never removed.
  return if @health_check_monitor.equal?(karafka_monitor)

  {
    "app.initialized" => :app_initialized!,
    "statistics.emitted" => :register_heartbeat,
    "consumer.consumed" => :register_heartbeat,
    "app.stopped" => :app_stopped!
  }.each do |event_name, callback|
    karafka_monitor.subscribe(event_name) do |_event|
      Dionysus.health_check&.public_send(callback)
    end
  end

  @health_check_monitor = karafka_monitor
end

.initialize_application!(environment:, seed_brokers:, client_id:, logger:, draw_routing: true, consumer_group_prefix: CONSUMER_GROUP_PREFIX) ⇒ Object



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/dionysus.rb', line 23

# Builds an anonymous Karafka::App subclass, configures it and registers it as
# both the top-level constant KarafkaApp and Dionysus.karafka_application.
#
# When a block is given it is yielded the Karafka config from inside `setup`,
# after kafka, client_id and logger have been assigned.
#
# @param environment [String] value written to ENV["KARAFKA_ENV"]
# @param seed_brokers [Array<String>] joined with "," into "bootstrap.servers"
# @param client_id [String] used for both "client.id" and config.client_id
# @param logger [Object] assigned to config.logger
# @param draw_routing [Boolean] when true and a consumer registry is present,
#   evaluate_routing is called
# @param consumer_group_prefix [String] forwarded to evaluate_routing
# @return [Object, nil] result of evaluate_routing, or nil when routing is skipped
def self.initialize_application!(environment:, seed_brokers:, client_id:, logger:, draw_routing: true, consumer_group_prefix: CONSUMER_GROUP_PREFIX)
  ENV["KARAFKA_ENV"] = environment

  app_class = Class.new(Karafka::App) do
    setup do |karafka_config|
      kafka_settings = {
        "bootstrap.servers": seed_brokers.join(","),
        "client.id": client_id
      }

      karafka_config.kafka = kafka_settings
      karafka_config.client_id = client_id
      karafka_config.logger = logger
      # `yield` targets the block passed to initialize_application! itself.
      yield karafka_config if block_given?
    end
  end

  Object.const_set(:KarafkaApp, app_class)
  self.karafka_application = app_class

  return unless consumer_registry.present? && draw_routing

  evaluate_routing(consumer_group_prefix: consumer_group_prefix)
end

.inject_routing!(registry) ⇒ Object



105
106
107
# File 'lib/dionysus.rb', line 105

# Stores +registry+ as the consumer registry exposed by Dionysus.consumer_registry.
#
# @param registry [Object] the consumer registry to remember
# @return [Object] the registry that was passed in
def self.inject_routing!(registry)
  instance_variable_set(:@consumer_registry, registry)
end

.karafka_application ⇒ Object



47
48
49
# File 'lib/dionysus.rb', line 47

class << self
  # Karafka application class most recently assigned through `karafka_application=`.
  #
  # @return [Object, nil] the assigned application, or nil when none was set
  attr_reader :karafka_application
end

.karafka_application=(karafka_app) ⇒ Object



43
44
45
# File 'lib/dionysus.rb', line 43

class << self
  # Remembers the Karafka application class returned by Dionysus.karafka_application.
  #
  # @return [Object] the assigned application
  attr_writer :karafka_application
end

.loader ⇒ Object



17
18
19
20
21
# File 'lib/dionysus.rb', line 17

# Memoized Zeitwerk loader for the gem.
#
# On first use the loader is created with Zeitwerk::Loader.for_gem and told to
# ignore "dionysus-rb.rb" in this file's directory. Later calls return the same
# object.
#
# @return [Object] the loader returned by Zeitwerk::Loader.for_gem
def self.loader
  return @loader if @loader

  gem_loader = Zeitwerk::Loader.for_gem
  gem_loader.ignore("#{__dir__}/dionysus-rb.rb")
  @loader = gem_loader
end

.logger ⇒ Object



93
94
95
96
97
98
99
# File 'lib/dionysus.rb', line 93

# Logger to use for Dionysus output.
#
# @return [Object] the logger configured on the registered Karafka application,
#   or a new Logger writing to $stdout when no application is registered
def self.logger
  return Logger.new($stdout) unless karafka_application

  karafka_application.config.logger
end

.monitor ⇒ Object



109
110
111
# File 'lib/dionysus.rb', line 109

# Memoized Dionysus::Monitor instance; created on first access.
#
# @return [Dionysus::Monitor]
def self.monitor
  return @monitor if @monitor

  @monitor = Dionysus::Monitor.new
end

.outbox_worker_health_check ⇒ Object



83
84
85
# File 'lib/dionysus.rb', line 83

# Memoized Dionysus::Producer::Outbox::HealthCheck instance; created on first access.
#
# @return [Dionysus::Producer::Outbox::HealthCheck]
def self.outbox_worker_health_check
  return @outbox_worker_health_check if @outbox_worker_health_check

  @outbox_worker_health_check = Dionysus::Producer::Outbox::HealthCheck.new
end