Module: NulogyMessageBusConsumer
- Defined in:
- lib/nulogy_message_bus_consumer.rb,
lib/nulogy_message_bus_consumer/clock.rb,
lib/nulogy_message_bus_consumer/config.rb,
lib/nulogy_message_bus_consumer/engine.rb,
lib/nulogy_message_bus_consumer/message.rb,
lib/nulogy_message_bus_consumer/version.rb,
lib/nulogy_message_bus_consumer/pipeline.rb,
lib/nulogy_message_bus_consumer/kafka_utils.rb,
lib/nulogy_message_bus_consumer/lag_tracker.rb,
lib/nulogy_message_bus_consumer/null_logger.rb,
lib/nulogy_message_bus_consumer/deployment/ecs.rb,
lib/nulogy_message_bus_consumer/steps/timed_task.rb,
lib/nulogy_message_bus_consumer/processed_message.rb,
lib/nulogy_message_bus_consumer/steps/log_messages.rb,
lib/nulogy_message_bus_consumer/steps/stream_messages.rb,
lib/nulogy_message_bus_consumer/tasks/log_consumer_lag.rb,
lib/nulogy_message_bus_consumer/steps/commit_on_success.rb,
lib/nulogy_message_bus_consumer/steps/deduplicate_messages.rb,
lib/nulogy_message_bus_consumer/steps/connect_to_message_bus.rb,
lib/nulogy_message_bus_consumer/tasks/supervise_consumer_lag.rb,
lib/nulogy_message_bus_consumer/steps/seek_beginning_of_topic.rb,
lib/nulogy_message_bus_consumer/tasks/prune_processed_messages.rb,
lib/nulogy_message_bus_consumer/handlers/log_unprocessed_messages.rb,
lib/nulogy_message_bus_consumer/steps/stream_messages_until_none_are_left.rb
Defined Under Namespace
Modules: Deployment, Handlers, KafkaUtils, Steps, Tasks
Classes: Clock, Config, Engine, LagTracker, Message, NullLogger, Pipeline, ProcessedMessage
Constant Summary
collapse
- VERSION =
"2.0.1"
Class Method Summary
collapse
Class Method Details
34
35
36
37
38
|
# File 'lib/nulogy_message_bus_consumer.rb', line 34
# Ensures a Config instance exists, applies any supplied options to it, and
# then yields it to the caller's block (when one is given) for further setup.
#
# @param options [Hash] settings passed to Config#update; skipped when the
#   hash is not `present?`
# @yield [config] the current configuration object
# @return [Object, nil] the block's value, or nil when no block is given
def configure(options = {})
  # Build the configuration lazily so repeated calls reuse the same object.
  self.config = Config.new unless config
  config.update(options) if options.present?

  return unless block_given?

  yield config
end
|
.consumer_audit_pipeline(config: self.config, logger: self.logger) ⇒ Object
.invoke_pipeline(*steps) ⇒ Object
44
45
46
|
# File 'lib/nulogy_message_bus_consumer.rb', line 44
# Wraps the given steps in a Pipeline and runs it immediately.
#
# @param steps [Array] the steps, collected from the argument list and passed
#   to Pipeline.new as a single array
# @return [Object] whatever Pipeline#invoke returns
def invoke_pipeline(*steps)
  pipeline = Pipeline.new(steps)
  pipeline.invoke
end
|
.logger ⇒ Object
40
41
42
|
# File 'lib/nulogy_message_bus_consumer.rb', line 40
# Returns the current logger, falling back to a NullLogger that is created
# once and then reused when no logger has been assigned.
#
# @return [Object] the assigned logger or the memoized NullLogger
def logger
  return @logger if @logger

  @logger = NullLogger.new
end
|
.recommended_consumer_pipeline(config: self.config, logger: self.logger) ⇒ Object
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
|
# File 'lib/nulogy_message_bus_consumer.rb', line 48
# Assembles the recommended consumer Pipeline: a connection step, three timed
# tasks (lag logging, processed-message pruning, lag supervision), and then
# the stream, log, commit and deduplicate steps, in that order.
#
# @param config [Object] supplies the interval, timeout, age and lag-check
#   settings read below; defaults to the module-level config
# @param logger [Object] passed to every step and task; defaults to the
#   module-level logger
# @return [Pipeline] a pipeline built from the eight steps
def recommended_consumer_pipeline(config: self.config, logger: self.logger)
  connect_step = Steps::ConnectToMessageBus.new(config, logger)

  lag_logging_step = Steps::TimedTask.new(
    Tasks::LogConsumerLag.new(logger, config.log_lag_interval_seconds, config.lag_timeout_milliseconds)
  )

  pruning_step = Steps::TimedTask.new(
    Tasks::PruneProcessedMessages.new(logger, config.prune_interval_seconds, config.prune_max_age)
  )

  # Read the settings in the same order as the keyword arguments they feed.
  check_interval = config.lag_check_interval_seconds
  lag_tracker = LagTracker.new(failing_checks: config.lag_checks)
  supervisor = Tasks::SuperviseConsumerLag.new(
    logger,
    check_interval_seconds: check_interval,
    tracker: lag_tracker
  )
  supervision_step = Steps::TimedTask.new(supervisor)

  Pipeline.new(
    [
      connect_step,
      lag_logging_step,
      pruning_step,
      supervision_step,
      Steps::StreamMessages.new(logger),
      Steps::LogMessages.new(logger),
      Steps::CommitOnSuccess.new(logger),
      Steps::DeduplicateMessages.new(logger)
    ]
  )
end
|