Class: Karafka::ActiveJob::Dispatcher

Inherits:
Object
Defined in:
lib/karafka/active_job/dispatcher.rb

Overview

Dispatcher that sends an ActiveJob job to the appropriate Kafka topic, using the job's queue name as the topic name.

Direct Known Subclasses

Pro::ActiveJob::Dispatcher

Instance Method Details

#dispatch(job) ⇒ Object

Parameters:

  • job (ActiveJob::Base)

    job to dispatch



# File 'lib/karafka/active_job/dispatcher.rb', line 17

def dispatch(job)
  ::Karafka.producer.public_send(
    fetch_option(job, :dispatch_method, DEFAULTS),
    topic: job.queue_name,
    payload: ::ActiveSupport::JSON.encode(serialize_job(job))
  )
end
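The routing above can be illustrated with a minimal, self-contained sketch. It does not use Karafka itself: `RecordingProducer`, `SketchJob`, and `sketch_dispatch` are hypothetical stand-ins, the per-job `dispatch_method` attribute replaces the `fetch_option(job, :dispatch_method, DEFAULTS)` lookup, and the payload is a simplified hash rather than the real output of `serialize_job`. The stub method names mirror WaterDrop's `produce_async` and `produce_sync`; which one `DEFAULTS` selects is not shown in this listing.

```ruby
require "json"

# Hypothetical stand-in for ::Karafka.producer. It records calls instead of
# sending anything to Kafka.
class RecordingProducer
  attr_reader :calls

  def initialize
    @calls = []
  end

  def produce_async(topic:, payload:)
    @calls << [:produce_async, topic, payload]
  end

  def produce_sync(topic:, payload:)
    @calls << [:produce_sync, topic, payload]
  end
end

# Hypothetical job object. The real dispatcher receives an ActiveJob::Base
# instance and resolves the dispatch method through fetch_option.
SketchJob = Struct.new(:queue_name, :arguments, :dispatch_method)

# Mirrors the shape of #dispatch: the producer method is chosen dynamically
# and the queue name is used as the topic.
def sketch_dispatch(producer, job)
  producer.public_send(
    job.dispatch_method,
    topic: job.queue_name,
    payload: JSON.generate("arguments" => job.arguments)
  )
end

producer = RecordingProducer.new
sketch_dispatch(producer, SketchJob.new("emails", [1, 2], :produce_sync))
```

Because the method name is passed to `public_send`, switching a job class between synchronous and asynchronous delivery would only require changing the configured option, not the dispatcher.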

#dispatch_at(_job, _timestamp) ⇒ Object

Note:

Karafka Pro supports this feature

Raises an error indicating that the Karafka backend does not support scheduling jobs.

Parameters:

  • _job (Object)

    job we cannot enqueue

  • _timestamp (Time)

    time when job should run

Raises:

  • (NotImplementedError)


# File 'lib/karafka/active_job/dispatcher.rb', line 55

def dispatch_at(_job, _timestamp)
  raise NotImplementedError, <<~ERROR_MESSAGE
    This queueing backend does not support scheduling jobs.
    Consider using Karafka Pro, which supports this via the Scheduled Messages feature.
  ERROR_MESSAGE
end
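One practical consequence for callers: in Ruby, `NotImplementedError` descends from `ScriptError`, not `StandardError`, so a bare `rescue` will not catch it. The sketch below shows this with a hypothetical `UnschedulableDispatcher` that only imitates the method above; `schedule_or_report` is likewise an illustrative helper, not part of Karafka.

```ruby
# Hypothetical stand-in that imitates #dispatch_at from the listing above.
class UnschedulableDispatcher
  def dispatch_at(_job, _timestamp)
    raise NotImplementedError, "This queueing backend does not support scheduling jobs."
  end
end

# Illustrative helper: the error class must be named explicitly, because a
# bare `rescue` only handles StandardError and its subclasses.
def schedule_or_report(dispatcher, job, timestamp)
  dispatcher.dispatch_at(job, timestamp)
  :scheduled
rescue NotImplementedError
  :unsupported
end

result = schedule_or_report(UnschedulableDispatcher.new, Object.new, Time.now)
```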

#dispatch_many(jobs) ⇒ Object

Bulk dispatches multiple jobs using the Rails 7.1+ API

Parameters:

  • jobs (Array<ActiveJob::Base>)

    jobs we want to dispatch



# File 'lib/karafka/active_job/dispatcher.rb', line 27

def dispatch_many(jobs)
  # Group jobs by their desired dispatch method
  # It can be configured per job class, so we need to make sure we divide them
  dispatches = Hash.new { |hash, key| hash[key] = [] }

  jobs.each do |job|
    d_method = fetch_option(job, :dispatch_many_method, DEFAULTS)

    dispatches[d_method] << {
      topic: job.queue_name,
      payload: ::ActiveSupport::JSON.encode(serialize_job(job))
    }
  end

  dispatches.each do |type, messages|
    ::Karafka.producer.public_send(
      type,
      messages
    )
  end
end
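The grouping step can be sketched in isolation to show why it matters: jobs that resolve to different bulk dispatch methods end up in separate producer calls, while jobs sharing a method are sent in one batch. As before, this is a simplified illustration under stated assumptions. `BatchRecordingProducer`, `BatchJob`, and `sketch_dispatch_many` are hypothetical, the `dispatch_many_method` attribute replaces the `fetch_option` lookup, and the payload is a simplified hash. The stub method names mirror WaterDrop's `produce_many_async` and `produce_many_sync`.

```ruby
require "json"

# Hypothetical stand-in for ::Karafka.producer that records each batch.
class BatchRecordingProducer
  attr_reader :batches

  def initialize
    @batches = []
  end

  def produce_many_async(messages)
    @batches << [:produce_many_async, messages]
  end

  def produce_many_sync(messages)
    @batches << [:produce_many_sync, messages]
  end
end

# Hypothetical job object carrying its own bulk dispatch method.
BatchJob = Struct.new(:queue_name, :arguments, :dispatch_many_method)

# Mirrors the shape of #dispatch_many: group messages by dispatch method,
# then issue one producer call per group.
def sketch_dispatch_many(producer, jobs)
  dispatches = Hash.new { |hash, key| hash[key] = [] }

  jobs.each do |job|
    dispatches[job.dispatch_many_method] << {
      topic: job.queue_name,
      payload: JSON.generate("arguments" => job.arguments)
    }
  end

  dispatches.each { |type, messages| producer.public_send(type, messages) }
end

batch_producer = BatchRecordingProducer.new
sketch_dispatch_many(
  batch_producer,
  [
    BatchJob.new("emails", [1], :produce_many_async),
    BatchJob.new("audit", [2], :produce_many_sync),
    BatchJob.new("emails", [3], :produce_many_async)
  ]
)
```

In this sketch the three jobs produce two batches. Since Ruby hashes preserve insertion order, the groups are dispatched in the order each method was first seen, and messages within a group keep the order of the input array.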