Module: Racecar

Defined in:
lib/racecar/erroneous_state_error.rb,
lib/racecar.rb,
lib/racecar/cli.rb,
lib/racecar/ctl.rb,
lib/racecar/pause.rb,
lib/racecar/config.rb,
lib/racecar/daemon.rb,
lib/racecar/heroku.rb,
lib/racecar/runner.rb,
lib/racecar/datadog.rb,
lib/racecar/message.rb,
lib/racecar/version.rb,
lib/racecar/consumer.rb,
lib/racecar/producer.rb,
lib/racecar/consumer_set.rb,
lib/racecar/instrumenter.rb,
lib/racecar/liveness_probe.rb,
lib/racecar/parallel_runner.rb,
lib/racecar/delivery_callback.rb,
lib/racecar/null_instrumenter.rb,
lib/racecar/rebalance_listener.rb,
lib/racecar/message_delivery_error.rb,
lib/racecar/rails_config_file_loader.rb,
lib/generators/racecar/install_generator.rb,
lib/generators/racecar/consumer_generator.rb

Overview

`rd_kafka_offsets_store()` (et al.) returns an error for any partition that is not currently assigned (through `rd_kafka_*assign()`). This prevents a race condition in which an application stores offsets after the assigned partitions have been revoked (which resets the stored offset). Those stale stored offsets could later be committed when the same partitions were assigned to this consumer again, effectively overwriting any offsets committed in the meantime by other consumers that had been assigned the same partitions. This would typically result in the offsets rewinding and messages being reprocessed. As an extra safeguard against this situation, the stored offset is now also reset when partitions are assigned (through `rd_kafka_*assign()`).
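The rule described above can be illustrated with a toy model. This is only a sketch: `ToyOffsetStore` and `NotAssignedError` are hypothetical names invented for the illustration, and the class models the described behaviour in plain Ruby rather than calling librdkafka.

```ruby
# Toy model of the behaviour described above; not librdkafka's implementation.
class ToyOffsetStore
  class NotAssignedError < StandardError; end

  def initialize
    @stored = {} # partition => stored offset (nil until something is stored)
  end

  # Assigning partitions resets any previously stored offset for them.
  def assign(partitions)
    @stored = partitions.to_h { |partition| [partition, nil] }
  end

  # Revoking drops the assignment and the stored offsets with it.
  def revoke
    @stored = {}
  end

  # Storing an offset for a partition that is not assigned is rejected.
  def store(partition, offset)
    raise NotAssignedError, "partition #{partition} is not assigned" unless @stored.key?(partition)

    @stored[partition] = offset
  end

  def stored(partition)
    @stored[partition]
  end
end

store = ToyOffsetStore.new
store.assign([0, 1])
store.store(0, 42)
store.revoke

# A late store after revocation is refused instead of leaving a stale offset.
late_store_rejected =
  begin
    store.store(0, 43)
    false
  rescue ToyOffsetStore::NotAssignedError
    true
  end

# Re-assignment starts from a clean slate, so nothing stale can be committed.
store.assign([0])
```

In this model the offset 42 stored before the revocation never survives into the second assignment, which is the property the overview describes.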

Defined Under Namespace

Modules: Datadog, Generators, Heroku, RailsConfigFileLoader

Classes: Cli, Config, ConfigError, Consumer, ConsumerSet, Ctl, Daemon, DeliveryCallback, ErroneousStateError, Error, Instrumenter, LivenessProbe, Message, MessageDeliveryError, NullInstrumenter, ParallelRunner, Pause, Producer, RebalanceListener, Runner

Constant Summary

VERSION = "2.11.0"

Class Method Summary

Class Method Details

.config ⇒ Object

# File 'lib/racecar.rb', line 23

def self.config
  @config ||= Config.new
end

.config=(config) ⇒ Object



# File 'lib/racecar.rb', line 27

def self.config=(config)
  @config = config
end

.configure {|config| ... } ⇒ Object

Yields: (config)



# File 'lib/racecar.rb', line 31

def self.configure
  yield config
end
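The three methods above form a common memoize-and-yield configuration pattern. The sketch below reproduces it with a hypothetical stand-in module (`DemoRacecar`) and a hypothetical `Struct`-based config, so that it runs without the gem or a Kafka broker; the real `Racecar::Config` has many more settings than the two fields assumed here.

```ruby
# Hypothetical stand-in for the Racecar module; mirrors the three methods above.
module DemoRacecar
  # Hypothetical config object with just two illustrative fields.
  DemoConfig = Struct.new(:client_id, :brokers)

  # Lazily builds the config on first access and reuses it afterwards.
  def self.config
    @config ||= DemoConfig.new("demo", ["localhost:9092"])
  end

  # Replaces the memoized config wholesale.
  def self.config=(config)
    @config = config
  end

  # Yields the shared config so callers can mutate it in a block.
  def self.configure
    yield config
  end
end

DemoRacecar.configure do |config|
  config.client_id = "orders-service"
end
```

Because `config` is memoized, changes made inside the block are visible to every later call to `DemoRacecar.config` until the config is replaced through `config=`.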

.instrumenter ⇒ Object

# File 'lib/racecar.rb', line 64

def self.instrumenter
  config.instrumenter
end

.logger ⇒ Object

# File 'lib/racecar.rb', line 35

def self.logger
  config.logger
end

.logger=(logger) ⇒ Object



# File 'lib/racecar.rb', line 39

def self.logger=(logger)
  config.logger = logger
end

.produce_async(value:, topic:, **options) ⇒ Object



# File 'lib/racecar.rb', line 43

def self.produce_async(value:, topic:, **options)
  producer.produce_async(value: value, topic: topic, **options)
end

.produce_sync(value:, topic:, **options) ⇒ Object



# File 'lib/racecar.rb', line 47

def self.produce_sync(value:, topic:, **options)
  producer.produce_sync(value: value, topic: topic, **options)
end
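Both produce methods take `value:` and `topic:` as required keywords and forward everything else through `**options`. The following sketch shows that forwarding with a hypothetical recording producer (`RecordingProducer`) and facade (`DemoFacade`) in place of the real `Racecar::Producer`; the `key:` option is used purely as an illustrative extra keyword.

```ruby
# Hypothetical producer that records calls instead of talking to Kafka.
class RecordingProducer
  attr_reader :calls

  def initialize
    @calls = []
  end

  def produce_async(value:, topic:, **options)
    @calls << { value: value, topic: topic, **options }
  end
end

# Hypothetical facade mirroring the delegation shown above.
module DemoFacade
  def self.producer
    @producer ||= RecordingProducer.new
  end

  # Required keywords are named explicitly; any others pass through untouched.
  def self.produce_async(value:, topic:, **options)
    producer.produce_async(value: value, topic: topic, **options)
  end
end

DemoFacade.produce_async(value: "hello", topic: "greetings", key: "user-1")
```

Naming the required keywords in the facade means a missing `topic:` fails at the call site with an `ArgumentError` rather than deeper in the producer.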

.producer ⇒ Object

# File 'lib/racecar.rb', line 55

def self.producer
  Thread.current[:racecar_producer] ||= begin
    if config.datadog_enabled
      require "racecar/datadog"
    end
    Racecar::Producer.new(config: config, logger: logger, instrumenter: instrumenter)
  end
end
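The `Thread.current[...] ||=` idiom above memoizes one producer per thread. A minimal sketch of that behaviour, using a hypothetical module (`DemoThreadLocal`) and a plain `Object` as a stand-in for the producer:

```ruby
# Hypothetical module showing per-thread memoization via Thread.current.
module DemoThreadLocal
  def self.producer
    # Each thread sees its own slot, so each thread builds its own instance.
    Thread.current[:demo_producer] ||= Object.new
  end
end

main_first  = DemoThreadLocal.producer
main_second = DemoThreadLocal.producer
other       = Thread.new { DemoThreadLocal.producer }.value
```

Within one thread the same instance is returned on every call, while a second thread gets a separate instance. Note that `Thread#[]` storage in Ruby is fiber-local, so code running in a different fiber of the same thread would also see its own slot.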

.run(processor) ⇒ Object



# File 'lib/racecar.rb', line 68

def self.run(processor)
  runner(processor).run
end

.runner(processor) ⇒ Object



# File 'lib/racecar.rb', line 72

def self.runner(processor)
  runner = Runner.new(processor, config: config, logger: logger, instrumenter: config.instrumenter)

  if config.parallel_workers && config.parallel_workers > 1
    ParallelRunner.new(runner: runner, config: config, logger: logger)
  else
    runner
  end
end
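The selection logic above wraps the plain runner only when more than one parallel worker is configured. The sketch below isolates that decision with hypothetical `Struct` stand-ins (`RunnerDemoConfig`, `DemoRunner`, `DemoParallelRunner`) and a hypothetical helper `demo_runner`; none of them are part of Racecar.

```ruby
# Hypothetical stand-ins so the branch can be exercised without Kafka.
RunnerDemoConfig   = Struct.new(:parallel_workers)
DemoRunner         = Struct.new(:processor)
DemoParallelRunner = Struct.new(:runner)

def demo_runner(processor, config)
  runner = DemoRunner.new(processor)

  # nil and 1 both mean "run in-process"; only values above 1 add the wrapper.
  if config.parallel_workers && config.parallel_workers > 1
    DemoParallelRunner.new(runner)
  else
    runner
  end
end

single   = demo_runner(:my_consumer, RunnerDemoConfig.new(nil))
parallel = demo_runner(:my_consumer, RunnerDemoConfig.new(4))
```

The `&&` guard matters because an unset `parallel_workers` is `nil`, and comparing `nil > 1` would raise a `NoMethodError`.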

.wait_for_delivery(&block) ⇒ Object



# File 'lib/racecar.rb', line 51

def self.wait_for_delivery(&block)
  producer.wait_for_delivery(&block)
end