Class: Karafka::Processing::Executor

Inherits:
Object

Extended by:
Forwardable
Defined in:
lib/karafka/processing/executor.rb

Overview

Note:

Executors are not removed after a partition is revoked. They are not that big and will be re-used in case the partition is re-claimed.

Note:

Since a given consumer can run various operations, the executor manages them and their lifecycle. There are the following types of operations, each with appropriate before/after hooks, etc:

  • consume - primary operation related to running user consumption code

  • idle - cleanup job that runs on idle runs where no messages are passed to the end user. This is used for complex flows with filters, etc.

  • revoked - runs after the partition was revoked

  • shutdown - runs when the process is going to shut down

Executors:

  • run consumer code (for `#call`) or run given preparation / teardown operations when needed, from separate threads.

  • they re-create consumer instances for partitions that were revoked and assigned back.
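
For orientation, the sketch below is not Karafka's internal scheduler; it only assumes an `executor` and a batch of `raw_messages` provided by the framework, and shows the order in which these operations are typically invoked for a single topic partition:

# Minimal lifecycle sketch; `executor` and `raw_messages` are assumed
# to be built by the framework, not by user code.

# Listener thread: time-sensitive preparation before the job is enqueued
executor.before_schedule_consume(raw_messages)

# Worker thread: the consume job itself
executor.before_consume
executor.consume
executor.after_consume

# On rebalance and on process stop
executor.revoked
executor.shutdown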

Direct Known Subclasses

Karafka::Pro::Processing::Executor

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize(group_id, client, coordinator) ⇒ Executor

Returns a new instance of Executor.

Parameters:

  • group_id (String)

    id of the subscription group to which this executor belongs

  • client (Karafka::Connection::Client)

    kafka connection client

  • coordinator (Karafka::Processing::Coordinator)

    coordinator for this executor

# File 'lib/karafka/processing/executor.rb', line 47

def initialize(group_id, client, coordinator)
  @id = SecureRandom.hex(6)
  @group_id = group_id
  @client = client
  @coordinator = coordinator
end
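
Executors are built by Karafka itself, but as a hedged construction sketch (the `client` and `coordinator` values below are placeholders for framework-provided objects):

# Illustrative only; Karafka constructs executors internally.
executor = Karafka::Processing::Executor.new(
  'subscription_group_id', # subscription group id (placeholder value)
  client,                  # assumed connection client instance
  coordinator              # assumed Karafka::Processing::Coordinator instance
)

executor.id       #=> random hex id used for state tracking
executor.group_id #=> 'subscription_group_id'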

Instance Attribute Details

#coordinator ⇒ Karafka::Processing::Coordinator (readonly)

Returns coordinator for this executor.

Returns:

  • (Karafka::Processing::Coordinator)

    coordinator for this executor

# File 'lib/karafka/processing/executor.rb', line 42

def coordinator
  @coordinator
end

#group_id ⇒ String (readonly)

Returns subscription group id to which a given executor belongs.

Returns:

  • (String)

    subscription group id to which a given executor belongs



# File 'lib/karafka/processing/executor.rb', line 36

def group_id
  @group_id
end

#id ⇒ String (readonly)

Returns a unique id that we use for state tracking.

Returns:

  • (String)

    unique id that we use for state tracking



# File 'lib/karafka/processing/executor.rb', line 33

def id
  @id
end

#messages ⇒ Karafka::Messages::Messages (readonly)

Returns messages batch.

Returns:

  • (Karafka::Messages::Messages)

    messages batch

# File 'lib/karafka/processing/executor.rb', line 39

def messages
  @messages
end

Instance Method Details

#after_consume ⇒ Object

Runs the consumer after-consumption code



# File 'lib/karafka/processing/executor.rb', line 98

def after_consume
  consumer.on_after_consume
end

#before_consume ⇒ Object

Runs setup and warm-up code in the worker prior to running the consumption



# File 'lib/karafka/processing/executor.rb', line 80

def before_consume
  consumer.on_before_consume
end

#before_schedule_consume(messages) ⇒ Object

Allows us to prepare the consumer in the listener thread prior to the job being sent to be scheduled. It also allows running code that is time-sensitive and cannot wait in the queue, as that could cause starvation.

Parameters:

  • messages (Array<Karafka::Messages::Message>)

    batch of messages for this executor's topic partition

# File 'lib/karafka/processing/executor.rb', line 59

def before_schedule_consume(messages)
  # Recreate consumer with each batch if persistence is not enabled
  # We reload the consumers with each batch instead of relying on some external signals
  # when needed for consistency. That way devs may have it on or off and not in this
  # middle state, where re-creation of a consumer instance would occur only sometimes
  @consumer = nil unless topic.consumer_persistence

  # First we build messages batch...
  consumer.messages = Messages::Builders::Messages.call(
    messages,
    topic,
    partition,
    # the moment we've received the batch or actually the moment we've enqueued it,
    # but good enough
    Time.now
  )

  consumer.on_before_schedule_consume
end
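
The persistence check at the top relates to the `consumer_persistence` setting. As a hedged configuration example, disabling it (commonly done in development so that code reloads take effect) makes this method rebuild the consumer instance for every batch:

# Hedged example: with persistence disabled, a fresh consumer instance
# is created per batch instead of being reused across batches.
class KarafkaApp < Karafka::App
  setup do |config|
    config.consumer_persistence = false
  end
end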

#before_schedule_eofed ⇒ Object

Runs the code needed before eofed work is scheduled



# File 'lib/karafka/processing/executor.rb', line 115

def before_schedule_eofed
  consumer.on_before_schedule_eofed
end

#before_schedule_idle ⇒ Object

Runs the code needed before idle work is scheduled



# File 'lib/karafka/processing/executor.rb', line 103

def before_schedule_idle
  consumer.on_before_schedule_idle
end

#before_schedule_revoked ⇒ Object

Runs code needed before a revoked job is scheduled



# File 'lib/karafka/processing/executor.rb', line 127

def before_schedule_revoked
  consumer.on_before_schedule_revoked if @consumer
end

#before_schedule_shutdown ⇒ Object

Runs code needed before a shutdown job is scheduled



# File 'lib/karafka/processing/executor.rb', line 148

def before_schedule_shutdown
  consumer.on_before_schedule_shutdown if @consumer
end

#consume ⇒ Object

Runs consumer data processing against the given batch and handles failures and errors.



# File 'lib/karafka/processing/executor.rb', line 92

def consume
  # We run the consumer client logic...
  consumer.on_consume
end
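
For orientation, `on_consume` ultimately reaches the `#consume` method that end users implement in their consumer classes. A minimal sketch of such a consumer (the class name and its logic are hypothetical):

# Hypothetical end-user consumer whose #consume is reached via on_consume.
class OrdersConsumer < Karafka::BaseConsumer
  def consume
    messages.each do |message|
      puts message.payload
    end
  end
end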

#eofed ⇒ Object

Runs the consumer eofed operation. This may run even when there were no messages received prior. It will however not run when EOF is received together with messages, as in such a case `#consume` will run.



# File 'lib/karafka/processing/executor.rb', line 122

def eofed
  consumer.on_eofed
end

#idle ⇒ Object

Runs consumer idle operations. This may include housekeeping or other state management changes that can occur but that do not mean there are any new messages available for the end user to process.



# File 'lib/karafka/processing/executor.rb', line 110

def idle
  consumer.on_idle
end

#revoked ⇒ Object

Note:

Clearing the consumer will ensure that if we get the partition back, it will be handled with a consumer with a clean state.

Note:

We run it only when the consumer was present, because presence indicates that at least a single message has been consumed.

Note:

We do not reset the consumer but indicate the need for recreation instead, because after the revocation there still may be `#after_consume` running that needs the given consumer instance.

Runs the controller `#revoked` method that should be triggered when a given consumer is no longer needed due to partition reassignment.



# File 'lib/karafka/processing/executor.rb', line 143

def revoked
  consumer.on_revoked if @consumer
end

#shutdown ⇒ Object

Note:

While we do not need to clear the consumer here, it's a good habit to clean up after work is done.

Runs the controller `#shutdown` method that should be triggered when a given consumer is no longer needed as we're closing the process.



# File 'lib/karafka/processing/executor.rb', line 157

def shutdown
  # There is a case, where the consumer no longer exists because it was revoked, in case like
  # that we do not build a new instance and shutdown should not be triggered.
  consumer.on_shutdown if @consumer
end

#wrap(action, &block) ⇒ Object

Runs the wrap/around execution context appropriate for a given action

Parameters:

  • action (Symbol)

    action whose execution is wrapped with our block

  • block (Proc)

    execution context



# File 'lib/karafka/processing/executor.rb', line 87

def wrap(action, &block)
  consumer.on_wrap(action, &block)
end
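
A hedged usage sketch: a scheduler-side caller could wrap the execution of a consume job with the consumer-provided around context. The action symbol and block body here are purely illustrative:

# Illustrative only; the block body stands in for the scheduled job logic.
executor.wrap(:consume) do
  executor.before_consume
  executor.consume
  executor.after_consume
end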