Class: BatchQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/batch_queue/version.rb,
lib/batch_queue/batch_queue.rb

Constant Summary collapse

VERSION =
"1.1.0"

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(max_batch_size: nil, max_interval_seconds: nil, &block) ⇒ BatchQueue

starts the queue either max_batch_size or interval_milliseconds or both must be set



7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# File 'lib/batch_queue/batch_queue.rb', line 7

def initialize(max_batch_size: nil, max_interval_seconds: nil, &block)
  if max_batch_size.nil? && max_interval_seconds.nil?
    raise 'either max_batch_size or max_interval_seconds or both must be set'
  end
  @is_running = true
  @queue = Queue.new
  @block = block
  @max_batch_size = max_batch_size
  @max_interval_seconds = max_interval_seconds
  @mutex = Mutex.new
  @cond_var = ConditionVariable.new
  @runner = Thread.new { run }
  @on_error_callback = nil

  at_exit do
    stop
  end
end

Instance Attribute Details

#max_batch_sizeObject (readonly)

Returns the value of attribute max_batch_size.



2
3
4
# File 'lib/batch_queue/batch_queue.rb', line 2

def max_batch_size
  @max_batch_size
end

#max_interval_secondsObject (readonly)

Returns the value of attribute max_interval_seconds.



3
4
5
# File 'lib/batch_queue/batch_queue.rb', line 3

def max_interval_seconds
  @max_interval_seconds
end

Instance Method Details

#on_error(&block) ⇒ Object



51
52
53
# File 'lib/batch_queue/batch_queue.rb', line 51

def on_error(&block)
  @on_error_callback = block
end

#push(object) ⇒ Object Also known as: <<



26
27
28
29
30
31
32
33
# File 'lib/batch_queue/batch_queue.rb', line 26

def push(object)
  @mutex.synchronize do
    raise 'BatchQueue is stopped' unless @is_running
    @queue.push(object)
    @cond_var.signal
  end
  object
end

#sizeObject



36
37
38
39
40
# File 'lib/batch_queue/batch_queue.rb', line 36

def size
  @mutex.synchronize do
    @queue.size
  end
end

#stopObject

stops the queue and signals to flush remaining queue, blocking until done.



43
44
45
46
47
48
49
# File 'lib/batch_queue/batch_queue.rb', line 43

def stop
  @mutex.synchronize do
    @is_running = false
    @cond_var.signal
  end
  @runner.join
end