Module: Racecar
Defined in:
lib/racecar/erroneous_state_error.rb,
lib/racecar.rb,
lib/racecar/cli.rb,
lib/racecar/ctl.rb,
lib/racecar/pause.rb,
lib/racecar/config.rb,
lib/racecar/daemon.rb,
lib/racecar/heroku.rb,
lib/racecar/runner.rb,
lib/racecar/datadog.rb,
lib/racecar/message.rb,
lib/racecar/version.rb,
lib/racecar/consumer.rb,
lib/racecar/producer.rb,
lib/racecar/consumer_set.rb,
lib/racecar/instrumenter.rb,
lib/racecar/liveness_probe.rb,
lib/racecar/parallel_runner.rb,
lib/racecar/delivery_callback.rb,
lib/racecar/null_instrumenter.rb,
lib/racecar/rebalance_listener.rb,
lib/racecar/message_delivery_error.rb,
lib/racecar/rails_config_file_loader.rb,
lib/generators/racecar/install_generator.rb,
lib/generators/racecar/consumer_generator.rb
Overview
`rd_kafka_offsets_store()` (et al.) returns an error for any partition that is not currently assigned (through `rd_kafka_*assign()`). This prevents a race condition in which an application stores offsets after its assigned partitions have been revoked (revocation resets the stored offset). Those stale stored offsets could later be committed when the same partitions were assigned to this consumer again, effectively overwriting any offsets committed by consumers that were previously assigned the same partitions. This would typically cause the offsets to rewind and messages to be reprocessed. As an extra safeguard against this situation, the stored offset is now also reset when partitions are assigned (through `rd_kafka_*assign()`).
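The guard described above can be illustrated with a toy model. This is a minimal sketch in plain Ruby, not librdkafka's implementation: `ToyOffsetStore`, `NotAssignedError`, and all method names are hypothetical and exist only to show the two rules (reject stores for unassigned partitions, reset stored offsets on assignment).

```ruby
# Toy model of the behavior described above; not librdkafka's implementation.
class ToyOffsetStore
  class NotAssignedError < StandardError; end

  def initialize
    @stored = {} # partition => stored offset (nil until something is stored)
  end

  # Assigning partitions replaces the assignment and resets stored offsets.
  def assign(partitions)
    @stored = partitions.to_h { |partition| [partition, nil] }
  end

  # Revoking drops the partitions along with any stored offsets.
  def revoke(partitions)
    partitions.each { |partition| @stored.delete(partition) }
  end

  # Storing an offset is rejected for partitions that are not assigned.
  def store(partition, offset)
    unless @stored.key?(partition)
      raise NotAssignedError, "partition #{partition} is not assigned"
    end
    @stored[partition] = offset
  end

  def stored_offset(partition)
    @stored[partition]
  end
end

offsets = ToyOffsetStore.new
offsets.assign([0])
offsets.store(0, 42)
offsets.revoke([0])

begin
  offsets.store(0, 43) # late store after revocation is rejected
rescue ToyOffsetStore::NotAssignedError => e
  warn e.message
end

offsets.assign([0])
offsets.stored_offset(0) # nil, because the stale offset did not survive
```

In this model a late store can never leak into a later assignment, which is the property the overview describes.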
Defined Under Namespace
Modules: Datadog, Generators, Heroku, RailsConfigFileLoader
Classes: Cli, Config, ConfigError, Consumer, ConsumerSet, Ctl, Daemon, DeliveryCallback, ErroneousStateError, Error, Instrumenter, LivenessProbe, Message, MessageDeliveryError, NullInstrumenter, ParallelRunner, Pause, Producer, RebalanceListener, Runner
Constant Summary
- VERSION = "2.11.0"
Class Method Summary
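- .config ⇒ Object
- .config=(config) ⇒ Object
- .configure ⇒ Object
- .instrumenter ⇒ Object
- .logger ⇒ Object
- .logger=(logger) ⇒ Object
- .produce_async(value:, topic:, **options) ⇒ Object
- .produce_sync(value:, topic:, **options) ⇒ Object
- .producer ⇒ Object
- .run(processor) ⇒ Object
- .runner(processor) ⇒ Object
- .wait_for_delivery(&block) ⇒ Object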
Class Method Details
.config ⇒ Object
# File 'lib/racecar.rb', line 23
def self.config
  @config ||= Config.new
end
.config=(config) ⇒ Object
# File 'lib/racecar.rb', line 27
def self.config=(config)
  @config = config
end
.configure ⇒ Object
# File 'lib/racecar.rb', line 31
def self.configure
  yield config
end
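Taken together, `.config`, `.config=`, and `.configure` follow a common memoized-configuration pattern. The following is a minimal sketch of that pattern that runs without the gem; `DemoApp`, `DemoConfig`, and the `brokers` and `group_id` fields are hypothetical stand-ins, not Racecar's own names.

```ruby
# Hypothetical stand-in for Racecar::Config; the field names are assumptions.
DemoConfig = Struct.new(:brokers, :group_id)

module DemoApp
  # Lazily build one config object and reuse it on later calls.
  def self.config
    @config ||= DemoConfig.new(["localhost:9092"], "demo")
  end

  # Replace the config object wholesale.
  def self.config=(config)
    @config = config
  end

  # Yield the shared config so callers can adjust it inside a block.
  def self.configure
    yield config
  end
end

DemoApp.configure do |config|
  config.group_id = "orders"
end
```

Because `config` is memoized, a change made inside the `configure` block is visible to every later call to `DemoApp.config`.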
.instrumenter ⇒ Object
# File 'lib/racecar.rb', line 64
def self.instrumenter
  config.instrumenter
end
.logger ⇒ Object
# File 'lib/racecar.rb', line 35
def self.logger
  config.logger
end
.logger=(logger) ⇒ Object
# File 'lib/racecar.rb', line 39
def self.logger=(logger)
  config.logger = logger
end
.produce_async(value:, topic:, **options) ⇒ Object
# File 'lib/racecar.rb', line 43
def self.produce_async(value:, topic:, **options)
  producer.produce_async(value: value, topic: topic, **options)
end
.produce_sync(value:, topic:, **options) ⇒ Object
# File 'lib/racecar.rb', line 47
def self.produce_sync(value:, topic:, **options)
  producer.produce_sync(value: value, topic: topic, **options)
end
.producer ⇒ Object
# File 'lib/racecar.rb', line 55
def self.producer
  Thread.current[:racecar_producer] ||= begin
    if config.datadog_enabled
      require "racecar/datadog"
    end
    Racecar::Producer.new(config: config, logger: logger, instrumenter: instrumenter)
  end
end
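The listing above memoizes the producer in `Thread.current`, so each thread gets its own instance. A minimal sketch of that memoization, assuming a hypothetical `DemoProducer` class and `demo_producer` helper in place of the real producer:

```ruby
# Hypothetical stand-in for Racecar::Producer.
class DemoProducer; end

# Each thread lazily creates its own instance and then reuses it.
def demo_producer
  Thread.current[:demo_producer] ||= DemoProducer.new
end

same_thread_first  = demo_producer
same_thread_second = demo_producer                 # same object as the first
other_thread       = Thread.new { demo_producer }.value # a different object
```

Note that `Thread.current[...]` storage in Ruby is fiber-local, so code running in a separate fiber would also build its own instance under this pattern.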
.run(processor) ⇒ Object
# File 'lib/racecar.rb', line 68
def self.run(processor)
  runner(processor).run
end
.runner(processor) ⇒ Object
# File 'lib/racecar.rb', line 72
def self.runner(processor)
  runner = Runner.new(processor, config: config, logger: logger, instrumenter: config.instrumenter)
  if config.parallel_workers && config.parallel_workers > 1
    ParallelRunner.new(runner: runner, config: config, logger: logger)
  else
    runner
  end
end
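The selection logic above wraps the plain runner only when more than one parallel worker is configured. A minimal sketch of that decision, using hypothetical `DemoRunner` and `DemoParallelRunner` structs and a hypothetical `build_runner` helper rather than the real classes:

```ruby
# Hypothetical stand-ins; the real classes take more arguments.
DemoRunner = Struct.new(:processor)
DemoParallelRunner = Struct.new(:runner, :workers)

# Wrap the runner only when more than one worker is requested.
def build_runner(processor, parallel_workers: nil)
  runner = DemoRunner.new(processor)
  if parallel_workers && parallel_workers > 1
    DemoParallelRunner.new(runner, parallel_workers)
  else
    runner
  end
end

single   = build_runner(:my_consumer)                      # plain DemoRunner
parallel = build_runner(:my_consumer, parallel_workers: 4) # wrapped runner
```

The `parallel_workers &&` guard matters because the setting may be `nil`, and comparing `nil > 1` would raise `NoMethodError`.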
.wait_for_delivery(&block) ⇒ Object
# File 'lib/racecar.rb', line 51
def self.wait_for_delivery(&block)
  producer.wait_for_delivery(&block)
end