Class: Karafka::Pro::Processing::Coordinator
- Inherits: Karafka::Processing::Coordinator
  - Object
  - Karafka::Processing::Coordinator
  - Karafka::Pro::Processing::Coordinator
- Extended by: Forwardable
- Defined in: lib/karafka/pro/processing/coordinator.rb
Overview
Pro coordinator that provides extra orchestration methods useful for parallel processing within the same partition
Instance Attribute Summary
- #errors_tracker ⇒ Object (readonly)
  Returns the value of attribute errors_tracker.
- #filter ⇒ Object (readonly)
  Returns the value of attribute filter.
- #shared_mutex ⇒ Object (readonly)
  Returns the value of attribute shared_mutex.
- #virtual_offset_manager ⇒ Object (readonly)
  Returns the value of attribute virtual_offset_manager.
Attributes inherited from Karafka::Processing::Coordinator
#eofed, #last_polled_at, #partition, #pause_tracker, #seek_offset, #topic
Instance Method Summary
- #active_within?(interval) ⇒ Boolean
  Whether this partition was active within the last `interval` milliseconds.
- #failure!(consumer, error) ⇒ Object
  Sets the consumer failure status and additionally starts collapsing until the offset following the last message.
- #filtered? ⇒ Boolean
  Whether any of the filters applied logic that would cause us to run the filtering flow.
- #finished? ⇒ Boolean
  Whether the coordinated work is finished.
- #initialize(*args) ⇒ Coordinator (constructor)
  A new instance of Coordinator.
- #on_enqueued ⇒ Object
  Runs synchronized code once for a collective of virtual partitions prior to work being enqueued.
- #on_finished ⇒ Object
  Runs the given code once when all the work that is supposed to be coordinated is finished. It runs once per all the coordinated jobs and should be used for any post-job coordination processing.
- #on_revoked ⇒ Object
  Runs once after a partition is revoked.
- #on_started ⇒ Object
  Runs the given code only once for all the coordinated jobs, when the first of them starts.
- #start(messages) ⇒ Object
  Starts the coordination process.
Methods inherited from Karafka::Processing::Coordinator
#consumption, #decrement, #eofed?, #failure?, #increment, #manual_pause, #manual_pause?, #manual_seek, #manual_seek?, #marked?, #revoke, #revoked?, #success!, #success?, #synchronize
Constructor Details
#initialize(*args) ⇒ Coordinator
Returns a new instance of Coordinator.
    # File 'lib/karafka/pro/processing/coordinator.rb', line 27

    def initialize(*args)
      super

      @executed = []
      @errors_tracker = Coordinators::ErrorsTracker.new
      @flow_mutex = Mutex.new
      # Lock for user code synchronization
      # We do not want to mix coordinator lock with the user lock not to create cases where
      # user imposed lock would lock the internal operations of Karafka
      # This shared lock can be used by the end user as it is not used internally by the
      # framework and can be used for user-facing locking
      @shared_mutex = Mutex.new
      @collapser = Collapser.new
      @filter = Coordinators::FiltersApplier.new(self)

      return unless topic.virtual_partitions?

      @virtual_offset_manager = Coordinators::VirtualOffsetManager.new(
        topic.name,
        partition,
        topic.virtual_partitions.
      )

      # We register our own "internal" filter to support filtering of messages that were marked
      # as consumed virtually
      @filter.filters << Filters::VirtualLimiter.new(
        @virtual_offset_manager,
        @collapser
      )
    end
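The constructor comment explains why the user-facing lock is kept apart from the coordinator's own lock. The following is a minimal sketch of that idea in plain Ruby, without Karafka loaded. `SketchCoordinator` and `internal_step` are hypothetical names used only for illustration; the only thing taken from the listing above is the use of two separate `Mutex` instances.

```ruby
# Hypothetical stand-in, not the Karafka class. It only mirrors the idea of
# keeping a framework lock and a user-facing lock separate.
class SketchCoordinator
  attr_reader :shared_mutex

  def initialize
    @flow_mutex = Mutex.new   # internal lock (framework side)
    @shared_mutex = Mutex.new # lock exposed to user code
  end

  # Simulates internal framework work guarded by the internal lock
  def internal_step
    @flow_mutex.synchronize { :internal_done }
  end
end

coordinator = SketchCoordinator.new

# User code holds the shared lock while the "framework" does its own work.
# Because the two locks are distinct objects, the internal step is not blocked.
with_two_locks = coordinator.shared_mutex.synchronize do
  coordinator.internal_step
end

# For contrast: re-acquiring one and the same Mutex from the thread that
# already holds it raises ThreadError (recursive locking).
single_lock = Mutex.new
with_one_lock = begin
  single_lock.synchronize { single_lock.synchronize { :never_reached } }
rescue ThreadError
  :thread_error
end
```

In this sketch `with_two_locks` ends up as the value returned by the internal step, while the single-lock variant fails, which is the kind of interference the separate `shared_mutex` is meant to avoid.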
Instance Attribute Details
#errors_tracker ⇒ Object (readonly)
Returns the value of attribute errors_tracker.
    # File 'lib/karafka/pro/processing/coordinator.rb', line 24

    def errors_tracker
      @errors_tracker
    end
#filter ⇒ Object (readonly)
Returns the value of attribute filter.
    # File 'lib/karafka/pro/processing/coordinator.rb', line 24

    def filter
      @filter
    end
#shared_mutex ⇒ Object (readonly)
Returns the value of attribute shared_mutex.
    # File 'lib/karafka/pro/processing/coordinator.rb', line 24

    def shared_mutex
      @shared_mutex
    end
#virtual_offset_manager ⇒ Object (readonly)
Returns the value of attribute virtual_offset_manager.
    # File 'lib/karafka/pro/processing/coordinator.rb', line 24

    def virtual_offset_manager
      @virtual_offset_manager
    end
Instance Method Details
#active_within?(interval) ⇒ Boolean
Returns whether this partition was active within the last `interval` milliseconds.
Also returns true if the coordinator is currently active, that is, if any of its jobs is still enqueued or running.
    # File 'lib/karafka/pro/processing/coordinator.rb', line 150

    def active_within?(interval)
      # its always active if there's any job related to this coordinator that is still
      # enqueued or running
      return true if @running_jobs.values.any?(:positive?)

      # Otherwise we check last time any job of this coordinator was active
      @changed_at + interval > monotonic_now
    end
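The two-step check described above (any job still counted, otherwise a recent change on a monotonic clock) can be sketched in isolation. `ActivityTracker`, `increment` and `decrement` as written here are hypothetical simplifications, and the millisecond monotonic clock is an assumption based on the `interval` being documented in milliseconds. The sketch uses the block form `any?(&:positive?)` to test each counter.

```ruby
# Hypothetical, simplified tracker written for illustration only
class ActivityTracker
  def initialize
    @running_jobs = Hash.new(0)
    @changed_at = monotonic_now
  end

  # Registers one more enqueued or running job of the given type
  def increment(job_type)
    @running_jobs[job_type] += 1
    @changed_at = monotonic_now
  end

  # Registers that one job of the given type has finished
  def decrement(job_type)
    @running_jobs[job_type] -= 1
    @changed_at = monotonic_now
  end

  # @param interval [Numeric] milliseconds
  def active_within?(interval)
    # Active right now if any job counter is above zero
    return true if @running_jobs.values.any?(&:positive?)

    # Otherwise active only if the last change is recent enough
    @changed_at + interval > monotonic_now
  end

  private

  # Monotonic clock reading in milliseconds
  def monotonic_now
    Process.clock_gettime(Process::CLOCK_MONOTONIC, :float_millisecond)
  end
end

tracker = ActivityTracker.new

tracker.increment(:consume)
while_running = tracker.active_within?(0) # a job is still counted

tracker.decrement(:consume)
just_finished = tracker.active_within?(60_000) # changed a moment ago

sleep(0.02)
after_idle = tracker.active_within?(5) # about 20 ms idle, 5 ms window
```

A monotonic clock is used rather than wall-clock time so that system clock adjustments cannot make an idle coordinator look active or the other way round.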
#failure!(consumer, error) ⇒ Object
Sets the consumer failure status, records the error in the errors tracker, and starts collapsing until the offset following the last message.
    # File 'lib/karafka/pro/processing/coordinator.rb', line 89

    def failure!(consumer, error)
      super
      @errors_tracker << error
      collapse_until!(@last_message.offset + 1)
    end
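To illustrate what "collapse until the offset after the last message" could mean in practice, here is a small sketch. `SketchCollapser` is hypothetical and its internals are an assumption: it treats processing as collapsed while a new batch starts below the recorded offset. Only the method names `collapse_until!`, `refresh!` and `collapsed?` appear in the listings on this page; their real behavior may differ.

```ruby
# Hypothetical collapser, written for illustration only
class SketchCollapser
  def initialize
    @until_offset = -1
    @collapsed = false
  end

  def collapsed?
    @collapsed
  end

  # Remembers the highest offset up to which processing should stay collapsed
  def collapse_until!(offset)
    @until_offset = offset if offset > @until_offset
  end

  # Re-evaluates the collapsed state using the first offset of a new batch
  def refresh!(first_offset)
    @collapsed = first_offset < @until_offset
  end
end

collapser = SketchCollapser.new
last_offset = 104 # offset of the last message in the batch that failed

# Mirrors the call made on failure: collapse until last offset + 1
collapser.collapse_until!(last_offset + 1)

collapser.refresh!(100) # the failed batch is retried from offset 100
during_retry = collapser.collapsed?

collapser.refresh!(105) # the next batch starts past the failed range
after_recovery = collapser.collapsed?
```

Under these assumptions the retry of offsets 100..104 runs collapsed, and parallel processing resumes once a batch begins at offset 105 or later.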
#filtered? ⇒ Boolean
Returns whether any of the filters applied logic that would cause us to run the filtering flow.
    # File 'lib/karafka/pro/processing/coordinator.rb', line 97

    def filtered?
      @filter.applied?
    end
#finished? ⇒ Boolean
Returns whether the coordinated work is finished.
Used only in the consume operation context.
    # File 'lib/karafka/pro/processing/coordinator.rb', line 103

    def finished?
      @running_jobs[:consume].zero?
    end
#on_enqueued ⇒ Object
Runs synchronized code once for a collective of virtual partitions prior to work being enqueued
    # File 'lib/karafka/pro/processing/coordinator.rb', line 109

    def on_enqueued
      @flow_mutex.synchronize do
        return unless executable?(:on_enqueued)

        yield(@last_message)
      end
    end
#on_finished ⇒ Object
Runs the given code once when all the work that is supposed to be coordinated is finished. It runs once per all the coordinated jobs and should be used for any post-job coordination processing.
    # File 'lib/karafka/pro/processing/coordinator.rb', line 129

    def on_finished
      @flow_mutex.synchronize do
        return unless finished?
        return unless executable?(:on_finished)

        yield(@last_message)
      end
    end
#on_revoked ⇒ Object
Runs once after a partition is revoked
    # File 'lib/karafka/pro/processing/coordinator.rb', line 139

    def on_revoked
      @flow_mutex.synchronize do
        return unless executable?(:on_revoked)

        yield(@last_message)
      end
    end
#on_started ⇒ Object
Runs the given code only once for all the coordinated jobs, when the first of them starts.
    # File 'lib/karafka/pro/processing/coordinator.rb', line 118

    def on_started
      @flow_mutex.synchronize do
        return unless executable?(:on_started)

        yield(@last_message)
      end
    end
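The run-once hooks (`on_enqueued`, `on_started`, `on_finished`, `on_revoked`) share one pattern: take a lock, check whether the hook has already run, and only then yield. The sketch below reproduces that pattern with several threads. `RunOnceHooks`, `job_started` and `job_done` are hypothetical names, and the body of `executable?` is an assumption, since its implementation is not shown on this page.

```ruby
# Hypothetical, simplified model of the run-once hooks
class RunOnceHooks
  def initialize
    @flow_mutex = Mutex.new
    @executed = []
    @running = 0
  end

  def job_started
    @flow_mutex.synchronize { @running += 1 }
  end

  def job_done
    @flow_mutex.synchronize { @running -= 1 }
  end

  # Yields only for the first caller
  def on_started
    @flow_mutex.synchronize do
      return unless executable?(:on_started)

      yield
    end
  end

  # Yields only once, and only when no jobs are left running
  def on_finished
    @flow_mutex.synchronize do
      return unless @running.zero?
      return unless executable?(:on_finished)

      yield
    end
  end

  private

  # Assumed behavior: true only the first time a given hook name is seen
  def executable?(name)
    return false if @executed.include?(name)

    @executed << name
    true
  end
end

hooks = RunOnceHooks.new
calls = Queue.new

# Three coordinated jobs are registered before any of them runs
3.times { hooks.job_started }

threads = Array.new(3) do
  Thread.new do
    hooks.on_started { calls << :started }
    hooks.job_done
    hooks.on_finished { calls << :finished }
  end
end
threads.each(&:join)

events = Array.new(calls.size) { calls.pop }
```

Every thread calls both hooks, yet each block runs a single time: the first thread to arrive runs the start block, and the finish block runs only after the last job has been counted down.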
#start(messages) ⇒ Object
Starts the coordination process
    # File 'lib/karafka/pro/processing/coordinator.rb', line 61

    def start(messages)
      super

      @collapser.refresh!(messages.first.offset)

      @filter.apply!(messages)

      # Do not clear coordinator errors storage when we are retrying, so we can reference the
      # errors that have happened during recovery. This can be useful for implementing custom
      # flows. There can be more errors than one when running with virtual partitions so we
      # need to make sure we collect them all. Under collapse when we reference a given
      # consumer we should be able to get all the errors and not just first/last.
      #
      # @note We use zero as the attempt mark because we are not "yet" in the attempt 1
      @errors_tracker.clear if attempt.zero?
      @executed.clear

      # We keep the old processed offsets until the collapsing is done and regular processing
      # with virtualization is restored
      @virtual_offset_manager.clear if topic.virtual_partitions? && !collapsed?

      @last_message = messages.last
    end
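The comment in `#start` describes keeping collected errors across retries and clearing them only on a fresh attempt. A minimal sketch of that rule follows. `RetryAwareState` is hypothetical, a plain array stands in for the errors tracker, and `attempt` is passed in explicitly here, whereas the real coordinator reads it from its own state.

```ruby
# Hypothetical, simplified state holder written for illustration only
class RetryAwareState
  attr_reader :errors, :last_message

  def initialize
    @errors = []
    @executed = []
  end

  # @param messages [Array] batch about to be processed
  # @param attempt [Integer] zero for a fresh run, above zero for a retry
  def start(messages, attempt:)
    # Errors are dropped only when this is not a retry
    @errors.clear if attempt.zero?
    # Run-once markers are reset for every batch
    @executed.clear
    @last_message = messages.last
  end

  def failure!(error)
    @errors << error
  end
end

state = RetryAwareState.new

# Fresh run that fails
state.start([1, 2, 3], attempt: 0)
state.failure!(RuntimeError.new('first failure'))

# Retry of the same batch fails again; the earlier error is still available
state.start([1, 2, 3], attempt: 1)
state.failure!(RuntimeError.new('second failure'))
errors_during_retry = state.errors.map(&:message)

# Next fresh run starts with an empty error list
state.start([4, 5], attempt: 0)
errors_after_recovery = state.errors.size
```

Keeping the full list during recovery lets a custom flow inspect every error raised so far and not only the most recent one.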