Class: Karafka::Pro::Processing::Scheduler

Inherits:
Karafka::Processing::Scheduler show all
Defined in:
lib/karafka/pro/processing/scheduler.rb

Overview

Optimizes scheduler that takes into consideration of execution time needed to process messages from given topics partitions. It uses the non-preemptive LJF algorithm

This scheduler is designed to optimize execution times on jobs that perform IO operations as when taking IO into consideration, the can achieve optimized parallel processing.

This scheduler can also work with virtual partitions.

Aside from consumption jobs, other jobs do not run often, thus we can leave them with default FIFO scheduler from the default Karafka scheduler

Instance Method Summary collapse

Instance Method Details

#schedule_consumption(queue, jobs_array) ⇒ Object

Schedules jobs in the LJF order for consumption

Parameters:



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/karafka/pro/processing/scheduler.rb', line 33

def schedule_consumption(queue, jobs_array)
  perf_tracker = PerformanceTracker.instance

  ordered = []

  jobs_array.each do |job|
    ordered << [
      job,
      processing_cost(perf_tracker, job)
    ]
  end

  ordered.sort_by!(&:last)
  ordered.reverse!
  ordered.map!(&:first)

  ordered.each do |job|
    queue << job
  end
end