Class: Boatload::AsyncBatchProcessor

Inherits:
  • Object
Defined in:
lib/boatload/async_batch_processor.rb

Overview

A class for asynchronously enqueueing work to be processed in large batches.

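Instance Method Summary

  • #initialize(delivery_interval: 0, max_backlog_size: 0, max_queue_size: 1000, logger: Logger.new(STDOUT), context: nil) {|items, logger, context| ... } ⇒ AsyncBatchProcessor

    Initializes a new AsyncBatchProcessor.

  • #process ⇒ nil

    Asynchronously processes the items in the backlog.

  • #push(*items) ⇒ nil

    Adds one or more items to the backlog.

  • #shutdown ⇒ nil

    Processes any items in the backlog, shuts down the background worker, and stops the timer.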

Constructor Details

#initialize(delivery_interval: 0, max_backlog_size: 0, max_queue_size: 1000, logger: Logger.new(STDOUT), context: nil) {|items, logger, context| ... } ⇒ AsyncBatchProcessor

Initializes a new AsyncBatchProcessor.

Parameters:

  • delivery_interval (Integer) (defaults to: 0)

    if greater than zero, the number of seconds between automatic batch processes.

  • max_backlog_size (Integer) (defaults to: 0)

    if greater than zero, the number of backlog items that will automatically trigger a batch process.

  • max_queue_size (Integer) (defaults to: 1000)

    the maximum number of messages in the Queue before a QueueOverflow will be raised.

  • logger (Logger) (defaults to: Logger.new(STDOUT))

    a Logger that will be passed to the process Proc.

  • context (Object) (defaults to: nil)

    additional context that will be passed to the process Proc.

  • &block (Proc)

    the code that processes items in the backlog.

Yields:

  • (items, logger, context)

    Passes the backlog, a logger, and some context to the process Proc.

Raises:

  • (ArgumentError)

    if delivery_interval or max_backlog_size is negative, or if no block is given.


# File 'lib/boatload/async_batch_processor.rb', line 24

def initialize(
  delivery_interval: 0,
  max_backlog_size: 0,
  max_queue_size: 1000,
  logger: Logger.new(STDOUT),
  context: nil,
  &block
)
  raise ArgumentError, 'delivery_interval must not be negative' if delivery_interval.negative?
  raise ArgumentError, 'max_backlog_size must not be negative' if max_backlog_size.negative?
  raise ArgumentError, 'You must give a block' unless block_given?

  @queue = Queue.new
  @logger = logger
  @max_queue_size = max_queue_size

  @worker = Worker.new(
    queue: @queue,
    max_backlog_size: max_backlog_size,
    logger: @logger,
    context: context,
    &block
  )

  @timer = Timer.new(
    queue: @queue,
    delivery_interval: delivery_interval,
    logger: @logger
  )

  @thread_mutex = Mutex.new
  @worker_thread = @timer_thread = nil
end
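The constructor hands the shared Queue, max_backlog_size, the logger, the context, and the block to a Worker whose source is not shown on this page. The sketch below is only a guess at how such a worker could consume the tagged messages that #push, #process, and #shutdown place on the queue. SketchWorker, its run and flush methods, and sketch_batches are hypothetical names invented for illustration; they are not part of Boatload.

```ruby
require 'logger'
require 'stringio'

# Hypothetical stand-in for Boatload's Worker (the real class is not shown here).
class SketchWorker
  def initialize(queue:, max_backlog_size:, logger:, context:, &block)
    @queue = queue
    @max_backlog_size = max_backlog_size
    @logger = logger
    @context = context
    @block = block
    @backlog = []
  end

  # Consumes [type, payload] messages until a :shutdown message arrives.
  def run
    loop do
      type, item = @queue.pop
      case type
      when :item
        @backlog << item
        # Assumed rule: a positive max_backlog_size triggers an automatic flush.
        flush if @max_backlog_size.positive? && @backlog.size >= @max_backlog_size
      when :process
        flush
      when :shutdown
        flush
        break
      end
    end
  end

  private

  # Passes the backlog, logger, and context to the block, then starts a new backlog.
  def flush
    return if @backlog.empty?

    @block.call(@backlog, @logger, @context)
    @backlog = []
  end
end

# Runs the worker inline; the real class would run it on a background thread.
def sketch_batches
  queue = Queue.new
  batches = []
  worker = SketchWorker.new(
    queue: queue,
    max_backlog_size: 2,
    logger: Logger.new(StringIO.new),
    context: { source: 'example' }
  ) { |items, _logger, context| batches << [items, context[:source]] }

  %w[a b c].each { |item| queue.push([:item, item]) }
  queue.push([:shutdown, nil])
  worker.run
  batches
end

p sketch_batches
```

With max_backlog_size set to 2, the first two items are flushed as one batch as soon as the second arrives, and the remaining item is flushed when the :shutdown message is handled.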

Instance Method Details

#process ⇒ nil

Asynchronously processes the items in the backlog. This method will return immediately and the actual work will be done in the background.

Returns:

  • (nil)


# File 'lib/boatload/async_batch_processor.rb', line 81

def process
  ensure_threads_running!

  @queue.push [:process, nil]
  nil
end

#push(*items) ⇒ nil

Adds one or more items to the backlog.

Parameters:

  • items (*Object)

    the items to add to the backlog.

Returns:

  • (nil)

Raises:

  • (QueueOverflow)

    if the queue already holds max_queue_size messages.


# File 'lib/boatload/async_batch_processor.rb', line 63

def push(*items)
  ensure_threads_running!

  items.each do |item|
    if @queue.size >= @max_queue_size
      raise QueueOverflow, "Max queue size (#{@max_queue_size} messages) reached"
    end

    @queue.push([:item, item])
  end

  nil
end
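Because the size check runs once per item, a call with several items can enqueue some of them before the overflow is raised. The sketch below mirrors that loop against a plain Queue to make the effect visible. SketchQueueOverflow, sketch_push, and overflow_demo are hypothetical names used only for illustration, and no consumer drains the queue here, so it fills deterministically.

```ruby
# Hypothetical error class standing in for Boatload's QueueOverflow.
class SketchQueueOverflow < StandardError; end

# Mirrors the per-item size check in #push against a plain Queue.
def sketch_push(queue, max_queue_size, *items)
  items.each do |item|
    if queue.size >= max_queue_size
      raise SketchQueueOverflow, "Max queue size (#{max_queue_size} messages) reached"
    end

    queue.push([:item, item])
  end
  nil
end

# Pushes three items into a queue limited to two and reports what happened.
def overflow_demo
  queue = Queue.new
  sketch_push(queue, 2, :a, :b, :c)
rescue SketchQueueOverflow => e
  # :a and :b were enqueued before :c hit the limit.
  { queued: queue.size, message: e.message }
end

p overflow_demo
```

A caller that rescues the overflow should therefore not assume the whole call was rejected; items ahead of the failing one are already queued.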

#shutdown ⇒ nil

Processes any items in the backlog, shuts down the background worker, and stops the timer. This method will block until the items in the backlog have been processed.

Returns:

  • (nil)


# File 'lib/boatload/async_batch_processor.rb', line 93

def shutdown
  ensure_threads_running!

  @queue.push [:shutdown, nil]
  @timer_thread&.exit
  @timer_thread&.join
  @worker_thread&.join
  nil
end
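The ordering in #shutdown (enqueue :shutdown, stop the timer thread, then join the worker thread) can be tried out with plain threads. The following is a minimal sketch under assumptions: the worker and timer loops are simplified guesses, since neither Worker nor Timer is shown on this page, and shutdown_demo is a hypothetical name.

```ruby
def shutdown_demo
  queue = Queue.new
  processed = []

  # Simplified worker: collects items and flushes on :process or :shutdown.
  worker_thread = Thread.new do
    backlog = []
    loop do
      type, item = queue.pop
      if type == :item
        backlog << item
        next
      end

      processed << backlog unless backlog.empty?
      backlog = []
      break if type == :shutdown
    end
  end

  # Assumed timer behavior: request a flush at a fixed interval.
  timer_thread = Thread.new do
    loop do
      sleep 0.05
      queue.push([:process, nil])
    end
  end

  queue.push([:item, 1])
  queue.push([:item, 2])

  # Same order as #shutdown: signal the worker, stop the timer, wait for both.
  queue.push([:shutdown, nil])
  timer_thread.exit
  timer_thread.join
  worker_thread.join

  { processed: processed.flatten, timer_alive: timer_thread.alive? }
end

p shutdown_demo
```

Because the queue is first-in, first-out, every item pushed before the :shutdown message is handled before the worker exits, which is why joining the worker thread is enough to make the call block until the backlog is processed.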