Class: Karafka::Processing::Coordinator

Inherits:
Object
Defined in:
lib/karafka/processing/coordinator.rb

Overview

Note:

This coordinator needs to be thread-safe. Some operations are performed only in the listener thread, but we make it thread-safe by default so that we do not have to worry about potential future mistakes.

Basic coordinator that allows us to provide coordination objects to consumers.

This is a wrapping layer that simplifies managing the work handled around consumption.
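To illustrate why the thread-safety note matters, here is a minimal sketch using a hypothetical `JobCounter` class (not part of Karafka) that guards a shared counter with a `Mutex`, the same primitive this coordinator wraps in `#synchronize`. Several threads, standing in for parallel workers such as virtual partitions, increment it concurrently.

```ruby
# Hypothetical stand-in for the coordinator's job counter; not Karafka code.
class JobCounter
  attr_reader :value

  def initialize
    @value = 0
    @mutex = Mutex.new
  end

  # The read-modify-write happens under the lock, so concurrent calls
  # cannot interleave and lose updates.
  def increment
    @mutex.synchronize { @value += 1 }
  end
end

counter = JobCounter.new

# Ten threads, each incrementing 1_000 times, mimic parallel workers.
threads = Array.new(10) do
  Thread.new { 1_000.times { counter.increment } }
end
threads.each(&:join)

puts counter.value # => 10000
```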

Direct Known Subclasses

Karafka::Pro::Processing::Coordinator

Constructor Details

#initialize(topic, partition, pause_tracker) ⇒ Coordinator

Returns a new instance of Coordinator.

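Parameters:

  • topic

    topic handled by this coordinator

  • partition

    partition handled by this coordinator

  • pause_tracker

    pause tracker for the given topic partition
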
# File 'lib/karafka/processing/coordinator.rb', line 18

def initialize(topic, partition, pause_tracker)
  @topic = topic
  @partition = partition
  @pause_tracker = pause_tracker
  @revoked = false
  @consumptions = {}
  @running_jobs = 0
  @manual_pause = false
  @manual_seek = false
  @mutex = Mutex.new
  @marked = false
  @failure = false
end

Instance Attribute Details

#partition ⇒ Object (readonly)

Returns the value of attribute partition.



# File 'lib/karafka/processing/coordinator.rb', line 13

def partition
  @partition
end

#pause_tracker ⇒ Object (readonly)

Returns the value of attribute pause_tracker.



# File 'lib/karafka/processing/coordinator.rb', line 13

def pause_tracker
  @pause_tracker
end

#seek_offset ⇒ Object

Returns the value of attribute seek_offset.



# File 'lib/karafka/processing/coordinator.rb', line 13

def seek_offset
  @seek_offset
end

#topic ⇒ Object (readonly)

Returns the value of attribute topic.



# File 'lib/karafka/processing/coordinator.rb', line 13

def topic
  @topic
end

Instance Method Details

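#decrement ⇒ Object

Decrements the number of jobs currently being handled and returns the remaining count. Raises Karafka::Errors::InvalidCoordinatorStateError if the count drops below zero.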



# File 'lib/karafka/processing/coordinator.rb', line 74

def decrement
  synchronize do
    @running_jobs -= 1

    return @running_jobs unless @running_jobs.negative?

    # This should never happen. If it does, something is heavily out of sync. Please reach
    # out to us if you encounter this
    raise Karafka::Errors::InvalidCoordinatorStateError, 'Was zero before decrementation'
  end
end
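The underflow guard in `#decrement` can be sketched in isolation. `RunningJobs` and `InvalidStateError` below are hypothetical names (Karafka itself raises `Karafka::Errors::InvalidCoordinatorStateError`); the sketch shows that the counter is decremented first and the error is raised only when the result goes negative, which happens when there are more decrements than increments.

```ruby
# Illustrative error class; Karafka uses Karafka::Errors::InvalidCoordinatorStateError.
class InvalidStateError < StandardError; end

# Hypothetical counter mirroring the increment/decrement pair documented here.
class RunningJobs
  def initialize
    @running_jobs = 0
    @mutex = Mutex.new
  end

  def increment
    @mutex.synchronize { @running_jobs += 1 }
  end

  def decrement
    @mutex.synchronize do
      @running_jobs -= 1

      # Normal path: return the number of jobs still running.
      # Mutex#synchronize releases the lock even on this early return.
      return @running_jobs unless @running_jobs.negative?

      raise InvalidStateError, 'Was zero before decrementation'
    end
  end
end

jobs = RunningJobs.new
jobs.increment
remaining = jobs.decrement # => 0

begin
  jobs.decrement # one decrement more than there were increments
  underflow_detected = false
rescue InvalidStateError
  underflow_detected = true
end
```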

#failure!(consumer, error) ⇒ Object

Marks the consumption of the given consumer as failed.

Parameters:

  • consumer (Karafka::BaseConsumer)

    consumer that failed

  • error (StandardError)

    error that occurred



# File 'lib/karafka/processing/coordinator.rb', line 106

def failure!(consumer, error)
  synchronize do
    @failure = true
    consumption(consumer).failure!(error)
  end
end

#failure? ⇒ Boolean

Returns true if any of the work we were running failed.

Returns:

  • (Boolean)

    true if any of the work we were running failed



# File 'lib/karafka/processing/coordinator.rb', line 114

def failure?
  @failure
end

#increment ⇒ Object

Increments the number of jobs handled by this coordinator.



# File 'lib/karafka/processing/coordinator.rb', line 69

def increment
  synchronize { @running_jobs += 1 }
end

#manual_pause ⇒ Object

Records in the coordinator that this pause was initiated manually by the end user and not by the system itself.



# File 'lib/karafka/processing/coordinator.rb', line 146

def manual_pause
  @manual_pause = true
end

#manual_pause? ⇒ Boolean

Returns whether we are in a pause that was initiated by the user.

Returns:

  • (Boolean)

    whether we are in a pause that was initiated by the user



# File 'lib/karafka/processing/coordinator.rb', line 151

def manual_pause?
  @pause_tracker.paused? && @manual_pause
end
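`#manual_pause?` is true only while both conditions hold: the user flagged the pause and the pause tracker still reports being paused. A minimal sketch with a hypothetical `FakePauseTracker` (standing in for the real pause tracker, of which only `#paused?` matters here) and a hypothetical `PauseState` class shows that the flag alone is not enough once the tracker has resumed.

```ruby
# Hypothetical stand-in for the pause tracker; only #paused? is consulted.
class FakePauseTracker
  def initialize
    @paused = false
  end

  def pause
    @paused = true
  end

  def resume
    @paused = false
  end

  def paused?
    @paused
  end
end

# Minimal sketch of #manual_pause and #manual_pause? as documented above.
class PauseState
  def initialize(pause_tracker)
    @pause_tracker = pause_tracker
    @manual_pause = false
  end

  def manual_pause
    @manual_pause = true
  end

  def manual_pause?
    @pause_tracker.paused? && @manual_pause
  end
end

tracker = FakePauseTracker.new
state = PauseState.new(tracker)

tracker.pause
state.manual_pause
during_pause = state.manual_pause? # => true

tracker.resume
after_resume = state.manual_pause? # => false, although the flag is still set
```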

#manual_seek ⇒ Object

Marks the seek as manual for coordination purposes.



# File 'lib/karafka/processing/coordinator.rb', line 156

def manual_seek
  @manual_seek = true
end

#manual_seek? ⇒ Boolean

Returns whether the user invoked a seek in the current operations scope.

Returns:

  • (Boolean)

    whether the user invoked a seek in the current operations scope



# File 'lib/karafka/processing/coordinator.rb', line 161

def manual_seek?
  @manual_seek
end

#marked? ⇒ Boolean

Returns whether a new seek offset was assigned at least once. This is needed because by default we assign the seek offset of the first message ever received, which is insufficient for the DLQ in a scenario where that first message is broken: we would never move past it and would end up in an endless loop.

Returns:

  • (Boolean)

    whether a new seek offset was assigned at least once



# File 'lib/karafka/processing/coordinator.rb', line 140

def marked?
  @marked
end
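The `#marked?` reasoning can be sketched as follows, assuming that explicitly assigning a new seek offset (for example after marking a message as consumed) is what sets the flag. That writer is not shown on this page, so `SeekState#seek_offset=` below is a hypothetical stand-in. The default offset taken from the first message leaves the flag unset, so a DLQ-style strategy can tell it has not yet moved past a possibly broken first message.

```ruby
# Hypothetical sketch; the writer's behaviour is an assumption, not Karafka code.
class SeekState
  attr_reader :seek_offset

  def initialize
    @seek_offset = nil
    @marked = false
  end

  # Mirrors the `||=` in #start: only the first batch provides the default
  def start(first_offset)
    @seek_offset ||= first_offset
  end

  # Assumed behaviour: an explicit assignment counts as "marked"
  def seek_offset=(offset)
    @marked = true
    @seek_offset = offset
  end

  def marked?
    @marked
  end
end

state = SeekState.new
state.start(100)

# Offset 100 only came from the first message, not from marking
default_only = !state.marked? # => true

state.seek_offset = 101 # e.g. after marking offset 100 as consumed
moved = state.marked? && state.seek_offset == 101 # => true
```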

#revokeObject

Marks the given coordinator for the processing group as revoked.

This is invoked in two places:

- from the main listener loop when we detect revoked partitions
- from the consumer in case checkpointing fails

This means the consumer can become aware that it was revoked before the listener loop dispatches the revocation job. This is fine, as effectively nothing will be processed until the revocation jobs are done.



# File 'lib/karafka/processing/coordinator.rb', line 127

def revoke
  synchronize { @revoked = true }
end
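Because `#revoke` may be called both from the listener loop and from a consumer whose checkpointing failed, it only ever sets a flag to `true` under the lock, so repeated or concurrent calls are harmless. The sketch below uses a hypothetical `Revocation` class (not Karafka's API) to show two callers revoking and a consumer-side loop observing the flag before processing anything.

```ruby
# Hypothetical stand-in mirroring #revoke and #revoked? documented here.
class Revocation
  def initialize
    @revoked = false
    @mutex = Mutex.new
  end

  def revoke
    @mutex.synchronize { @revoked = true }
  end

  def revoked?
    @revoked
  end
end

revocation = Revocation.new

# Two callers (e.g. the listener loop and a consumer whose checkpoint failed)
# may both revoke; setting the same flag twice is idempotent.
[Thread.new { revocation.revoke }, Thread.new { revocation.revoke }].each(&:join)

# Hypothetical consumer loop that stops as soon as it sees the revocation
processed = []
[1, 2, 3].each do |offset|
  break if revocation.revoked?

  processed << offset
end
```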

#revoked? ⇒ Boolean

Returns whether the partition we are processing has been revoked.

Returns:

  • (Boolean)

    whether the partition we are processing has been revoked



# File 'lib/karafka/processing/coordinator.rb', line 132

def revoked?
  @revoked
end

#start(messages) ⇒ Object

Starts the coordinator for the given consumption jobs.

Parameters:

  • messages (Array<Karafka::Messages::Message>)

    batch of messages for which we are going to coordinate work. The regular coordinator only uses it to initialize the seek offset from the first message.



# File 'lib/karafka/processing/coordinator.rb', line 35

def start(messages)
  @failure = false
  @running_jobs = 0
  # We need to clear the consumption results hash here, otherwise we could end up storing
  # consumption results of consumer instances we no longer control
  @consumptions.clear

  # When starting to run, no pause is expected and no manual pause as well
  @manual_pause = false

  # No user invoked seeks on a new run
  @manual_seek = false

  # We set it on the first encounter and never again, because from then on the offset
  # should be managed by the consumer logic (ours or the end user's).
  # The seek offset always needs to be initialized, because when manual offset management
  # is turned on, we need a reference to the first offset even when running multiple
  # batches without marking any messages as consumed. Rollback needs to happen to the
  # last place we know of, or to the last marked message + 1.
  #
  # It is, however, worth keeping in mind that this may need to be used with `#marked?` to
  # make sure that the first offset is an offset that has been marked.
  @seek_offset ||= messages.first.offset
end
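The effect of `#start` across runs can be sketched with a trimmed-down, hypothetical `RunState` class (not Karafka code) and a `Message` struct standing in for `Karafka::Messages::Message`. Per-run flags and consumption results are reset on every call, while the seek offset is set only by the first batch ever seen.

```ruby
# Hypothetical stand-in for Karafka::Messages::Message; only #offset is used.
Message = Struct.new(:offset)

# Trimmed-down sketch of the per-run state handled by #start.
class RunState
  attr_reader :seek_offset, :consumptions

  def initialize
    @failure = false
    @manual_seek = false
    @consumptions = {}
    @seek_offset = nil
  end

  def start(messages)
    @failure = false
    @manual_seek = false
    @consumptions.clear
    # Only the very first batch provides the default seek offset
    @seek_offset ||= messages.first.offset
  end

  def fail!
    @failure = true
  end

  def manual_seek
    @manual_seek = true
  end

  def failure?
    @failure
  end

  def manual_seek?
    @manual_seek
  end
end

state = RunState.new
state.start([Message.new(10), Message.new(11)])
state.consumptions[:consumer_a] = :failed
state.fail!
state.manual_seek

# The next run resets per-run flags and results but keeps the seek offset
state.start([Message.new(12)])
```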

#success!(consumer) ⇒ Object

Marks the consumption of the given consumer as successful.

Parameters:

  • consumer (Karafka::BaseConsumer)

    consumer that finished successfully

# File 'lib/karafka/processing/coordinator.rb', line 97

def success!(consumer)
  synchronize do
    consumption(consumer).success!
  end
end

#success? ⇒ Boolean

Returns whether all the consumption is done and finished successfully for this coordinator. We do not report success until all work is done, because running work may still crash.

Returns:

  • (Boolean)


# File 'lib/karafka/processing/coordinator.rb', line 89

def success?
  synchronize do
    @running_jobs.zero? && @consumptions.values.all?(&:success?)
  end
end
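The rule in `#success?` (no running jobs and every recorded consumption succeeded) can be sketched with hypothetical `ConsumptionResult` and `SuccessTracker` classes. The sketch assumes per-consumer results are created lazily on first use, which is an assumption about the private `consumption(consumer)` helper that this page does not show.

```ruby
# Hypothetical per-consumer result object; not Karafka's implementation.
class ConsumptionResult
  def initialize
    @success = false
  end

  def success!
    @success = true
  end

  def success?
    @success
  end
end

# Hypothetical aggregate mirroring increment/decrement/success!/success?
class SuccessTracker
  def initialize
    @running_jobs = 0
    @results = {}
    @mutex = Mutex.new
  end

  def increment
    @mutex.synchronize { @running_jobs += 1 }
  end

  def decrement
    @mutex.synchronize { @running_jobs -= 1 }
  end

  # Assumption: results are created lazily, one per consumer
  def success!(consumer)
    @mutex.synchronize { (@results[consumer] ||= ConsumptionResult.new).success! }
  end

  # No running jobs AND every recorded result succeeded
  def success?
    @mutex.synchronize do
      @running_jobs.zero? && @results.values.all?(&:success?)
    end
  end
end

tracker = SuccessTracker.new
tracker.increment
tracker.increment

tracker.success!(:vp_0)
tracker.decrement
while_running = tracker.success? # => false, one job is still running

tracker.success!(:vp_1)
tracker.decrement
when_done = tracker.success? # => true
```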

#synchronize(&block) ⇒ Object

Allows running synchronized (locked) code that can operate across virtual partitions.

Parameters:

  • block (Proc)

    code we want to run in the synchronized mode



# File 'lib/karafka/processing/coordinator.rb', line 167

def synchronize(&block)
  @mutex.synchronize(&block)
end
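Since `Mutex#synchronize` returns the value of its block, a `#synchronize` wrapper like this one can guard compound read-then-write logic that must not interleave between virtual partitions. The hypothetical `OffsetTracker` below (not Karafka code) keeps the highest offset reported by several threads; the comparison and the assignment happen under a single lock.

```ruby
# Hypothetical class using the same synchronize pattern as the coordinator.
class OffsetTracker
  def initialize
    @mutex = Mutex.new
    @highest = nil
  end

  def synchronize(&block)
    @mutex.synchronize(&block)
  end

  # Compare and assign as one atomic step; returns the current highest offset
  def report(offset)
    synchronize do
      @highest = offset if @highest.nil? || offset > @highest
      @highest
    end
  end

  def highest
    synchronize { @highest }
  end
end

tracker = OffsetTracker.new

# Each thread plays the role of a virtual partition reporting offsets
threads = [[1, 5, 3], [4, 9, 2], [7, 6, 8]].map do |offsets|
  Thread.new { offsets.each { |offset| tracker.report(offset) } }
end
threads.each(&:join)
```

Ruby's `Mutex` is not reentrant, so calling `#synchronize` from inside a block that already holds the same lock raises `ThreadError` instead of nesting.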