Class: Karafka::Processing::TimedQueue

Inherits:
Object
  • Object
show all
Includes:
Core::Helpers::Time
Defined in:
lib/karafka/processing/timed_queue.rb

Overview

Minimal queue with timeout for Ruby 3.1 and lower.

It is needed because only since 3.2, Ruby has a timeout on ‘#pop`

Instance Method Summary collapse

Constructor Details

#initializeTimedQueue

Returns a new instance of TimedQueue.



11
12
13
14
15
# File 'lib/karafka/processing/timed_queue.rb', line 11

def initialize
  @queue = Queue.new
  @mutex = Thread::Mutex.new
  @resource = Thread::ConditionVariable.new
end

Instance Method Details

#closeObject

Closes the internal queue and releases the lock



54
55
56
57
58
59
# File 'lib/karafka/processing/timed_queue.rb', line 54

def close
  @mutex.synchronize do
    @queue.close
    @resource.broadcast
  end
end

#pop(timeout: 10_000_000_000) ⇒ Object

Note:

We use timeout in seconds because this is how Ruby 3.2+ works and we want to have the same API for newer and older Ruby versions

No timeout means waiting up to 31 years

Parameters:

  • timeout (Integer) (defaults to: 10_000_000_000)

    max number of seconds to wait on the pop

Returns:

  • (Object)

    element inserted on the array or ‘nil` on timeout



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/karafka/processing/timed_queue.rb', line 36

def pop(timeout: 10_000_000_000)
  deadline = monotonic_now + timeout * 1000

  @mutex.synchronize do
    loop do
      return @queue.pop unless @queue.empty?
      return @queue.pop if @queue.closed?

      to_wait = (deadline - monotonic_now) / 1_000.0

      return nil if to_wait <= 0

      @resource.wait(@mutex, to_wait)
    end
  end
end

#push(obj) ⇒ Object Also known as: <<

Adds element to the queue

Parameters:

  • obj (Object)

    pushes an element onto the queue



20
21
22
23
24
25
# File 'lib/karafka/processing/timed_queue.rb', line 20

def push(obj)
  @mutex.synchronize do
    @queue << obj
    @resource.broadcast
  end
end