Class: Racecar::Consumer
- Inherits:
-
Object
- Object
- Racecar::Consumer
- Defined in:
- lib/racecar/consumer.rb
Defined Under Namespace
Classes: Subscription
Class Attribute Summary collapse
-
.consumer ⇒ Object
Returns the value of attribute consumer.
-
.fetch_messages ⇒ Object
Returns the value of attribute fetch_messages.
-
.group_id ⇒ Object
Returns the value of attribute group_id.
-
.max_wait_time ⇒ Object
Returns the value of attribute max_wait_time.
-
.parallel_workers ⇒ Object
Returns the value of attribute parallel_workers.
-
.producer ⇒ Object
Returns the value of attribute producer.
Class Method Summary collapse
-
.on_partitions_assigned(rebalance_event) ⇒ Object
Rebalance hooks for subclasses to override.
- .on_partitions_revoked(rebalance_event) ⇒ Object
-
.subscribes_to(*topics, start_from_beginning: true, max_bytes_per_partition: 1048576, additional_config: {}) ⇒ nil
Adds one or more topic subscriptions.
- .subscriptions ⇒ Object
Instance Method Summary collapse
- #configure(producer:, consumer:, instrumenter: NullInstrumenter, config: Racecar.config) ⇒ Object
-
#deliver! ⇒ Object
Blocks until all messages produced so far have been successfully published.
- #teardown ⇒ Object
Class Attribute Details
.consumer ⇒ Object
Returns the value of attribute consumer.
12 13 14 |
# File 'lib/racecar/consumer.rb', line 12 def consumer @consumer end |
.fetch_messages ⇒ Object
Returns the value of attribute fetch_messages.
12 13 14 |
# File 'lib/racecar/consumer.rb', line 12 def @fetch_messages end |
.group_id ⇒ Object
Returns the value of attribute group_id.
11 12 13 |
# File 'lib/racecar/consumer.rb', line 11 def group_id @group_id end |
.max_wait_time ⇒ Object
Returns the value of attribute max_wait_time.
10 11 12 |
# File 'lib/racecar/consumer.rb', line 10 def max_wait_time @max_wait_time end |
.parallel_workers ⇒ Object
Returns the value of attribute parallel_workers.
12 13 14 |
# File 'lib/racecar/consumer.rb', line 12 def parallel_workers @parallel_workers end |
.producer ⇒ Object
Returns the value of attribute producer.
12 13 14 |
# File 'lib/racecar/consumer.rb', line 12 def producer @producer end |
Class Method Details
.on_partitions_assigned(rebalance_event) ⇒ Object
Rebalance hooks for subclasses to override
42 |
# File 'lib/racecar/consumer.rb', line 42 def on_partitions_assigned(rebalance_event); end |
.on_partitions_revoked(rebalance_event) ⇒ Object
43 |
# File 'lib/racecar/consumer.rb', line 43 def on_partitions_revoked(rebalance_event); end |
.subscribes_to(*topics, start_from_beginning: true, max_bytes_per_partition: 1048576, additional_config: {}) ⇒ nil
Adds one or more topic subscriptions.
Can be called multiple times in order to subscribe to more topics.
30 31 32 33 34 35 36 37 38 39 |
# File 'lib/racecar/consumer.rb', line 30 def subscribes_to( *topics, start_from_beginning: true, max_bytes_per_partition: 1048576, additional_config: {} ) topics.each do |topic| subscriptions << Subscription.new(topic, start_from_beginning, max_bytes_per_partition, additional_config) end end |
.subscriptions ⇒ Object
14 15 16 |
# File 'lib/racecar/consumer.rb', line 14 def subscriptions @subscriptions ||= [] end |
Instance Method Details
#configure(producer:, consumer:, instrumenter: NullInstrumenter, config: Racecar.config) ⇒ Object
46 47 48 49 50 51 52 53 54 |
# File 'lib/racecar/consumer.rb', line 46 def configure(producer:, consumer:, instrumenter: NullInstrumenter, config: Racecar.config) @producer = producer @delivery_handles = [] @consumer = consumer @instrumenter = instrumenter @config = config end |
#deliver! ⇒ Object
Blocks until all messages produced so far have been successfully published. If message delivery finally fails, a Racecar::MessageDeliveryError is raised. The delivery failed for the reason in the exception. The error can be broker side (e.g. downtime, configuration issue) or specific to the message being sent. The caller must handle the latter cases or run into head of line blocking.
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 |
# File 'lib/racecar/consumer.rb', line 63 def deliver! @delivery_handles ||= [] if @delivery_handles.any? instrumentation_payload = { delivered_message_count: @delivery_handles.size } @instrumenter.instrument('deliver_messages', instrumentation_payload) do @delivery_handles.each do |handle| begin # rdkafka-ruby checks every wait_timeout seconds if the message was # successfully delivered, up to max_wait_timeout seconds before raising # Rdkafka::AbstractHandle::WaitTimeoutError. librdkafka will (re)try to # deliver all messages in the background, until "config.message_timeout" # (message.timeout.ms) is exceeded. Phrased differently, rdkafka-ruby's # WaitTimeoutError is just informative. # The raising can be avoided if max_wait_timeout below is greater than # config.message_timeout, but config is not available here (without # changing the interface). handle.wait(max_wait_timeout: 60, wait_timeout: 0.1) rescue Rdkafka::AbstractHandle::WaitTimeoutError => e partition = MessageDeliveryError.partition_from_delivery_handle(handle) # ideally we could use the logger passed to the Runner, but it is not # available here. The runner sets it for Rdkafka, though, so we can use # that instead. @config.logger.debug "Still trying to deliver message to (partition #{partition})... (will try up to Racecar.config.message_timeout)" retry rescue Rdkafka::RdkafkaError => e raise MessageDeliveryError.new(e, handle) end end end end @delivery_handles.clear end |
#teardown ⇒ Object
56 |
# File 'lib/racecar/consumer.rb', line 56 def teardown; end |