Module: Karafka::Pro::Processing::Strategies::Vp::Default
- Includes:
- Default
- Included in:
- Aj::DlqMomVp, Aj::FtrLrjMomVp, Aj::LrjMomVp, Aj::MomVp, Dlq::FtrLrjMomVp, Dlq::FtrLrjVp, Dlq::FtrVp, Dlq::LrjVp, Dlq::Vp, Ftr::Vp, Lrj::FtrVp, Lrj::Vp, Mom::Vp
- Defined in:
- lib/karafka/pro/processing/strategies/vp/default.rb
Overview
Just Virtual Partitions enabled
Constant Summary collapse
- FEATURES =
Features for this strategy
%i[ virtual_partitions ].freeze
Instance Method Summary collapse
-
#collapsed? ⇒ Boolean
Is the virtual processing collapsed in the context of given consumer.
-
#failing? ⇒ Boolean
True if any of virtual partition we’re operating in the entangled mode has already failed and we know we are failing collectively.
- #mark_as_consumed(message) ⇒ Object
- #mark_as_consumed!(message) ⇒ Object
Methods included from Default
#handle_after_consume, #handle_before_consume, #handle_consume, #handle_revoked
Methods included from Karafka::Processing::Strategies::Default
#commit_offsets, #commit_offsets!, #handle_after_consume, #handle_before_consume, #handle_consume, #handle_idle, #handle_revoked, #handle_shutdown
Methods included from Karafka::Processing::Strategies::Base
#handle_after_consume, #handle_before_consume, #handle_consume, #handle_idle, #handle_revoked, #handle_shutdown
Instance Method Details
#collapsed? ⇒ Boolean
Returns is the virtual processing collapsed in the context of given consumer.
71 72 73 |
# File 'lib/karafka/pro/processing/strategies/vp/default.rb', line 71 def collapsed? coordinator.collapsed? end |
#failing? ⇒ Boolean
We’ve named it ‘#failing?` instead of `#failure?` because it aims to be used from within virtual partitions where we want to have notion of collective failing not just “local” to our processing. We “are” failing with other virtual partitions raising an error, but locally we are still processing.
Returns true if any of virtual partition we’re operating in the entangled mode has already failed and we know we are failing collectively. Useful for early stop to minimize number of things processed twice.
83 84 85 |
# File 'lib/karafka/pro/processing/strategies/vp/default.rb', line 83 def failing? coordinator.failure? end |
#mark_as_consumed(message) ⇒ Object
This virtual offset management uses a regular default marking API underneath. We do not alter the “real” marking API, as VPs are just one of many cases we want to support and we do not want to impact them with collective offsets management
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/karafka/pro/processing/strategies/vp/default.rb', line 35 def mark_as_consumed() return super if collapsed? manager = coordinator.virtual_offset_manager coordinator.synchronize do manager.mark() # If this is last marking on a finished flow, we can use the original # last message and in order to do so, we need to mark all previous messages as # consumed as otherwise the computed offset could be different # We mark until our offset just in case of a DLQ flow or similar, where we do not # want to mark all but until the expected location manager.mark_until() if coordinator.finished? return revoked? unless manager.markable? end manager.markable? ? super(manager.markable) : revoked? end |
#mark_as_consumed!(message) ⇒ Object
56 57 58 59 60 61 62 63 64 65 66 67 |
# File 'lib/karafka/pro/processing/strategies/vp/default.rb', line 56 def mark_as_consumed!() return super if collapsed? manager = coordinator.virtual_offset_manager coordinator.synchronize do manager.mark() manager.mark_until() if coordinator.finished? end manager.markable? ? super(manager.markable) : revoked? end |