Class: Karafka::Pro::Processing::Scheduler
- Inherits: Karafka::Processing::Scheduler
- Inheritance chain: Object, Karafka::Processing::Scheduler, Karafka::Pro::Processing::Scheduler
- Defined in: lib/karafka/pro/processing/scheduler.rb
Overview
Optimized scheduler that takes into consideration the execution time needed to process messages from given topic partitions. It uses the non-preemptive LJF (Longest Job First) algorithm.
This scheduler is designed to optimize execution times of jobs that perform IO operations: when IO is taken into consideration, we can achieve optimized parallel processing.
This scheduler can also work with virtual partitions.
Aside from consumption jobs, other jobs do not run often, so they are left with the FIFO ordering of the default Karafka scheduler.
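To illustrate why running the longest job first can shorten total processing time when several workers run in parallel, here is a minimal sketch. It is not Karafka code: `Job`, `ljf_order`, and `makespan` are hypothetical names introduced for this example, and the costs are made-up numbers. The simulation assumes each job is handed to whichever worker becomes free first.

```ruby
# Hypothetical stand-in for a job with a known estimated cost (ms).
Job = Struct.new(:name, :cost)

# Longest Job First: jobs with the highest estimated cost come first.
def ljf_order(jobs)
  jobs.sort_by { |job| -job.cost }
end

# Greedy, non-preemptive simulation: every job is assigned, in the given
# order, to the worker that currently has the least accumulated work.
# Returns the finishing time of the busiest worker (the makespan).
def makespan(jobs, workers)
  loads = Array.new(workers, 0)
  jobs.each do |job|
    idx = loads.index(loads.min)
    loads[idx] += job.cost
  end
  loads.max
end

jobs = [Job.new('a', 100), Job.new('b', 100), Job.new('c', 200)]

puts "FIFO makespan: #{makespan(jobs, 2)}"
puts "LJF makespan:  #{makespan(ljf_order(jobs), 2)}"
```

With two workers, the FIFO order leaves the long job for last and one worker idles while the other finishes it. Ordering the long job first lets the two short jobs fill the second worker in the meantime.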
Instance Method Summary
- #schedule_consumption(queue, jobs_array) ⇒ Object
Schedules jobs in the LJF order for consumption.
Instance Method Details
#schedule_consumption(queue, jobs_array) ⇒ Object
Schedules jobs in the LJF order for consumption.

    # File 'lib/karafka/pro/processing/scheduler.rb', line 33
    def schedule_consumption(queue, jobs_array)
      pt = PerformanceTracker.instance

      ordered = []

      jobs_array.each do |job|
        messages = job.messages
        message = messages.first

        # Estimated cost: p95 processing time for the topic partition
        # multiplied by the number of messages in the job
        cost = pt.processing_time_p95(message.topic, message.partition) * messages.size

        ordered << [job, cost]
      end

      # Sort by cost in descending order so the longest jobs go first
      ordered.sort_by!(&:last)
      ordered.reverse!
      ordered.map!(&:first)

      ordered.each do |job|
        queue << job
      end
    end
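The cost estimate used by this method (p95 processing time of the topic partition multiplied by the batch size) can be tried outside of Karafka with stand-in objects. The sketch below is an assumption-laden illustration, not the library's API: `Message`, `FakeJob`, `StubTracker`, and `schedule_ljf` are hypothetical names, the tracker returns fixed numbers instead of measured ones, and a plain Array plays the role of the jobs queue.

```ruby
# Hypothetical stand-ins, not Karafka classes.
Message = Struct.new(:topic, :partition)
FakeJob = Struct.new(:messages)

# Stub tracker that returns fixed p95 processing times (ms per message)
# from a lookup table keyed by [topic, partition].
class StubTracker
  def initialize(table)
    @table = table
  end

  def processing_time_p95(topic, partition)
    @table.fetch([topic, partition], 0)
  end
end

# Pushes jobs onto the queue ordered by estimated cost, highest first.
def schedule_ljf(queue, jobs, tracker)
  costed = jobs.map do |job|
    first = job.messages.first
    p95 = tracker.processing_time_p95(first.topic, first.partition)
    [job, p95 * job.messages.size]
  end

  costed.sort_by { |_job, cost| -cost }.each { |job, _cost| queue << job }
end

tracker = StubTracker.new({ ['events', 0] => 5, ['logs', 0] => 50 })
events = FakeJob.new(Array.new(100) { Message.new('events', 0) }) # 5 * 100 = 500
logs = FakeJob.new(Array.new(20) { Message.new('logs', 0) })      # 50 * 20 = 1000

queue = []
schedule_ljf(queue, [events, logs], tracker)
puts queue.map { |job| job.messages.first.topic }.inspect
```

Although the `events` job holds more messages, the `logs` job is enqueued first because its estimated total cost is higher.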