Class: Karafka::Pro::Processing::JobsQueue

Inherits:
Karafka::Processing::JobsQueue
Includes:
Core::Helpers::Time
Defined in:
lib/karafka/pro/processing/jobs_queue.rb

Overview

Enhanced processing queue that makes it possible to build complex work-distribution schedulers dedicated to particular job types.

In addition to the OSS queue capabilities, it supports jobless locking (locking a subscription group's queue without enqueuing a job) for advanced schedulers.

Methods inherited from Karafka::Processing::JobsQueue

#<<, #close, #complete, #pop, #statistics, #tick

Constructor Details

#initialize ⇒ Karafka::Pro::Processing::JobsQueue



# File 'lib/karafka/pro/processing/jobs_queue.rb', line 32

def initialize
  super

  @in_waiting = Hash.new { |h, k| h[k] = [] }
  @locks = Hash.new { |h, k| h[k] = {} }
  @async_locking = false

  @statistics[:waiting] = 0
end

Instance Attribute Details

#in_processing ⇒ Object

Returns the value of attribute in_processing.



# File 'lib/karafka/pro/processing/jobs_queue.rb', line 24

def in_processing
  @in_processing
end

Instance Method Details

#clear(group_id) ⇒ Object

Clears the processing states for a provided group. Useful when a recovery happens and we need to clean up state but only for a given subscription group.

Parameters:

  • group_id (String)


# File 'lib/karafka/pro/processing/jobs_queue.rb', line 132

def clear(group_id)
  @mutex.synchronize do
    @in_processing[group_id].clear

    @statistics[:waiting] -= @in_waiting[group_id].size
    @in_waiting[group_id].clear
    @locks[group_id].clear
    @async_locking = false

    # We unlock it just in case it was blocked when clearing started
    tick(group_id)
  end
end

#empty?(group_id) ⇒ Boolean

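Tells whether a given group has nothing in processing and nothing waiting for processing.

Parameters:

  • group_id (String)

Returns:

  • (Boolean)

    true when the given group has nothing in processing (or waiting for processing) and is not locked asynchronously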



# File 'lib/karafka/pro/processing/jobs_queue.rb', line 150

def empty?(group_id)
  @mutex.synchronize do
    @in_processing[group_id].empty? &&
      @in_waiting[group_id].empty? &&
      !locked_async?(group_id)
  end
end

#lock(job) ⇒ Object

Locks the queue of a given subscription group without enqueuing a job. This can be used when building complex schedulers that want to postpone enqueuing until certain conditions are met.

Parameters:

  • job (Jobs::Base)

    job used for locking



# File 'lib/karafka/pro/processing/jobs_queue.rb', line 57

def lock(job)
  @mutex.synchronize do
    group = @in_waiting[job.group_id]

    # This should never happen. Same job should not be locked twice
    raise(Errors::JobsQueueSynchronizationError, job.group_id) if group.include?(job)

    @statistics[:waiting] += 1

    group << job
  end
end
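
The jobless locking flow can be illustrated with a minimal, self-contained sketch. `ToyJobsQueue`, `ToyJob`, `ToySyncError` and `waiting?` are hypothetical stand-ins introduced only for this illustration; they are not part of Karafka and only mirror the bookkeeping of `#lock` and `#unlock` shown on this page.

```ruby
# Hypothetical stand-ins for illustration only; none of these are Karafka classes.
ToySyncError = Class.new(StandardError)

class ToyJob
  attr_reader :group_id

  def initialize(group_id)
    @group_id = group_id
  end
end

class ToyJobsQueue
  attr_reader :statistics

  def initialize
    @mutex = Mutex.new
    # One list of "locking" jobs per subscription group
    @in_waiting = Hash.new { |h, k| h[k] = [] }
    @statistics = { waiting: 0 }
  end

  # Marks the group as blocked by the job without enqueuing the job
  def lock(job)
    @mutex.synchronize do
      group = @in_waiting[job.group_id]

      # The same job must not lock twice
      raise(ToySyncError, job.group_id) if group.include?(job)

      @statistics[:waiting] += 1
      group << job
    end
  end

  # Releases a lock previously taken with #lock
  def unlock(job)
    @mutex.synchronize do
      @statistics[:waiting] -= 1

      return if @in_waiting[job.group_id].delete(job)

      # Unlocking a job that was never locked
      raise(ToySyncError, job.group_id)
    end
  end

  # Hypothetical helper: is anything holding the group?
  def waiting?(group_id)
    @mutex.synchronize { !@in_waiting[group_id].empty? }
  end
end

queue = ToyJobsQueue.new
job = ToyJob.new('group-1')

queue.lock(job)
puts queue.waiting?('group-1') # prints "true"
queue.unlock(job)
puts queue.waiting?('group-1') # prints "false"
```

A scheduler built this way would hold the job aside, call `lock` to keep the group marked as busy, and call `unlock` right before (or instead of) actually enqueuing it.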

#lock_async(group_id, lock_id, timeout: WAIT_TIMEOUT) ⇒ Object

Note:

Unlike `#lock`, we do not raise `Errors::JobsQueueSynchronizationError` here, because we want to be able to prolong time-limited locks.

Allows for explicit locking of the queue of a given subscription group.

This can be used for cross-topic synchronization.

Parameters:

  • group_id (String)

    id of the group we want to lock

  • lock_id (Object)

    unique id we want to use to identify our lock

  • timeout (Integer) (defaults to: WAIT_TIMEOUT)

    number of milliseconds for which this lock should be valid. Useful for auto-expiring locks used to delay further processing without explicitly pausing the consumer



# File 'lib/karafka/pro/processing/jobs_queue.rb', line 98

def lock_async(group_id, lock_id, timeout: WAIT_TIMEOUT)
  return if @queue.closed?

  @async_locking = true

  @mutex.synchronize do
    @locks[group_id][lock_id] = monotonic_now + timeout

    # We need to tick so our new time sensitive lock can reload time constraints on sleep
    tick(group_id)
  end
end
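
A minimal sketch of how such time-limited locks behave, including prolonging a lock by locking again with the same id. `ToyAsyncLocks` and its `locked?` helper are hypothetical and written only for this illustration (the real `locked_async?` is not shown on this page); the explicit `now:` argument is an assumption added to make the example deterministic.

```ruby
# Hypothetical stand-in for illustration only; not a Karafka class.
class ToyAsyncLocks
  def initialize
    @mutex = Mutex.new
    # group_id => { lock_id => expiry time in ms on the monotonic clock }
    @locks = Hash.new { |h, k| h[k] = {} }
  end

  # Stores (or overwrites) the expiry of a lock. Overwriting is what allows
  # an existing lock to be prolonged instead of raising an error.
  def lock_async(group_id, lock_id, timeout:, now: monotonic_now)
    @mutex.synchronize do
      @locks[group_id][lock_id] = now + timeout
    end
  end

  # Hypothetical helper: a group counts as locked while any lock is unexpired
  def locked?(group_id, now: monotonic_now)
    @mutex.synchronize do
      @locks[group_id].any? { |_lock_id, expires_at| expires_at > now }
    end
  end

  private

  # Monotonic time in milliseconds
  def monotonic_now
    Process.clock_gettime(Process::CLOCK_MONOTONIC, :float_millisecond)
  end
end

locks = ToyAsyncLocks.new

locks.lock_async('group-1', :sync, timeout: 100, now: 1_000)
puts locks.locked?('group-1', now: 1_050) # prints "true"
puts locks.locked?('group-1', now: 1_100) # prints "false" (expired)

# Locking again with the same id moves the expiry forward
locks.lock_async('group-1', :sync, timeout: 100, now: 1_090)
puts locks.locked?('group-1', now: 1_150) # prints "true"
```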

#register(group_id) ⇒ Object

Registers a semaphore and a lock hash for the given subscription group.

Parameters:

  • group_id (String)


# File 'lib/karafka/pro/processing/jobs_queue.rb', line 45

def register(group_id)
  super
  @mutex.synchronize do
    @locks[group_id]
  end
end

#unlock(job) ⇒ Object

Unlocks the queue space of the given subscription group that was locked via `#lock` with a job that was not added to the queue.

Parameters:

  • job (Jobs::Base)

    job that locked the queue



# File 'lib/karafka/pro/processing/jobs_queue.rb', line 74

def unlock(job)
  @mutex.synchronize do
    @statistics[:waiting] -= 1

    return if @in_waiting[job.group_id].delete(job)

    # This should never happen. It means there was a job being unlocked that was never
    # locked in the first place
    raise(Errors::JobsQueueSynchronizationError, job.group_id)
  end
end

#unlock_async(group_id, lock_id) ⇒ Object

Allows for explicit unlocking of a locked queue of a given subscription group.

Parameters:

  • group_id (String)

    id of the group we want to unlock

  • lock_id (Object)

    unique id we want to use to identify our lock



# File 'lib/karafka/pro/processing/jobs_queue.rb', line 116

def unlock_async(group_id, lock_id)
  @mutex.synchronize do
    if @locks[group_id].delete(lock_id)
      tick(group_id)

      return
    end

    raise(Errors::JobsQueueSynchronizationError, [group_id, lock_id])
  end
end

#wait(group_id) ⇒ Object

Note:

Because the check for whether async locking is enabled happens on regular ticking, the first lock on a group can take up to one tick to take effect. That is expected.

Note:

This implementation takes into consideration temporary async locks that can happen. Thanks to the fact that we use the minimum lock time as a timeout, we do not have to wait a whole ticking period to unlock async locks.

Blocks when there are things in the queue in a given group and waits until all the blocking jobs from a given group are completed or any of the locks times out.

Parameters:

  • group_id (String)

    id of the group whose jobs we're interested in

See Also:

  • Karafka::Processing::JobsQueue


# File 'lib/karafka/pro/processing/jobs_queue.rb', line 169

def wait(group_id)
  return super unless @async_locking

  # We do not generalize this flow because this one is more expensive as it has to allocate
  # extra objects. That's why we only use it when locks are actually in use
  base_interval = tick_interval / 1_000.0

  while wait?(group_id)
    yield if block_given?

    now = monotonic_now

    wait_times = @locks[group_id].values.map! do |lock_time|
      # Convert ms to seconds, seconds are required by Ruby queue engine
      (lock_time - now) / 1_000
    end

    wait_times.delete_if(&:negative?)
    wait_times << base_interval

    @semaphores.fetch(group_id).pop(timeout: wait_times.min)
  end
end
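
The timeout selection inside the loop can be isolated into a small sketch. `next_wait_timeout` is a hypothetical helper written for this illustration and is not part of Karafka; it assumes lock expiry times and the current time are given in milliseconds, and returns seconds, as the listing above does before passing the value to the semaphore's `pop`.

```ruby
# Hypothetical helper for illustration only; not a Karafka method.
#
# lock_expiries_ms - expiry times of the group's async locks (monotonic ms)
# now_ms           - current monotonic time in ms
# tick_interval_ms - regular ticking interval in ms
#
# Returns the number of seconds to wait before re-checking the group.
def next_wait_timeout(lock_expiries_ms, now_ms, tick_interval_ms)
  base_interval = tick_interval_ms / 1_000.0

  # Remaining lifetime of each lock, converted from ms to seconds
  wait_times = lock_expiries_ms.map do |expires_at|
    (expires_at - now_ms) / 1_000.0
  end

  # Already expired locks must not produce a negative timeout
  wait_times.delete_if(&:negative?)
  # Never wait longer than one regular tick
  wait_times << base_interval

  wait_times.min
end

# A lock expiring in 250 ms wins over the 5 second tick
puts next_wait_timeout([10_250, 12_000], 10_000, 5_000) # prints "0.25"

# Only expired locks: fall back to the tick interval
puts next_wait_timeout([9_000], 10_000, 5_000) # prints "5.0"
```

Using the shortest remaining lock time as the timeout is what lets a group resume soon after its lock expires instead of waiting for the next full tick.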