Class: Karafka::Processing::Jobs::Base

Inherits:
Object
Extended by:
Forwardable
Defined in:
lib/karafka/processing/jobs/base.rb

Overview

Base class for all job types that are supposed to run in worker threads. Each job can have three main entry points: `#before_call`, `#call` and `#after_call`. Only `#call` is required.
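
The three entry points can be illustrated with a minimal sketch. It assumes a stand-in class (`LifecycleBaseSketch`) that mirrors only the methods documented on this page, so that it runs without Karafka loaded. `TracingJob` and `run_job` are hypothetical names, and the real worker loop in Karafka may do more than call the three methods in sequence.

```ruby
# Stand-in for Karafka::Processing::Jobs::Base (assumption: mirrors only
# the methods documented on this page so the sketch runs on its own).
class LifecycleBaseSketch
  def initialize
    @non_blocking = false
  end

  def before_call; end

  def after_call; end

  def call
    raise NotImplementedError, 'Please implement in a subclass'
  end

  def non_blocking?
    @non_blocking
  end
end

# Hypothetical job that records the order in which its entry points run.
class TracingJob < LifecycleBaseSketch
  attr_reader :trace

  def initialize
    super
    @trace = []
  end

  def before_call
    @trace << :before_call
  end

  def call
    @trace << :call
  end

  def after_call
    @trace << :after_call
  end
end

# Hypothetical driver that invokes the entry points in the documented order.
def run_job(job)
  job.before_call
  job.call
  job.after_call
  job
end

job = run_job(TracingJob.new)
p job.trace
```

Because `#before_call` and `#after_call` are empty in the base class, a subclass that overrides only `#call` would work with the same driver.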

Direct Known Subclasses

Consume, Idle, Revoked, Shutdown

Constructor Details

#initialize ⇒ Base

Creates a new job instance



# File 'lib/karafka/processing/jobs/base.rb', line 19

def initialize
  # All jobs are blocking by default and they can release the lock when blocking operations
  # are done (if needed)
  @non_blocking = false
end

Instance Attribute Details

#executor ⇒ Object (readonly)

Returns the value of attribute executor.



# File 'lib/karafka/processing/jobs/base.rb', line 16

def executor
  @executor
end

Instance Method Details

#after_call ⇒ Object

When redefined, can run any code that should run after the main `#call` code has executed.



# File 'lib/karafka/processing/jobs/base.rb', line 38

def after_call; end

#before_call ⇒ Object

When redefined, can run any code that should run before the main `#call` code executes.



# File 'lib/karafka/processing/jobs/base.rb', line 30

def before_call; end

#before_enqueue ⇒ Object

Note:

This runs in the listener thread, not in the worker thread.

When redefined, can run any code prior to the job being enqueued.



# File 'lib/karafka/processing/jobs/base.rb', line 27

def before_enqueue; end

#call ⇒ Object

The main entry-point of a job

Raises:

  • (NotImplementedError)


# File 'lib/karafka/processing/jobs/base.rb', line 33

def call
  raise NotImplementedError, 'Please implement in a subclass'
end
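
A short sketch of what "only `#call` is required" means in practice, assuming a stand-in class (`RequiredCallBaseSketch`) that copies the `#call` definition shown above. `MinimalJob` is a hypothetical subclass. Note that in Ruby `NotImplementedError` descends from `ScriptError` rather than `StandardError`, so a bare `rescue` would not catch it and it has to be named explicitly.

```ruby
# Stand-in for the documented base class (assumption: copies only #call
# and the two optional hooks from this page).
class RequiredCallBaseSketch
  def before_call; end

  def after_call; end

  def call
    raise NotImplementedError, 'Please implement in a subclass'
  end
end

# Hypothetical subclass that overrides nothing except #call.
class MinimalJob < RequiredCallBaseSketch
  def call
    :done
  end
end

# Calling #call on the base class itself raises. The error class must be
# named explicitly because a bare rescue only catches StandardError.
begin
  RequiredCallBaseSketch.new.call
rescue NotImplementedError => e
  base_error = e.message
end

minimal_job = MinimalJob.new
minimal_result = minimal_job.call
```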

#non_blocking? ⇒ Boolean

Note:

A blocking job is a job that causes the job queue to wait until it has finished before releasing the lock on new jobs being added.

Note:

All jobs are blocking by default.

Note:

A job needs to mark itself as non-blocking only after it is done with all the blocking operations (pausing a partition, etc.).

Returns whether this is a non-blocking job.

Returns:

  • (Boolean)

    whether this is a non-blocking job



# File 'lib/karafka/processing/jobs/base.rb', line 49

def non_blocking?
  @non_blocking
end
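
The notes above can be sketched as follows: a job starts out blocking and flips `@non_blocking` only once its blocking phase is over. This is an illustration under stated assumptions, not how Karafka's own jobs are implemented. `BlockingBaseSketch` is a stand-in for the base class, `LongRunningJob` is hypothetical, and doing the flip in `#before_call` is an arbitrary choice made for the example.

```ruby
# Stand-in for the documented base class (assumption: mirrors only the
# constructor, #call and #non_blocking? shown on this page).
class BlockingBaseSketch
  def initialize
    # All jobs are blocking by default
    @non_blocking = false
  end

  def before_call; end

  def call
    raise NotImplementedError, 'Please implement in a subclass'
  end

  def non_blocking?
    @non_blocking
  end
end

# Hypothetical job that records the flag's value at each step.
class LongRunningJob < BlockingBaseSketch
  attr_reader :steps

  def initialize
    super
    @steps = []
  end

  def before_call
    # Blocking phase: placeholder for work such as pausing a partition
    @steps << [:blocking_setup, non_blocking?]
    # Mark as non-blocking only after the blocking work is done
    @non_blocking = true
  end

  def call
    @steps << [:work, non_blocking?]
  end
end

job = LongRunningJob.new
initial_flag = job.non_blocking?
job.before_call
job.call
p job.steps
```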