Class: Karafka::Processing::Jobs::Base

Inherits:
Object
Extended by:
Forwardable
Defined in:
lib/karafka/processing/jobs/base.rb

Overview

Base class for all job types that are supposed to run in worker threads. Each job can have three main entry points: `#before_call`, `#call`, and `#after_call`. Only `#call` is required.
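
The three entry points can be illustrated with a minimal sketch. It does not load Karafka: `LifecycleBase` is a stand-in that mirrors only the hooks documented on this page, and `RecordingJob` and `run_job` are hypothetical names. The order in which `run_job` calls the hooks is an assumption about how a worker drives a job, not the library's actual worker code.

```ruby
# Stand-in for Karafka::Processing::Jobs::Base (assumption: only the
# hooks documented on this page are mirrored here).
class LifecycleBase
  def before_call; end

  def call
    raise NotImplementedError, 'Please implement in a subclass'
  end

  def after_call; end
end

# Hypothetical job that records the order in which its hooks run.
class RecordingJob < LifecycleBase
  attr_reader :events

  def initialize
    super
    @events = []
  end

  def before_call
    @events << :before_call
  end

  # The only hook a subclass has to provide.
  def call
    @events << :call
  end

  def after_call
    @events << :after_call
  end
end

# Hypothetical driver; the hook order is an assumption for illustration.
def run_job(job)
  job.before_call
  job.call
  job.after_call
  job
end

job = run_job(RecordingJob.new)
puts job.events.inspect
```

A subclass that overrides only `call` would also work with this driver, since the other two hooks default to no-ops.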

Constructor Details

#initialize ⇒ Base

Creates a new job instance



# File 'lib/karafka/processing/jobs/base.rb', line 24

def initialize
  # All jobs are blocking by default and they can release the lock when blocking operations
  # are done (if needed)
  @non_blocking = false
  @status = :pending
end

Class Attribute Details

.action ⇒ Symbol

Returns the name of the action this job class corresponds to.

Returns:

  • (Symbol)

    name of the action this job class corresponds to



# File 'lib/karafka/processing/jobs/base.rb', line 20

def action
  @action
end

Instance Attribute Details

#executor ⇒ Object (readonly)

Returns the value of attribute executor.



# File 'lib/karafka/processing/jobs/base.rb', line 16

def executor
  @executor
end

Instance Method Details

#after_call ⇒ Object

When redefined, can run any code that should run after the main job code has executed.



# File 'lib/karafka/processing/jobs/base.rb', line 54

def after_call; end

#before_call ⇒ Object

When redefined, can run any code that should run before the main job code executes.



# File 'lib/karafka/processing/jobs/base.rb', line 46

def before_call; end

#before_schedule ⇒ Object

Note:

This runs in the listener thread, not in the worker thread.

When redefined, can run any code prior to the job being scheduled.

Raises:

  • (NotImplementedError)


# File 'lib/karafka/processing/jobs/base.rb', line 41

def before_schedule
  raise NotImplementedError, 'Please implement in a subclass'
end

#call ⇒ Object

The main entry-point of a job

Raises:

  • (NotImplementedError)


# File 'lib/karafka/processing/jobs/base.rb', line 49

def call
  raise NotImplementedError, 'Please implement in a subclass'
end

#finish! ⇒ Object

Note:

Since the scheduler knows exactly when it schedules jobs and when it keeps them pending, we do not need advanced state tracking. The only information needed from the "outside" is whether the job has finished after it was scheduled for execution.

Marks the job as finished. Used by the worker to indicate that this job is done.



# File 'lib/karafka/processing/jobs/base.rb', line 79

def finish!
  @status = :finished
end

#finished? ⇒ Boolean

Returns whether this job has finished.

Returns:

  • (Boolean)

    whether this job has finished



# File 'lib/karafka/processing/jobs/base.rb', line 70

def finished?
  @status == :finished
end
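
The status flag behind `#finish!` and `#finished?` can be sketched in isolation. `StatusJob` is a hypothetical stand-in that copies only the status-related methods shown on this page, and the filtering at the end is an assumption about how an outside observer might use the flag, not Karafka's scheduler code.

```ruby
# Stand-in mirroring only the status handling documented on this page.
class StatusJob
  def initialize
    # Jobs start as pending, as in the constructor shown above.
    @status = :pending
  end

  # Called once the job is done.
  def finish!
    @status = :finished
  end

  def finished?
    @status == :finished
  end
end

jobs = Array.new(3) { StatusJob.new }

# Pretend a worker completed the first job.
jobs.first.finish!

# From the outside, the only question asked is "finished or not".
pending = jobs.reject(&:finished?)
puts pending.size
```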

#non_blocking? ⇒ Boolean

Note:

A blocking job is a job that causes the job queue to wait until it is finished before removing the lock on new jobs being added.

Note:

All jobs are blocking by default.

Note:

A job should mark itself as non-blocking only after it is done with all blocking operations (pausing a partition, etc.).

Returns whether this is a non-blocking job.

Returns:

  • (Boolean)

    whether this is a non-blocking job



# File 'lib/karafka/processing/jobs/base.rb', line 65

def non_blocking?
  @non_blocking
end
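
The notes above describe a job that starts out blocking and releases the lock once its blocking operations are done. A minimal sketch of that pattern follows. `BlockingBase` and `PausingJob` are hypothetical names, the "pause" step is only recorded rather than performed, and flipping the flag inside `before_call` is an assumption made for illustration.

```ruby
# Stand-in mirroring the @non_blocking handling documented on this page.
class BlockingBase
  def initialize
    # All jobs are blocking by default.
    @non_blocking = false
  end

  def non_blocking?
    @non_blocking
  end

  def before_call; end
end

# Hypothetical job that becomes non-blocking after its blocking setup.
class PausingJob < BlockingBase
  attr_reader :steps

  def initialize
    super
    @steps = []
  end

  def before_call
    # Placeholder for the blocking operation (e.g. pausing a partition).
    @steps << :pause_partition
    # Only now is it safe to stop blocking the queue.
    @non_blocking = true
  end
end

job = PausingJob.new
blocking_at_start = !job.non_blocking?
job.before_call
puts job.non_blocking?
```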

#wrap(&block) ⇒ Object

Note:

We inject the action name so that the user can decide whether to run custom logic on a given action.

Runs the wrap/around job hook within which the rest of the flow happens

Parameters:

  • block (Proc)

    whole user related processing flow



# File 'lib/karafka/processing/jobs/base.rb', line 35

def wrap(&block)
  executor.wrap(self.class.action, &block)
end
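
The delegation in `#wrap` can be sketched without Karafka. `RecordingExecutor`, `WrapBase`, and `ConsumeSketch` are hypothetical stand-ins: the real executor is not documented on this page, so its `wrap(action)` signature here is inferred only from the call shown above, and the `:consume` action name is an assumed example value.

```ruby
# Hypothetical executor that records what happens around the wrapped block.
class RecordingExecutor
  attr_reader :log

  def initialize
    @log = []
  end

  # Receives the action name and yields to the wrapped flow.
  def wrap(action)
    @log << [:wrap_start, action]
    yield
    @log << [:wrap_end, action]
  end
end

# Stand-in mirroring the class-level action and the #wrap delegation.
class WrapBase
  class << self
    attr_accessor :action
  end

  attr_reader :executor

  def initialize(executor)
    @executor = executor
  end

  def wrap(&block)
    executor.wrap(self.class.action, &block)
  end
end

# Hypothetical job class; :consume is an assumed action name.
class ConsumeSketch < WrapBase
  self.action = :consume
end

executor = RecordingExecutor.new
job = ConsumeSketch.new(executor)
job.wrap { executor.log << :user_flow }
puts executor.log.inspect
```

Because the action is stored on the class rather than the instance, every instance of a given job class passes the same action name to the executor.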