Class: Karafka::Pro::Processing::Coordinator
- Inherits:
- Karafka::Processing::Coordinator
  - Object
  - Karafka::Processing::Coordinator
  - Karafka::Pro::Processing::Coordinator
- Defined in:
- lib/karafka/pro/processing/coordinator.rb
Overview
Pro coordinator that provides extra orchestration methods useful for parallel processing within the same partition
Instance Attribute Summary
Attributes inherited from Karafka::Processing::Coordinator
Instance Method Summary
-
#finished? ⇒ Boolean
Whether the coordinated work is finished.
-
#initialize(*args) ⇒ Coordinator
constructor
A new instance of Coordinator.
-
#on_enqueued ⇒ Object
Runs synchronized code once for a collective of virtual partitions prior to work being enqueued.
-
#on_finished ⇒ Object
Runs once when all the work that is supposed to be coordinated is finished. It runs once across all the coordinated jobs and should be used for any post-job coordination processing.
-
#on_revoked ⇒ Object
Runs once after a partition is revoked.
-
#on_started ⇒ Object
Runs the given code only once across all the coordinated jobs, when the first of them starts.
-
#start(messages) ⇒ Object
Starts the coordination process.
Methods inherited from Karafka::Processing::Coordinator
#consumption, #decrement, #increment, #manual_pause, #manual_pause?, #revoke, #revoked?, #success?
Constructor Details
#initialize(*args) ⇒ Coordinator
Returns a new instance of Coordinator.
# File 'lib/karafka/pro/processing/coordinator.rb', line 21

def initialize(*args)
  super

  @executed = []
  @flow_lock = Mutex.new
end
Instance Method Details
#finished? ⇒ Boolean
Returns whether the coordinated work is finished.
# File 'lib/karafka/pro/processing/coordinator.rb', line 41

def finished?
  @running_jobs.zero?
end
#on_enqueued ⇒ Object
Runs synchronized code once for a collective of virtual partitions prior to work being enqueued.
# File 'lib/karafka/pro/processing/coordinator.rb', line 47

def on_enqueued
  @flow_lock.synchronize do
    return unless executable?(:on_enqueued)

    yield(@last_message)
  end
end
#on_finished ⇒ Object
Runs once when all the work that is supposed to be coordinated is finished. It runs once across all the coordinated jobs and should be used for any post-job coordination processing.
# File 'lib/karafka/pro/processing/coordinator.rb', line 67

def on_finished
  @flow_lock.synchronize do
    return unless finished?
    return unless executable?(:on_finished)

    yield(@last_message)
  end
end
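The run-once behaviour of #on_finished can be illustrated with a standalone sketch that does not load Karafka. `MiniCoordinator` is a hypothetical stand-in, and its `executable?` helper is only an assumption about how the private method of the same name might behave, since its source is not shown on this page. The sketch also guards the job counter with the same lock for simplicity, which may differ from the real class.

```ruby
# Hypothetical stand-in for illustration only; not Karafka's implementation.
class MiniCoordinator
  def initialize
    @executed = []
    @flow_lock = Mutex.new
    @running_jobs = 0
  end

  # Registers one more running job.
  def increment
    @flow_lock.synchronize { @running_jobs += 1 }
  end

  # Marks one running job as done.
  def decrement
    @flow_lock.synchronize { @running_jobs -= 1 }
  end

  def finished?
    @running_jobs.zero?
  end

  # Yields at most once, and only after every job has finished.
  def on_finished
    @flow_lock.synchronize do
      return unless finished?
      return unless executable?(:on_finished)

      yield
    end
  end

  private

  # Assumed behaviour: true only the first time a given action is checked.
  def executable?(action)
    return false if @executed.include?(action)

    @executed << action
    true
  end
end

coordinator = MiniCoordinator.new
finished_calls = 0

4.times { coordinator.increment }

# Each thread finishes its job and then tries to run the hook.
threads = Array.new(4) do
  Thread.new do
    coordinator.decrement
    # The block runs while the lock is held, so the increment is safe.
    coordinator.on_finished { finished_calls += 1 }
  end
end
threads.each(&:join)

puts finished_calls
```

Every thread calls the hook, but only a call made after the last job has finished can run the block, and the `executable?` guard then blocks any later call.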
#on_revoked ⇒ Object
Runs once after a partition is revoked.
# File 'lib/karafka/pro/processing/coordinator.rb', line 77

def on_revoked
  @flow_lock.synchronize do
    return unless executable?(:on_revoked)

    yield(@last_message)
  end
end
#on_started ⇒ Object
Runs the given code only once across all the coordinated jobs, when the first of them starts.
# File 'lib/karafka/pro/processing/coordinator.rb', line 56

def on_started
  @flow_lock.synchronize do
    return unless executable?(:on_started)

    yield(@last_message)
  end
end
#start(messages) ⇒ Object
Starts the coordination process.
# File 'lib/karafka/pro/processing/coordinator.rb', line 31

def start(messages)
  super

  @mutex.synchronize do
    @executed.clear
    @last_message = messages.last
  end
end
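Because #start clears the list of executed actions, each once-only hook can presumably fire again for the next batch. The following standalone sketch shows that reset. `BatchCoordinator` is a hypothetical stand-in that does not load Karafka, and its `executable?` helper is an assumption about the private method, which is not shown on this page.

```ruby
# Hypothetical stand-in for illustration only; not Karafka's implementation.
class BatchCoordinator
  def initialize
    @executed = []
    @mutex = Mutex.new
    @flow_lock = Mutex.new
  end

  # Resets the once-only markers and remembers the last message of the batch.
  def start(messages)
    @mutex.synchronize do
      @executed.clear
      @last_message = messages.last
    end
  end

  # Yields the last message of the current batch, at most once per batch.
  def on_started
    @flow_lock.synchronize do
      return unless executable?(:on_started)

      yield(@last_message)
    end
  end

  private

  # Assumed behaviour: true only the first time a given action is checked.
  def executable?(action)
    return false if @executed.include?(action)

    @executed << action
    true
  end
end

coordinator = BatchCoordinator.new
seen = []

# First batch: the hook is attempted twice but runs once.
coordinator.start(%w[a b c])
2.times { coordinator.on_started { |last| seen << last } }

# Second batch: start clears the markers, so the hook runs once more.
coordinator.start(%w[d e])
2.times { coordinator.on_started { |last| seen << last } }

p seen # prints ["c", "e"]
```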