Class: Karafka::Pro::Processing::Coordinator

Inherits:
Karafka::Processing::Coordinator show all
Defined in:
lib/karafka/pro/processing/coordinator.rb

Overview

Pro coordinator that provides extra orchestration methods useful for parallel processing within the same partition

Instance Attribute Summary

Attributes inherited from Karafka::Processing::Coordinator

#pause_tracker, #seek_offset

Instance Method Summary collapse

Methods inherited from Karafka::Processing::Coordinator

#consumption, #decrement, #increment, #manual_pause, #manual_pause?, #revoke, #revoked?, #success?

Constructor Details

#initialize(*args) ⇒ Coordinator

Returns a new instance of Coordinator.

Parameters:

  • args (Object)

    anything the base coordinator accepts



21
22
23
24
25
26
# File 'lib/karafka/pro/processing/coordinator.rb', line 21

def initialize(*args)
  super

  @executed = []
  @flow_lock = Mutex.new
end

Instance Method Details

#finished?Boolean

Returns is the coordinated work finished or not.

Returns:

  • (Boolean)

    is the coordinated work finished or not



41
42
43
# File 'lib/karafka/pro/processing/coordinator.rb', line 41

def finished?
  @running_jobs.zero?
end

#on_enqueuedObject

Runs synchronized code once for a collective of virtual partitions prior to work being enqueued



47
48
49
50
51
52
53
# File 'lib/karafka/pro/processing/coordinator.rb', line 47

def on_enqueued
  @flow_lock.synchronize do
    return unless executable?(:on_enqueued)

    yield(@last_message)
  end
end

#on_finishedObject

Runs once when all the work that is suppose to be coordinated is finished It runs once per all the coordinated jobs and should be used to run any type of post jobs coordination processing execution



67
68
69
70
71
72
73
74
# File 'lib/karafka/pro/processing/coordinator.rb', line 67

def on_finished
  @flow_lock.synchronize do
    return unless finished?
    return unless executable?(:on_finished)

    yield(@last_message)
  end
end

#on_revokedObject

Runs once after a partition is revoked



77
78
79
80
81
82
83
# File 'lib/karafka/pro/processing/coordinator.rb', line 77

def on_revoked
  @flow_lock.synchronize do
    return unless executable?(:on_revoked)

    yield(@last_message)
  end
end

#on_startedObject

Runs given code only once per all the coordinated jobs upon starting first of them



56
57
58
59
60
61
62
# File 'lib/karafka/pro/processing/coordinator.rb', line 56

def on_started
  @flow_lock.synchronize do
    return unless executable?(:on_started)

    yield(@last_message)
  end
end

#start(messages) ⇒ Object

Starts the coordination process

Parameters:



31
32
33
34
35
36
37
38
# File 'lib/karafka/pro/processing/coordinator.rb', line 31

def start(messages)
  super

  @mutex.synchronize do
    @executed.clear
    @last_message = messages.last
  end
end