Module: NulogyMessageBusConsumer

Defined in:
lib/nulogy_message_bus_consumer.rb,
lib/nulogy_message_bus_consumer/clock.rb,
lib/nulogy_message_bus_consumer/config.rb,
lib/nulogy_message_bus_consumer/engine.rb,
lib/nulogy_message_bus_consumer/message.rb,
lib/nulogy_message_bus_consumer/version.rb,
lib/nulogy_message_bus_consumer/pipeline.rb,
lib/nulogy_message_bus_consumer/kafka_utils.rb,
lib/nulogy_message_bus_consumer/lag_tracker.rb,
lib/nulogy_message_bus_consumer/null_logger.rb,
lib/nulogy_message_bus_consumer/deployment/ecs.rb,
lib/nulogy_message_bus_consumer/steps/timed_task.rb,
lib/nulogy_message_bus_consumer/processed_message.rb,
lib/nulogy_message_bus_consumer/steps/log_messages.rb,
lib/nulogy_message_bus_consumer/steps/stream_messages.rb,
lib/nulogy_message_bus_consumer/tasks/log_consumer_lag.rb,
lib/nulogy_message_bus_consumer/steps/commit_on_success.rb,
lib/nulogy_message_bus_consumer/steps/deduplicate_messages.rb,
lib/nulogy_message_bus_consumer/steps/connect_to_message_bus.rb,
lib/nulogy_message_bus_consumer/tasks/supervise_consumer_lag.rb,
lib/nulogy_message_bus_consumer/steps/seek_beginning_of_topic.rb,
lib/nulogy_message_bus_consumer/tasks/prune_processed_messages.rb,
lib/nulogy_message_bus_consumer/handlers/log_unprocessed_messages.rb,
lib/nulogy_message_bus_consumer/steps/stream_messages_until_none_are_left.rb

Defined Under Namespace

Modules: Deployment, Handlers, KafkaUtils, Steps, Tasks Classes: Clock, Config, Engine, LagTracker, Message, NullLogger, Pipeline, ProcessedMessage

Constant Summary collapse

VERSION =
"3.0.0"

Class Method Summary collapse

Class Method Details

.configure(options = {}) {|config| ... } ⇒ Object

Yields:

  • (config)


34
35
36
37
38
# File 'lib/nulogy_message_bus_consumer.rb', line 34

def configure(options = {})
  self.config ||= Config.new
  config.update(options) if options.present?
  yield(config) if block_given?
end

.consumer_audit_pipeline(config: self.config, logger: self.logger) ⇒ Object



75
76
77
78
79
80
81
82
# File 'lib/nulogy_message_bus_consumer.rb', line 75

def consumer_audit_pipeline(config: self.config, logger: self.logger)
  Pipeline.new([
    Steps::ConnectToMessageBus.new(config, logger),
    Steps::SeekBeginningOfTopic.new,
    Steps::StreamMessagesUntilNoneAreLeft.new(logger),
    Handlers::LogUnprocessedMessages.new(logger)
  ])
end

.invoke_pipeline(*steps) ⇒ Object



44
45
46
# File 'lib/nulogy_message_bus_consumer.rb', line 44

def invoke_pipeline(*steps)
  Pipeline.new(steps).invoke
end

.loggerObject



40
41
42
# File 'lib/nulogy_message_bus_consumer.rb', line 40

def logger
  @logger ||= NullLogger.new
end


48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/nulogy_message_bus_consumer.rb', line 48

def recommended_consumer_pipeline(config: self.config, logger: self.logger)
  Pipeline.new([
    # System processing/health steps.
    # Note: that since they are before `StreamMessages`, they will only
    # be called once, without any messages.
    Steps::ConnectToMessageBus.new(config, logger),
    Steps::TimedTask.new(
      Tasks::LogConsumerLag.new(logger, config.log_lag_interval_seconds, config.lag_timeout_milliseconds)
    ),
    Steps::TimedTask.new(
      Tasks::PruneProcessedMessages.new(logger, config.prune_interval_seconds, config.prune_max_age)
    ),
    Steps::TimedTask.new(
      Tasks::SuperviseConsumerLag.new(
        logger,
        check_interval_seconds: config.lag_check_interval_seconds,
        tracker: LagTracker.new(failing_checks: config.lag_checks)
      )
    ),
    Steps::StreamMessages.new(logger),
    # Message processing steps start here.
    Steps::LogMessages.new(logger),
    Steps::CommitOnSuccess.new(logger),
    Steps::DeduplicateMessages.new(logger)
  ])
end