queue-processor
Queue Processor is a calculation model built on top of Delayed Job designed to process a single root calculation followed by a number of dependent calculations.
Installation
Add queue-processor and state_machine to your Gemfile. Note that a specific version of state_machine is required until https://github.com/pluginaweek/state_machine/pull/255 is resolved.
gem 'queue-processor', :git => 'https://github.com/GoodMeasuresLLC/queue-processor.git'
gem 'state_machine', :git => 'https://github.com/GoodMeasuresLLC/state_machine.git'
Run bundle install
Requirements
You must have 3 models:
- The RootCalculation: This is the calculation that everything depends on. Performing a new RootCalculation will cause everything in progress for the old calculation to be aborted. For example, a Weekly Calculation that determine's the user's score for the week against their goals.
- A DependentCalculationGroup: These dependent calculations are queued and processed after the RootCalculation is performed. DependentCalculationGroups simply queue additional work, partitioned and prioritized. For example, a DailyCalculation could queue HourlyCalculations prioritized such that tomorrow's calculations are prioritized first. When all the HourlyCalculations complete, they are made available as a group though the DailyCalculation. When the last DailyCalculation completes, the RootCalculation is marked as finished.
- A DependentCalculation: A calculation that depends on the Root Calculation being performed. DependentCalculations are made available in groups and when all are finished, the RootCalculation is marked as complete. For example, an HourlyCalculation could be performed for a particular hour and day of the week. The would be grouped by day of the week and when all the hourly calculations for a day are complete, that week's dependent calculation group would be marked as complete. And when all the hours for the whole week are done, the Root Calculation would be marked as complete.
Your models implement methods for performing the calculations and queuing dependent work
Examples
class WeeklyCalculation < ActiveRecord::Base
include QueueProcessor::RootCalculation
root_calculation_config do
self.dependent_calculation_group_association = :daily_calculations
self.run_at = lambda {|run| Time.now + 0.005.seconds}
self.priority = lambda {|run| 0}
end
has_many :daily_calculations, :dependent => :destroy
# Required hook to perform the calculation associated with the Root Calculation
def perform_calculation
update(:computed_value => rand())
end
# Required hook to create and queue the dependent work
def create_and_enqueue_dependent_calculations
(0...7).map do |n|
self.daily_calculations.create!(:day => n)
end.each(&:add_to_queue)
end
class DailyCalculation < ActiveRecord::Base
include QueueProcessor::DependentCalculationGroup
dependent_calculation_group_config do
self.dependent_calculation_association = :hourly_calculations
self.parent_calculation = :weekly_calculation
self.priority = lambda {|daily_calculation| 1}
end
belongs_to :weekly_calculation
has_many :hourly_calculations, :dependent => :destroy
# Required hook to create and queue dependent work, partitioned to this group
def create_and_enqueue_dependent_calculations
(0...23).map do |n|
self.hourly_calculations.create!(:hour => n)
end.each(&:add_to_queue)
end
# Call back invoked when a calculation finished and any old calculations for this day should be cleaned up
def cleanup_old_calculations
begin
self.with_lock do
query = DailyCalculation.where(:weekly_calculation_id => self.parent_calculation.id, :day => self.day).where("id < ?", self.id)
query.destroy_all
end
rescue ActiveRecord::RecordNotFound => e
end
end
end
class HourlyCalculation < ActiveRecord::Base
include QueueProcessor::DependentCalculation
dependent_calculation_config do
self.parent_calculation = :daily_calculation
self.priority = lambda {|daily_calculation| 2}
end
belongs_to :daily_calculation
# Required hook that computes the hourly value
def perform_calculation
update(:computed_value => rand())
end
end