Class: Karafka::Pro::Processing::Coordinator
- Inherits:
- Karafka::Processing::Coordinator
- Inheritance chain: Object, Karafka::Processing::Coordinator, Karafka::Pro::Processing::Coordinator
- Defined in:
- lib/karafka/pro/processing/coordinator.rb
Overview
Pro coordinator that provides extra orchestration methods useful for parallel processing within the same partition.
Instance Attribute Summary
-
#filter ⇒ Object
readonly
Returns the value of attribute filter.
-
#virtual_offset_manager ⇒ Object
readonly
Returns the value of attribute virtual_offset_manager.
Attributes inherited from Karafka::Processing::Coordinator
#partition, #pause_tracker, #seek_offset, #topic
Instance Method Summary
-
#collapsed? ⇒ Boolean
Whether the coordinator is currently in a collapsed state.
-
#failure!(consumer, error) ⇒ Object
Sets the consumer failure status and additionally starts a collapse that lasts until the offset following the last message of the current batch.
-
#filtered? ⇒ Boolean
Whether any of the filters applied logic that would cause us to run the filtering flow.
-
#finished? ⇒ Boolean
Whether the coordinated work is finished.
-
#initialize(*args) ⇒ Coordinator
constructor
A new instance of Coordinator.
-
#on_enqueued ⇒ Object
Runs synchronized code once for a collective of virtual partitions prior to work being enqueued.
-
#on_finished ⇒ Object
Runs once when all the work that is supposed to be coordinated is finished. It runs once across all the coordinated jobs and should be used for any post-job coordination processing.
-
#on_revoked ⇒ Object
Runs once after a partition is revoked.
-
#on_started ⇒ Object
Runs the given code only once across all the coordinated jobs, when the first of them starts.
-
#start(messages) ⇒ Object
Starts the coordination process.
Methods inherited from Karafka::Processing::Coordinator
#decrement, #failure?, #increment, #manual_pause, #manual_pause?, #manual_seek, #manual_seek?, #marked?, #revoke, #revoked?, #success!, #success?, #synchronize
Constructor Details
#initialize(*args) ⇒ Coordinator
Returns a new instance of Coordinator.
# File 'lib/karafka/pro/processing/coordinator.rb', line 23

def initialize(*args)
  super

  @executed = []
  @flow_lock = Mutex.new
  @collapser = Collapser.new
  @filter = FiltersApplier.new(self)

  return unless topic.virtual_partitions?

  @virtual_offset_manager = VirtualOffsetManager.new(
    topic.name,
    partition
  )

  # We register our own "internal" filter to support filtering of messages that were marked
  # as consumed virtually
  @filter.filters << Filters::VirtualLimiter.new(
    @virtual_offset_manager,
    @collapser
  )
end
Instance Attribute Details
#filter ⇒ Object (readonly)
Returns the value of attribute filter.
# File 'lib/karafka/pro/processing/coordinator.rb', line 20

def filter
  @filter
end
#virtual_offset_manager ⇒ Object (readonly)
Returns the value of attribute virtual_offset_manager.
# File 'lib/karafka/pro/processing/coordinator.rb', line 20

def virtual_offset_manager
  @virtual_offset_manager
end
Instance Method Details
#collapsed? ⇒ Boolean
Returns whether the coordinator is currently in a collapsed state.

# File 'lib/karafka/pro/processing/coordinator.rb', line 75

def collapsed?
  @collapser.collapsed?
end
#failure!(consumer, error) ⇒ Object
Sets the consumer failure status and additionally starts a collapse that lasts until the offset following the last message of the current batch.

# File 'lib/karafka/pro/processing/coordinator.rb', line 69

def failure!(consumer, error)
  super
  @collapser.collapse_until!(@last_message.offset + 1)
end
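The collapse behaviour can be illustrated with a small stand-in. The `SketchCollapser` class below is hypothetical and is not Karafka's `Collapser`; it only assumes, based on the calls visible in `#failure!` and `#start`, that `collapse_until!` records a target offset and that `refresh!` re-evaluates the collapsed state against the first offset of a new batch.

```ruby
# Hypothetical stand-in, not the real Karafka Collapser.
class SketchCollapser
  def initialize
    @until_offset = -1
    @collapsed = false
    @mutex = Mutex.new
  end

  def collapsed?
    @collapsed
  end

  # Remember the highest offset up to which processing should stay collapsed
  def collapse_until!(offset)
    @mutex.synchronize do
      @until_offset = offset if offset > @until_offset
    end
  end

  # Assumption: the state is re-evaluated per batch using the batch's first offset
  def refresh!(first_offset)
    @mutex.synchronize do
      @collapsed = first_offset < @until_offset
    end
  end
end

collapser = SketchCollapser.new

# A batch covering offsets 10..19 fails: collapse until the last offset + 1
collapser.collapse_until!(19 + 1)

# The same batch is retried, so it falls inside the collapsed range
collapser.refresh!(10)
retry_collapsed = collapser.collapsed?

# The next batch starts past the failed range, so the collapse is lifted
collapser.refresh!(20)
next_collapsed = collapser.collapsed?
```

Under these assumptions, a retried batch is processed in the collapsed state, and regular processing resumes once a batch begins at or beyond the recorded offset.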
#filtered? ⇒ Boolean
Returns whether any of the filters applied logic that would cause us to run the filtering flow.

# File 'lib/karafka/pro/processing/coordinator.rb', line 81

def filtered?
  @filter.applied?
end
#finished? ⇒ Boolean
Returns whether the coordinated work is finished.

# File 'lib/karafka/pro/processing/coordinator.rb', line 86

def finished?
  @running_jobs.zero?
end
#on_enqueued ⇒ Object
Runs synchronized code once for a collective of virtual partitions prior to work being enqueued.

# File 'lib/karafka/pro/processing/coordinator.rb', line 92

def on_enqueued
  @flow_lock.synchronize do
    return unless executable?(:on_enqueued)

    yield(@last_message)
  end
end
#on_finished ⇒ Object
Runs once when all the work that is supposed to be coordinated is finished. It runs once across all the coordinated jobs and should be used for any post-job coordination processing.

# File 'lib/karafka/pro/processing/coordinator.rb', line 112

def on_finished
  @flow_lock.synchronize do
    return unless finished?
    return unless executable?(:on_finished)

    yield(@last_message)
  end
end
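The once-only behaviour of `#on_finished` depends on a private `executable?` check that is not shown on this page. The following is a minimal sketch of how such gating might work, assuming `executable?` records each action in `@executed` and refuses repeats. `SketchCoordinator` is a hypothetical stand-in, and its `increment` and `decrement` are simplified guesses at the inherited job counter.

```ruby
# Hypothetical stand-in for the once-only gating; not Karafka's coordinator.
class SketchCoordinator
  def initialize
    @executed = []
    @running_jobs = 0
    @flow_lock = Mutex.new
  end

  def increment
    @flow_lock.synchronize { @running_jobs += 1 }
  end

  def decrement
    @flow_lock.synchronize { @running_jobs -= 1 }
  end

  def finished?
    @running_jobs.zero?
  end

  def on_finished
    @flow_lock.synchronize do
      return unless finished?
      return unless executable?(:on_finished)

      yield
    end
  end

  private

  # Assumption: an action is executable only the first time it is requested
  def executable?(action)
    return false if @executed.include?(action)

    @executed << action
    true
  end
end

coordinator = SketchCoordinator.new
calls = 0

# Four jobs are registered before any of them runs
4.times { coordinator.increment }

threads = Array.new(4) do
  Thread.new do
    coordinator.decrement
    # Every job attempts the hook; only one attempt made after the counter
    # reaches zero actually yields
    coordinator.on_finished { calls += 1 }
  end
end

threads.each(&:join)
```

Because both the `finished?` check and the gate run under the same lock, the block runs exactly once no matter which job finishes last.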
#on_revoked ⇒ Object
Runs once after a partition is revoked.

# File 'lib/karafka/pro/processing/coordinator.rb', line 122

def on_revoked
  @flow_lock.synchronize do
    return unless executable?(:on_revoked)

    yield(@last_message)
  end
end
#on_started ⇒ Object
Runs the given code only once across all the coordinated jobs, when the first of them starts.

# File 'lib/karafka/pro/processing/coordinator.rb', line 101

def on_started
  @flow_lock.synchronize do
    return unless executable?(:on_started)

    yield(@last_message)
  end
end
#start(messages) ⇒ Object
Starts the coordination process.

# File 'lib/karafka/pro/processing/coordinator.rb', line 49

def start(messages)
  super

  @collapser.refresh!(messages.first.offset)

  @filter.apply!(messages)

  @executed.clear

  # We keep the old processed offsets until the collapsing is done and regular processing
  # with virtualization is restored
  @virtual_offset_manager.clear if topic.virtual_partitions? && !@collapser.collapsed?

  @last_message = messages.last
end
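Since `#start` clears `@executed` and stores the last message, the once-only hooks become available again for each new batch. The sketch below isolates that reset. `BatchHooks` is a hypothetical stand-in, plain integers stand in for messages, and the skip-if-recorded check is an assumption about how the private gating works.

```ruby
# Hypothetical stand-in showing only the per-batch reset; not Karafka's coordinator.
class BatchHooks
  def initialize
    @executed = []
    @flow_lock = Mutex.new
  end

  # Mirrors the `@executed.clear` and `@last_message` steps of #start
  def start(messages)
    @executed.clear
    @last_message = messages.last
  end

  def on_started
    @flow_lock.synchronize do
      # Assumption: a hook is skipped once it is recorded as executed
      return if @executed.include?(:on_started)

      @executed << :on_started
      yield(@last_message)
    end
  end
end

hooks = BatchHooks.new
seen = []

# First batch: the hook is requested twice but yields once
hooks.start([1, 2, 3])
2.times { hooks.on_started { |last| seen << last } }

# Second batch: the reset in #start lets the hook yield again
hooks.start([4, 5])
hooks.on_started { |last| seen << last }
```

After both batches, `seen` holds the last element of each batch exactly once.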