Class: Karafka::Processing::Coordinator

Inherits:
Object
Defined in:
lib/karafka/processing/coordinator.rb

Overview

Note:

This coordinator needs to be thread safe. Some operations are performed only in the listener thread, but all operations are thread-safe by default, so we do not have to worry about potential future mistakes.

Basic coordinator that allows us to provide coordination objects to consumers.

This is a wrapping layer to simplify management of work to be handled around consumption.
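The thread-safety note above can be illustrated with a minimal sketch. `CounterSketch` is a hypothetical stand-in, not the Karafka class: it keeps only the job counter and the mutex from the constructor documented below, and shows that concurrent updates are not lost when every mutation goes through the mutex.

```ruby
# Hypothetical stand-in (not Karafka::Processing::Coordinator): only the
# counter and the mutex are modelled here.
class CounterSketch
  def initialize
    @running_jobs = 0
    @mutex = Mutex.new
  end

  # Registers one more running job
  def increment
    @mutex.synchronize { @running_jobs += 1 }
  end

  # Unregisters one running job and returns the new count
  def decrement
    @mutex.synchronize { @running_jobs -= 1 }
  end
end

counter = CounterSketch.new

# Eight threads each register 1_000 jobs concurrently
threads = Array.new(8) do
  Thread.new { 1_000.times { counter.increment } }
end
threads.each(&:join)

# 8_000 increments followed by a single decrement
puts counter.decrement # → 7999
```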

Direct Known Subclasses

Karafka::Pro::Processing::Coordinator

Constructor Details

#initialize(pause_tracker) ⇒ Coordinator

Returns a new instance of Coordinator.

Parameters:

  • pause_tracker (Karafka::TimeTrackers::Pause)



# File 'lib/karafka/processing/coordinator.rb', line 19

def initialize(pause_tracker)
  @pause_tracker = pause_tracker
  @revoked = false
  @consumptions = {}
  @running_jobs = 0
  @manual_pause = false
  @mutex = Mutex.new
end

Instance Attribute Details

#pause_tracker ⇒ Karafka::TimeTrackers::Pause (readonly)



# File 'lib/karafka/processing/coordinator.rb', line 14

def pause_tracker
  @pause_tracker
end

#seek_offset ⇒ Object

Returns the value of attribute seek_offset.



# File 'lib/karafka/processing/coordinator.rb', line 16

def seek_offset
  @seek_offset
end

Instance Method Details

#consumption(consumer) ⇒ Karafka::Processing::Result

Returns result object which we can use to indicate consumption processing state.

Parameters:

  • consumer (Object)

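    Karafka consumer (normal or pro)

Returns:

  • (Karafka::Processing::Result)

    result object which we can use to indicate consumption processing state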



# File 'lib/karafka/processing/coordinator.rb', line 77

def consumption(consumer)
  @mutex.synchronize do
    @consumptions[consumer] ||= Processing::Result.new
  end
end

#decrement ⇒ Object

Decrements the number of jobs we handle at the moment.



# File 'lib/karafka/processing/coordinator.rb', line 62

def decrement
  @mutex.synchronize do
    @running_jobs -= 1

    return @running_jobs unless @running_jobs.negative?

    # This should never happen. If it does, something is heavily out of sync. Please reach
    # out to us if you encounter this
    raise Karafka::Errors::InvalidCoordinatorStateError, 'Was zero before decrementation'
  end
end
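The counting contract of `#increment` and `#decrement` can be sketched in isolation. The names `JobCounterSketch` and `InvalidStateSketchError` are hypothetical stand-ins for the coordinator and for `Karafka::Errors::InvalidCoordinatorStateError`; the method bodies mirror the ones shown on this page.

```ruby
# Hypothetical stand-in for Karafka::Errors::InvalidCoordinatorStateError
class InvalidStateSketchError < StandardError; end

# Hypothetical stand-in that models only the running jobs counter
class JobCounterSketch
  def initialize
    @running_jobs = 0
    @mutex = Mutex.new
  end

  def increment
    @mutex.synchronize { @running_jobs += 1 }
  end

  def decrement
    @mutex.synchronize do
      @running_jobs -= 1

      # Returning from inside the block still releases the mutex
      return @running_jobs unless @running_jobs.negative?

      raise InvalidStateSketchError, 'Was zero before decrementation'
    end
  end
end

jobs = JobCounterSketch.new
jobs.increment
jobs.increment

puts jobs.decrement # → 1
puts jobs.decrement # → 0

begin
  # One decrement too many: the counter would go below zero
  jobs.decrement
rescue InvalidStateSketchError => e
  puts "out of sync: #{e.message}"
end
```

The balanced calls return the remaining job count, while the extra call raises instead of silently continuing with a negative counter.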

#increment ⇒ Object

Increases the number of jobs that we handle with this coordinator.



# File 'lib/karafka/processing/coordinator.rb', line 57

def increment
  @mutex.synchronize { @running_jobs += 1 }
end

#manual_pause ⇒ Object

Stores in the coordinator the information that this pause was done manually by the end user and not by the system itself.



# File 'lib/karafka/processing/coordinator.rb', line 108

def manual_pause
  @mutex.synchronize { @manual_pause = true }
end

#manual_pause? ⇒ Boolean

Returns whether we are in a pause that was initiated by the user.

Returns:

  • (Boolean)

    whether we are in a pause that was initiated by the user



# File 'lib/karafka/processing/coordinator.rb', line 113

def manual_pause?
  @pause_tracker.paused? && @manual_pause
end
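Because `#manual_pause?` combines the stored flag with the pause tracker state, the flag alone is not enough for it to return true. A minimal sketch of that interaction follows. `PauseTrackerStub` and `ManualPauseSketch` are hypothetical; only `#paused?` is taken from the code above, while `#pause` and `#resume` on the stub are assumptions made for illustration.

```ruby
# Hypothetical stub of a pause tracker. Only #paused? is used by the code on
# this page; #pause and #resume exist here just to flip the state.
class PauseTrackerStub
  def initialize
    @paused = false
  end

  def pause
    @paused = true
  end

  def resume
    @paused = false
  end

  def paused?
    @paused
  end
end

# Hypothetical stand-in that models only the manual pause flag
class ManualPauseSketch
  def initialize(pause_tracker)
    @pause_tracker = pause_tracker
    @manual_pause = false
    @mutex = Mutex.new
  end

  def manual_pause
    @mutex.synchronize { @manual_pause = true }
  end

  def manual_pause?
    @pause_tracker.paused? && @manual_pause
  end
end

tracker = PauseTrackerStub.new
sketch = ManualPauseSketch.new(tracker)

tracker.pause
puts sketch.manual_pause? # → false (paused, but not flagged as manual)

sketch.manual_pause
puts sketch.manual_pause? # → true (paused and flagged as manual)

tracker.resume
puts sketch.manual_pause? # → false (flag still set, but the pause is over)
```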

#revoke ⇒ Object

Marks the given coordinator for a processing group as revoked.

This is invoked in two places:

- from the main listener loop when we detect revoked partitions
- from the consumer in case checkpointing fails

This means that a consumer can become aware that it was revoked before the listener loop dispatches the revocation job. This is fine, as effectively nothing will be processed until the revocation jobs are done.



# File 'lib/karafka/processing/coordinator.rb', line 97

def revoke
  @mutex.synchronize { @revoked = true }
end
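Since `#revoke` only sets a flag under the mutex, calling it from both places is harmless. The sketch below uses a hypothetical `RevocationSketch` class and two plain threads standing in for the consumer and the listener loop; it is an illustration of the idempotent flag, not of how Karafka schedules these calls.

```ruby
# Hypothetical stand-in that models only the revocation flag
class RevocationSketch
  def initialize
    @revoked = false
    @mutex = Mutex.new
  end

  def revoke
    @mutex.synchronize { @revoked = true }
  end

  def revoked?
    @revoked
  end
end

partition_state = RevocationSketch.new

# Both callers may mark the revocation, in any order
consumer = Thread.new { partition_state.revoke } # e.g. checkpointing failed
listener = Thread.new { partition_state.revoke } # e.g. revoked partition detected
[consumer, listener].each(&:join)

puts partition_state.revoked? # → true
```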

#revoked? ⇒ Boolean

Returns whether the partition we are processing is revoked.

Returns:

  • (Boolean)

    whether the partition we are processing is revoked



# File 'lib/karafka/processing/coordinator.rb', line 102

def revoked?
  @revoked
end

#start(messages) ⇒ Object

Starts the coordinator for the given consumption jobs.

Parameters:

  • messages (Array<Karafka::Messages::Message>)

    batch of messages for which we are going to coordinate work. Not used with the regular coordinator.



# File 'lib/karafka/processing/coordinator.rb', line 31

def start(messages)
  @mutex.synchronize do
    @running_jobs = 0
    # We need to clear the consumption results hash here, otherwise we could end up storing
    # consumption results of consumer instances we no longer control
    @consumptions.clear

    # When starting to run, no pause is expected and no manual pause as well
    @manual_pause = false

    # We set it on the first encounter and never again, because from then on the offset
    # setting should be up to the consumer's logic (ours or the end user's).
    # The seek offset always needs to be initialized because, when manual offset management
    # is turned on, we need a reference to the first offset even when running multiple
    # batches without marking any messages as consumed. Rollback needs to happen to the
    # last place we know of or to the last message + 1 that was marked
    @seek_offset ||= messages.first.offset
  end
end
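The "set on the first encounter and never again" behaviour comes from the `||=` assignment. A minimal sketch, assuming a hypothetical `MessageStub` with only an `offset` and a hypothetical `StartSketch` whose `seek_offset` is writable:

```ruby
# Hypothetical message stand-in: only #offset is needed here
MessageStub = Struct.new(:offset)

# Hypothetical stand-in that mirrors the #start body shown above
class StartSketch
  attr_accessor :seek_offset

  def initialize
    @consumptions = {}
    @running_jobs = 0
    @manual_pause = false
    @mutex = Mutex.new
  end

  def start(messages)
    @mutex.synchronize do
      @running_jobs = 0
      @consumptions.clear
      @manual_pause = false
      # Assigned only while @seek_offset is still nil
      @seek_offset ||= messages.first.offset
    end
  end
end

sketch = StartSketch.new

sketch.start([MessageStub.new(10), MessageStub.new(11)])
puts sketch.seek_offset # → 10

# A later batch does not overwrite the stored offset
sketch.start([MessageStub.new(12)])
puts sketch.seek_offset # → 10

# Once something else moves the offset, #start keeps that value
sketch.seek_offset = 13
sketch.start([MessageStub.new(13)])
puts sketch.seek_offset # → 13
```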

#success? ⇒ Boolean

Returns whether all the consumption is done and has finished successfully for this coordinator.

Returns:

  • (Boolean)


# File 'lib/karafka/processing/coordinator.rb', line 84

def success?
  @mutex.synchronize { @running_jobs.zero? && @consumptions.values.all?(&:success?) }
end
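The two conditions checked by `#success?` (no running jobs, and every stored result successful) can be sketched as follows. `ResultStub` and `SuccessSketch` are hypothetical; the stub starts out successful purely as an assumption of this sketch, and only `#success?` on the result is taken from the code above.

```ruby
# Hypothetical result stand-in; `ok` is an illustrative field, not Karafka's API
ResultStub = Struct.new(:ok) do
  def success?
    ok
  end
end

# Hypothetical stand-in that models the counter and the per-consumer results
class SuccessSketch
  def initialize
    @consumptions = {}
    @running_jobs = 0
    @mutex = Mutex.new
  end

  def increment
    @mutex.synchronize { @running_jobs += 1 }
  end

  def decrement
    @mutex.synchronize { @running_jobs -= 1 }
  end

  # One result object per consumer, created on first access
  def consumption(consumer)
    @mutex.synchronize { @consumptions[consumer] ||= ResultStub.new(true) }
  end

  def success?
    @mutex.synchronize { @running_jobs.zero? && @consumptions.values.all?(&:success?) }
  end
end

sketch = SuccessSketch.new
sketch.increment
sketch.consumption(:consumer_a)

puts sketch.success? # → false (a job is still running)

sketch.decrement
puts sketch.success? # → true (no jobs running, all results successful)

sketch.consumption(:consumer_b).ok = false
puts sketch.success? # → false (one result reports a failure)
```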