Class: Karafka::Pro::Processing::Coordinator

Inherits:
Karafka::Processing::Coordinator show all
Defined in:
lib/karafka/pro/processing/coordinator.rb

Overview

Pro coordinator that provides extra orchestration methods useful for parallel processing within the same partition

Instance Attribute Summary collapse

Attributes inherited from Karafka::Processing::Coordinator

#partition, #pause_tracker, #seek_offset, #topic

Instance Method Summary collapse

Methods inherited from Karafka::Processing::Coordinator

#decrement, #failure?, #increment, #manual_pause, #manual_pause?, #manual_seek, #manual_seek?, #marked?, #revoke, #revoked?, #success!, #success?, #synchronize

Constructor Details

#initialize(*args) ⇒ Coordinator

Returns a new instance of Coordinator.

Parameters:

  • args (Object)

    anything the base coordinator accepts



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/karafka/pro/processing/coordinator.rb', line 23

# Builds the pro coordinator on top of the base one and prepares the state used by the
# run-once callbacks, the collapsing and the filtering.
#
# @param args [Object] anything the base coordinator accepts
def initialize(*args)
  super

  @executed = []
  @flow_lock = Mutex.new
  @collapser = Collapser.new
  @filter = FiltersApplier.new(self)

  # Everything below is needed only when the topic uses virtual partitions
  return unless topic.virtual_partitions?

  @virtual_offset_manager = VirtualOffsetManager.new(topic.name, partition)

  # Our own "internal" filter is registered alongside the other filters, so messages that
  # were already marked as consumed virtually get filtered out
  registered_filters = @filter.filters
  registered_filters << Filters::VirtualLimiter.new(@virtual_offset_manager, @collapser)
end

Instance Attribute Details

#filterObject (readonly)

Returns the value of attribute filter.



20
21
22
# File 'lib/karafka/pro/processing/coordinator.rb', line 20

# Read-only access to the filters applier of this coordinator.
#
# @return [FiltersApplier] value of @filter, which #initialize sets to
#   FiltersApplier.new(self)
def filter
  @filter
end

#virtual_offset_managerObject (readonly)

Returns the value of attribute virtual_offset_manager.



20
21
22
# File 'lib/karafka/pro/processing/coordinator.rb', line 20

# Read-only access to the virtual offset manager of this coordinator.
#
# @return [VirtualOffsetManager, nil] value of @virtual_offset_manager; #initialize assigns
#   it only when the topic uses virtual partitions, so it is nil otherwise
def virtual_offset_manager
  @virtual_offset_manager
end

Instance Method Details

#collapsed?Boolean

Returns are we in a collapsed state at the moment.

Returns:

  • (Boolean)

    are we in a collapsed state at the moment



75
76
77
# File 'lib/karafka/pro/processing/coordinator.rb', line 75

# Delegates to the collapser created in #initialize.
#
# @return [Boolean] are we in a collapsed state at the moment
def collapsed?
  @collapser.collapsed?
end

#failure!(consumer, error) ⇒ Object

Sets the consumer failure status and additionally starts the collapse until the offset that follows the last message of the started batch

Parameters:

  • consumer (Karafka::BaseConsumer)

    consumer that failed

  • error (StandardError)

    error from the failure



69
70
71
72
# File 'lib/karafka/pro/processing/coordinator.rb', line 69

# Sets the consumer failure status (via the base coordinator) and additionally starts the
# collapse until the offset that follows the last message of the started batch.
#
# @param consumer [Karafka::BaseConsumer] consumer that failed
# @param error [StandardError] error from the failure
def failure!(consumer, error)
  super

  # First offset past the batch stored in #start
  collapse_boundary = @last_message.offset + 1
  @collapser.collapse_until!(collapse_boundary)
end

#filtered?Boolean

Returns whether any of the filters applied any logic that would cause us to run the filtering flow.

Returns:

  • (Boolean)

    whether any of the filters applied any logic that would cause us to run the filtering flow



81
82
83
# File 'lib/karafka/pro/processing/coordinator.rb', line 81

# Delegates to the filters applier (`@filter.applied?`).
#
# @return [Boolean] did any of the filters apply any logic that would cause us to run the
#   filtering flow
def filtered?
  @filter.applied?
end

#finished?Boolean

Returns is the coordinated work finished or not.

Returns:

  • (Boolean)

    is the coordinated work finished or not



86
87
88
# File 'lib/karafka/pro/processing/coordinator.rb', line 86

# Tells whether the coordinated work is finished, which here means that the @running_jobs
# counter is at zero.
#
# NOTE(review): @running_jobs is not assigned in this class; presumably it is maintained by
# the base coordinator (#increment / #decrement) - confirm.
#
# @return [Boolean] is the coordinated work finished or not
def finished?
  @running_jobs.zero?
end

#on_enqueuedObject

Runs synchronized code once for a collective of virtual partitions prior to work being enqueued



92
93
94
95
96
97
98
# File 'lib/karafka/pro/processing/coordinator.rb', line 92

# Runs synchronized code once for a collective of virtual partitions prior to work being
# enqueued. Calls for which `executable?(:on_enqueued)` is false do not yield.
#
# @yieldparam last_message [Object] value stored in @last_message by #start
# @return [Object, nil] result of the block or nil when the block was not executed
def on_enqueued
  @flow_lock.synchronize do
    yield(@last_message) if executable?(:on_enqueued)
  end
end

#on_finishedObject

Runs once when all the work that is supposed to be coordinated is finished. It runs once per all the coordinated jobs and should be used to run any type of post-jobs coordination processing.



112
113
114
115
116
117
118
119
# File 'lib/karafka/pro/processing/coordinator.rb', line 112

# Runs once when all the work that is supposed to be coordinated is finished. It runs once
# per all the coordinated jobs and should be used to run any type of post-jobs coordination
# processing.
#
# @yieldparam last_message [Object] value stored in @last_message by #start
# @return [Object, nil] result of the block or nil when the block was not executed
def on_finished
  @flow_lock.synchronize do
    # `finished?` goes first on purpose: `executable?` is not consulted while jobs still run
    yield(@last_message) if finished? && executable?(:on_finished)
  end
end

#on_revokedObject

Runs once after a partition is revoked



122
123
124
125
126
127
128
# File 'lib/karafka/pro/processing/coordinator.rb', line 122

# Runs once after a partition is revoked. Calls for which `executable?(:on_revoked)` is
# false do not yield.
#
# @yieldparam last_message [Object] value stored in @last_message by #start
# @return [Object, nil] result of the block or nil when the block was not executed
def on_revoked
  @flow_lock.synchronize do
    yield(@last_message) if executable?(:on_revoked)
  end
end

#on_startedObject

Runs given code only once per all the coordinated jobs upon starting first of them



101
102
103
104
105
106
107
# File 'lib/karafka/pro/processing/coordinator.rb', line 101

# Runs given code only once per all the coordinated jobs upon starting the first of them.
# Calls for which `executable?(:on_started)` is false do not yield.
#
# @yieldparam last_message [Object] value stored in @last_message by #start
# @return [Object, nil] result of the block or nil when the block was not executed
def on_started
  @flow_lock.synchronize do
    yield(@last_message) if executable?(:on_started)
  end
end

#start(messages) ⇒ Object

Starts the coordination process

Parameters:

  • messages

    batch of messages whose processing is being coordinated



49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/karafka/pro/processing/coordinator.rb', line 49

# Starts the coordination process for a batch of messages.
#
# @param messages [#first, #last] batch being started; its first and last elements need to
#   respond to `#offset`
# @return [Object] the last message of the batch, which is also stored in @last_message
def start(messages)
  super

  # The collapser is refreshed with the first offset before its state is consulted below
  first_offset = messages.first.offset
  @collapser.refresh!(first_offset)

  @filter.apply!(messages)

  # A new batch means that all the run-once callbacks may run again
  @executed.clear

  # Old processed offsets are kept until the collapsing is done and regular processing
  # with virtualization is restored
  if topic.virtual_partitions?
    @virtual_offset_manager.clear unless @collapser.collapsed?
  end

  @last_message = messages.last
end