Class: Karafka::Processing::JobsQueue

Inherits:
Object
Defined in:
lib/karafka/processing/jobs_queue.rb

Overview

This is the key component for distributing jobs in Karafka. It provides an API for running jobs in parallel while operating within more than one subscription group.

More than one subscription group can operate on this queue, which is why we internally keep track of processing per group.

We work with the assumption that partition data is evenly distributed.
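
The per-group tracking described above can be pictured with a small, stdlib-only sketch. It is not Karafka's code: `OverviewJob` and the group names are hypothetical placeholders, and the only idea carried over from the class is a hash that lazily creates one list of in-flight jobs per subscription group.

```ruby
# Hypothetical stand-in for a Karafka job; only the group id matters here.
OverviewJob = Struct.new(:id, :group_id)

# Each subscription group id maps to its own list of jobs in processing.
in_processing = Hash.new { |hash, group_id| hash[group_id] = [] }

in_processing["group_a"] << OverviewJob.new(1, "group_a")
in_processing["group_a"] << OverviewJob.new(2, "group_a")
in_processing["group_b"] << OverviewJob.new(3, "group_b")

# Clearing one group leaves the bookkeeping of the other group untouched.
in_processing["group_b"].clear

puts in_processing["group_a"].size
puts in_processing["group_b"].empty?
```

Because the state is keyed by group, a listener for one subscription group can ask about (or reset) its own jobs without observing work that belongs to another group.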


Constructor Details

#initialize ⇒ Karafka::Processing::JobsQueue



# File 'lib/karafka/processing/jobs_queue.rb', line 14

def initialize
  @queue = Queue.new
  # These queues act as semaphores internally. Since we need an indicator for waiting
  # we could use Thread.pass, but this is expensive. Instead we can just lock until any
  # of the workers finishes its work and then re-check. This means that in the worst
  # case we will context switch 10 times per poll instead of getting this thread
  # scheduled by Ruby hundreds of thousands of times per group.
  # We cannot use a single semaphore, as it could potentially block listeners that should
  # be processing their data and could also unlock when a given group needs to remain locked
  @semaphores = Concurrent::Map.new do |h, k|
    h.compute_if_absent(k) { Queue.new }
  end

  @in_processing = Hash.new { |h, k| h[k] = [] }

  @mutex = Mutex.new
end
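
The "queue as semaphore" idea from the comment above can be tried in isolation. The following is a minimal sketch, not the class itself: it assumes a plain `Hash` with its keys created up front instead of `Concurrent::Map` (to stay within the standard library), and the group names are made up.

```ruby
# One blocking Queue per group: #pop parks the caller until another thread pushes a token.
semaphores = Hash.new { |hash, group_id| hash[group_id] = Queue.new }

# Create both queues before starting threads, since a plain Hash default block
# is not thread-safe (the real class relies on Concurrent::Map for that).
semaphores["group_a"]
semaphores["group_b"]

woken = []

waiter = Thread.new do
  semaphores["group_a"].pop # blocks here until a token for group_a arrives
  woken << "group_a"
end

# A token for another group does not wake the group_a waiter.
semaphores["group_b"] << true
sleep 0.05
woken_before = woken.dup

# A token for group_a lets the waiter continue.
semaphores["group_a"] << true
waiter.join

puts woken_before.inspect
puts woken.inspect
```

A blocked `pop` costs nothing while it waits, which is the reason the comment gives for preferring it over repeatedly yielding with `Thread.pass`.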

Instance Method Details

#<<(job) ⇒ Object

Adds the job to the internal main queue, scheduling it for execution in a worker, and marks the job as being in the processing pipeline.

Parameters:

  • job (Jobs::Base)

    job that we want to schedule



# File 'lib/karafka/processing/jobs_queue.rb', line 43

def <<(job)
  # We do not push the job if the queue is closed as it means that it would anyhow not be
  # executed
  return if @queue.closed?

  @mutex.synchronize do
    group = @in_processing[job.group_id]

    raise(Errors::JobsQueueSynchronizationError, job.group_id) if group.include?(job)

    group << job

    @queue << job
  end
end
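
The duplicate guard in `#<<` can be illustrated without Karafka. In this sketch `DuplicateJobError` stands in for `Errors::JobsQueueSynchronizationError`, `FakeJob` stands in for a real job object, and the `schedule` lambda is a hypothetical helper that mirrors the synchronized part of the method.

```ruby
# Hypothetical stand-ins, not Karafka classes.
class DuplicateJobError < StandardError; end

class FakeJob
  attr_reader :group_id

  def initialize(group_id)
    @group_id = group_id
  end
end

in_processing = Hash.new { |hash, group_id| hash[group_id] = [] }
queue = Queue.new
mutex = Mutex.new

# Mirrors the synchronized section: register the job in its group, then enqueue it.
schedule = lambda do |job|
  mutex.synchronize do
    group = in_processing[job.group_id]
    raise DuplicateJobError, job.group_id if group.include?(job)

    group << job
    queue << job
  end
end

job = FakeJob.new("group_a")
schedule.call(job)

begin
  schedule.call(job) # same object again, so the guard fires
rescue DuplicateJobError => e
  duplicate_group = e.message
end

puts duplicate_group
puts queue.size
```

Since `FakeJob` does not override `==`, `include?` compares by object identity here, so only re-adding the very same object is rejected.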

#clear(group_id) ⇒ Object

Clears the processing states for a provided group. Useful when a recovery happens and we need to clean up state but only for a given subscription group.

Parameters:

  • group_id (String)


# File 'lib/karafka/processing/jobs_queue.rb', line 89

def clear(group_id)
  @mutex.synchronize do
    @in_processing[group_id].clear
    # We unlock it just in case it was blocked when clearing started
    tick(group_id)
  end
end

#close ⇒ Object

Stops the whole processing queue.



# File 'lib/karafka/processing/jobs_queue.rb', line 98

def close
  @mutex.synchronize do
    return if @queue.closed?

    @queue.close
    @semaphores.each_value(&:close)
  end
end
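
What closing means for the underlying Ruby `Queue` objects can be seen in a few lines. This sketch uses only the standard library `Queue`; the job symbols are placeholders.

```ruby
queue = Queue.new
queue << :job_1
queue.close

# Items enqueued before the close can still be drained.
first = queue.pop

# Closed and empty: pop returns nil instead of blocking.
second = queue.pop

# Pushing to a closed queue raises, which is why #<< returns early when closed.
begin
  queue << :job_2
rescue ClosedQueueError => e
  rejected = e.class
end

puts first.inspect
puts second.inspect
puts rejected
```

Closing the semaphore queues has the same effect on anything blocked in `pop` on them: the call returns `nil` rather than waiting for a token that will never arrive.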

#complete(job) ⇒ Object

Marks a given job from a given group as completed. When there are no more jobs from a given group to be executed, we won’t wait.

Parameters:

  • job (Jobs::Base)

    job that was completed



# File 'lib/karafka/processing/jobs_queue.rb', line 78

def complete(job)
  @mutex.synchronize do
    @in_processing[job.group_id].delete(job)
    tick(job.group_id)
  end
end

#empty?(group_id) ⇒ Boolean

Tells whether anything from a given group is in processing or waiting to be processed.

Parameters:

  • group_id (String)

Returns:

  • (Boolean)

    true if there is nothing in processing (or waiting for processing) from the given group



# File 'lib/karafka/processing/jobs_queue.rb', line 111

def empty?(group_id)
  @mutex.synchronize do
    @in_processing[group_id].empty?
  end
end

#pop ⇒ Jobs::Base?

Note:

This command is blocking and will wait until a job is available on the main queue.

Waits for a job from the main queue and returns it once available. Returns nil if the queue has been stopped and there will never be anything more to process.

Returns:

  • (Jobs::Base, nil)

    the next job from the main queue, or nil once the queue has been stopped



# File 'lib/karafka/processing/jobs_queue.rb', line 63

def pop
  @queue.pop
end
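
Because `pop` returns `nil` once the queue is closed and drained, a worker can use it directly as its loop condition. The following is a minimal sketch of such a loop on a plain Ruby `Queue`; the string jobs and the `processed` array are placeholders, and Karafka's actual worker does more than this.

```ruby
queue = Queue.new
processed = []

worker = Thread.new do
  # pop blocks while the queue is open and empty, and returns nil once it is
  # closed and drained, which ends the loop.
  while (job = queue.pop)
    processed << job
  end
end

3.times { |i| queue << "job-#{i}" }
queue.close
worker.join

puts processed.inspect
```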

#size ⇒ Integer

Note:

Using `#pop` won’t decrease this number; only marking a job as completed does.

Returns the number of jobs that are either enqueued or in processing (but not finished).

Returns:

  • (Integer)

    number of jobs that are enqueued or in processing



# File 'lib/karafka/processing/jobs_queue.rb', line 35

def size
  @in_processing.values.map(&:size).sum
end

#statistics ⇒ Hash

  • `busy` - number of jobs that are currently being processed (active work)

  • `enqueued` - number of jobs in the queue that are waiting to be picked up by a worker

Returns:

  • (Hash)

    hash with basic usage statistics of this queue.



# File 'lib/karafka/processing/jobs_queue.rb', line 132

def statistics
  {
    busy: size - @queue.size,
    enqueued: @queue.size
  }.freeze
end
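
The arithmetic behind `busy` and `enqueued`, and the note on `#size` about `#pop`, can be walked through with plain objects. This is a sketch under simplifying assumptions: strings stand in for jobs, a single hypothetical group is used, and the bookkeeping is done inline rather than through the class.

```ruby
queue = Queue.new
in_processing = Hash.new { |hash, group_id| hash[group_id] = [] }

# Schedule three placeholder jobs for one group.
%w[job_a job_b job_c].each do |job|
  in_processing["group_a"] << job
  queue << job
end

# A worker picks one job up: it leaves the main queue but stays in processing.
picked = queue.pop

size = in_processing.values.sum(&:size)
stats = { busy: size - queue.size, enqueued: queue.size }.freeze

# Only completing the picked job lowers the size.
in_processing["group_a"].delete(picked)
size_after_complete = in_processing.values.sum(&:size)

p stats
puts size_after_complete
```

With three jobs scheduled and one popped, two are still waiting in the main queue and one is active, so `busy` is the difference between everything tracked and what is still enqueued.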

#tick(group_id) ⇒ Object

Note:

This does not release the wait lock. It only causes the lock conditions to be re-checked.

Causes the wait lock to re-check the lock conditions and potentially unlock.

Parameters:

  • group_id (String)

    id of the group we want to unlock for one tick



# File 'lib/karafka/processing/jobs_queue.rb', line 70

def tick(group_id)
  @semaphores[group_id] << true
end

#wait(group_id) ⇒ Object

Note:

This method is blocking.

Blocks while there are jobs in the queue for a given group and waits until all the blocking jobs from that group are completed.

Parameters:

  • group_id (String)

    id of the group whose jobs we’re interested in.



# File 'lib/karafka/processing/jobs_queue.rb', line 122

def wait(group_id)
  # Go do other things while we cannot process, and wait for anyone to finish their work
  # so that we can re-check the wait status
  @semaphores[group_id].pop while wait?(group_id)
end