Class: Boatload::AsyncBatchProcessor
- Inherits: Object (Boatload::AsyncBatchProcessor < Object)
- Defined in: lib/boatload/async_batch_processor.rb
Overview
A class for asynchronously enqueueing work to be processed in large batches.
Instance Method Summary
- #initialize(delivery_interval: 0, max_backlog_size: 0, max_queue_size: 1000, logger: Logger.new(STDOUT), context: nil) {|items, logger, context| ... } ⇒ AsyncBatchProcessor (constructor)
  Initializes a new AsyncBatchProcessor.
- #process ⇒ nil
  Asynchronously processes the items in the backlog.
- #push(*items) ⇒ nil
  Adds one or more items to the backlog.
- #shutdown ⇒ nil
  Processes any items in the backlog, shuts down the background worker, and stops the timer.
Constructor Details
#initialize(delivery_interval: 0, max_backlog_size: 0, max_queue_size: 1000, logger: Logger.new(STDOUT), context: nil) {|items, logger, context| ... } ⇒ AsyncBatchProcessor
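Initializes a new AsyncBatchProcessor. Raises ArgumentError if delivery_interval or max_backlog_size is negative, or if no block is given. The constructor does not start any threads: @worker_thread and @timer_thread are initialized to nil, and push, process, and shutdown each call ensure_threads_running! before doing anything else.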
    # File 'lib/boatload/async_batch_processor.rb', line 24

    def initialize(
      delivery_interval: 0,
      max_backlog_size: 0,
      max_queue_size: 1000,
      logger: Logger.new(STDOUT),
      context: nil,
      &block
    )
      raise ArgumentError, 'delivery_interval must not be negative' if delivery_interval.negative?
      raise ArgumentError, 'max_backlog_size must not be negative' if max_backlog_size.negative?
      raise ArgumentError, 'You must give a block' unless block_given?

      @queue = Queue.new
      @logger = logger
      @max_queue_size = max_queue_size

      @worker = Worker.new(
        queue: @queue,
        max_backlog_size: max_backlog_size,
        logger: @logger,
        context: context,
        &block
      )

      @timer = Timer.new(
        queue: @queue,
        delivery_interval: delivery_interval,
        logger: @logger
      )

      @thread_mutex = Mutex.new
      @worker_thread = @timer_thread = nil
    end
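The constructor hands the same Queue to both the Worker and the Timer, and the public methods only ever push tagged messages ([:item, item], [:process, nil], [:shutdown, nil]) onto it. The Worker class itself is not shown on this page, so the following is only a sketch of how such a consumer loop might look, using the standard library alone. The name run_sketch_worker, the rule that a max_backlog_size of 0 disables size-based flushing, and the one-argument block (the real block also receives logger and context) are assumptions made for illustration.

```ruby
# Hypothetical stand-in for the worker loop; Boatload's actual Worker class is
# not shown here, so the flushing rules below are assumptions.
def run_sketch_worker(queue, max_backlog_size:, &block)
  Thread.new do
    backlog = []

    # Hands the current backlog to the block as one batch, then empties it.
    flush = lambda do
      next if backlog.empty?

      block.call(backlog.dup)
      backlog.clear
    end

    loop do
      type, payload = queue.pop # blocks until a message arrives

      case type
      when :item
        backlog << payload
        # Assumed semantics: 0 means "never flush on size alone".
        flush.call if max_backlog_size.positive? && backlog.size >= max_backlog_size
      when :process
        flush.call
      when :shutdown
        flush.call
        break
      end
    end
  end
end

batches = []
queue = Queue.new
worker = run_sketch_worker(queue, max_backlog_size: 2) { |items| batches << items }

%w[a b c].each { |item| queue.push([:item, item]) }
queue.push([:shutdown, nil])
worker.join

p batches # => [["a", "b"], ["c"]]
```

Routing everything through one queue means the producer side never touches the backlog directly, so the backlog needs no locking of its own in this sketch.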
Instance Method Details
#process ⇒ nil
Asynchronously processes the items in the backlog. This method will return immediately and the actual work will be done in the background.
    # File 'lib/boatload/async_batch_processor.rb', line 81

    def process
      ensure_threads_running!

      @queue.push [:process, nil]
      nil
    end
#push(*items) ⇒ nil
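Adds one or more items to the backlog. Raises QueueOverflow if the queue already holds max_queue_size messages. The size check is made once per item, so any items that precede the overflowing one remain enqueued.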
    # File 'lib/boatload/async_batch_processor.rb', line 63

    def push(*items)
      ensure_threads_running!

      items.each do |item|
        if @queue.size >= @max_queue_size
          raise QueueOverflow, "Max queue size (#{@max_queue_size} messages) reached"
        end

        @queue.push([:item, item])
      end

      nil
    end
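Because the size check sits inside the items.each loop, a call that passes several items can enqueue some of them before it raises. The sketch below reproduces that loop against a plain Queue to make the behavior visible. The QueueOverflow class defined here is a local stand-in (the real constant's namespace is not shown on this page), and bounded_push is a hypothetical helper name.

```ruby
# Local stand-in for the error class that push raises.
class QueueOverflow < StandardError; end

# Mirrors the per-item size check performed by push.
def bounded_push(queue, max_queue_size, *items)
  items.each do |item|
    if queue.size >= max_queue_size
      raise QueueOverflow, "Max queue size (#{max_queue_size} messages) reached"
    end

    queue.push([:item, item])
  end

  nil
end

queue = Queue.new
overflowed = false

begin
  bounded_push(queue, 2, :a, :b, :c)
rescue QueueOverflow
  overflowed = true
end

puts overflowed # true
puts queue.size # 2, because :a and :b were enqueued before :c overflowed
```

A caller that rescues QueueOverflow and retries the whole argument list would therefore enqueue the leading items twice, so it may be safer to push items one at a time when retries matter.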
#shutdown ⇒ nil
Processes any items in the backlog, shuts down the background worker, and stops the timer. This method will block until the items in the backlog have been processed.
    # File 'lib/boatload/async_batch_processor.rb', line 93

    def shutdown
      ensure_threads_running!

      @queue.push [:shutdown, nil]
      @timer_thread&.exit
      @timer_thread&.join
      @worker_thread&.join
      nil
    end
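The shutdown sequence treats the two threads differently: the timer is stopped with Thread#exit, while the worker is left to finish on its own once it reads the [:shutdown, nil] message, which is why the call blocks until the backlog has been handled. The following sketch replays that sequence with stand-in threads. Both loops are assumptions, since neither Timer nor Worker is shown on this page, and the 60-second sleep is an arbitrary value.

```ruby
queue = Queue.new

# Hypothetical timer loop: periodically asks for the backlog to be processed.
timer_thread = Thread.new do
  loop do
    sleep 60
    queue.push([:process, nil])
  end
end

# Hypothetical worker loop: collects items until it sees the shutdown message.
processed = []
worker_thread = Thread.new do
  loop do
    type, payload = queue.pop
    break if type == :shutdown

    processed << payload if type == :item
  end
end

queue.push([:item, 1])
queue.push([:shutdown, nil])

timer_thread.exit  # terminates the timer even though it is sleeping
timer_thread.join  # returns once the timer thread has actually stopped
worker_thread.join # returns after the worker has handled :shutdown

puts timer_thread.alive?  # false
puts worker_thread.alive? # false
```

Killing the timer is safe in this sketch because it holds no state worth flushing. The worker is joined rather than killed so that messages queued ahead of the shutdown message are not lost.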