TaskBatcher
Some tasks, like database inserts, are much more efficient to process in a batch. However, we generally want our tasks to be processed "soon" even if there's only one task. The TaskBatcher gem groups tasks by a taskname parameter and starts a timer when the first task for that name arrives. When the timer expires, it processes all of the tasks it received in that interval as one batch. (The caller provides the block that processes the tasks.)
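The batching rule described above can be modeled without EventMachine. The following is a minimal sketch, not the gem's implementation: `SimpleBatcher`, `tick`, and the explicit `now` argument are hypothetical, standing in for real timers so the behavior is easy to follow and test. The first task for a name opens a batch with a deadline, later tasks join it, and once the deadline passes the block receives the whole list and its return value goes to the callback.

```ruby
# Hypothetical, single-threaded model of time-boxed batching.
# Time is passed in explicitly instead of coming from EventMachine timers.
class SimpleBatcher
  Batch = Struct.new(:deadline, :items, :callback, :block)

  def initialize(duration)
    @duration = duration
    @batches = {} # taskname => currently open Batch
  end

  # Queue +params+ under +taskname+. The first task for a name opens a
  # batch whose deadline is now + duration; later tasks just join it.
  # Only the first task's callback and block are kept in this sketch.
  def task(taskname, params, callback, now, &block)
    batch = (@batches[taskname] ||= Batch.new(now + @duration, [], callback, block))
    batch.items << params
  end

  # Process every batch whose deadline has passed: the block gets the
  # whole list of params, and its return value goes to the callback
  # (a nil callback means the return value is discarded).
  def tick(now)
    due = @batches.select { |_name, batch| now >= batch.deadline }
    due.each do |name, batch|
      @batches.delete(name)
      result = batch.block.call(batch.items)
      batch.callback.call(result) if batch.callback
    end
  end
end

log = []
batcher = SimpleBatcher.new(15)
insert = ->(rows) { rows.map { |row| row[:name] } }
batcher.task('db-insert', { name: 'Alice' }, ->(r) { log << r }, 0, &insert)
batcher.task('db-insert', { name: 'Bob' }, ->(r) { log << r }, 5, &insert)
batcher.tick(10) # too early: nothing has been processed yet
batcher.tick(15) # deadline reached: one batch containing both tasks
p log # prints [["Alice", "Bob"]]
```

Note that the deadline is fixed by the first task, so Bob's task waits only 10 seconds in this model, while Alice's waits the full 15.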
Uses EventMachine under the hood. May be combined with Messenger for durability guarantees.
Tested under Ruby 1.9.3 and 2.0.0.
Released under the three-clause BSD open source license. http://opensource.org/licenses/BSD-3-Clause See the LICENSE file.
Usage
You can either use the TaskBatcher module to make procedural calls, or instantiate a BatchManager object. BatchManager objects have a cleaner API if your tasks are all processed in the same scope.
Using a BatchManager
taskname = 'db-insert' # can be any valid hash key: string, symbol, etc.
duration = 15 # batch duration of 15 seconds
callback = lambda { |result| print "The return value was #{result}.\n" }
mgr = TaskBatcher::BatchManager.new(taskname, callback, duration) do |tasks|
  # This block inserts all of the batched rows with a single statement;
  # it shows how to aggregate the processing of many tasks.
  values = tasks.map do |task|
    # each +task+ is the params hash from a single call to #task
    # (values are interpolated unescaped here for brevity; use your DB
    # driver's quoting or bound parameters in real code)
    "('#{task[:name]}', '#{task[:pet]}')"
  end
  sql = "INSERT INTO pet_owners VALUES #{values.join(', ')}"
  execute_sql(sql) # hypothetical helper; the block's return value is passed to the callback
end
mgr.task name: 'Alice', pet: 'moa'
mgr.task name: 'Bob', pet: 'cassowary'
# ... etc. ...
Using the TaskBatcher module
taskname = 'db-insert' # can be any valid hash key: string, symbol, etc.
TaskBatcher.set_batch_duration(taskname, 15)
callback = lambda {|retval| print "The return value was #{retval}\n"}
def db_insert(data_list)
  values = data_list.map do |data|
    # each +data+ row is the params hash from a single call to #task
    # (values are interpolated unescaped here for brevity; use your DB
    # driver's quoting or bound parameters in real code)
    "('#{data[:name]}', '#{data[:pet]}')"
  end
  sql = "INSERT INTO pet_owners VALUES #{values.join(', ')}"
  execute_sql(sql) # hypothetical helper; its return value is passed to the callback
end
pet_owner_1 = {name: 'Alice', pet: 'moa'}
TaskBatcher.task(taskname, pet_owner_1, callback) do |tasks|
  db_insert(tasks)
end
pet_owner_2 = {name: 'Bob', pet: 'cassowary'}
TaskBatcher.task(taskname, pet_owner_2, callback) do |tasks|
  db_insert(tasks)
end
# ... etc. ...
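The SQL-building step in the examples can be pulled out into a pure function, which makes it easy to test apart from TaskBatcher. This is a sketch under stated assumptions: `quote` and `build_insert` are hypothetical helpers, the `pet_owners` table is assumed to take its two values in (name, pet) order, and the quoting only doubles single quotes, so a real application should rely on its database driver's escaping or bound parameters instead. A batch always contains at least one task, so the `VALUES` list is never empty.

```ruby
# Hypothetical helper: wrap a value in single quotes, doubling any
# embedded single quotes (naive; prefer your driver's escaping).
def quote(value)
  "'" + value.to_s.gsub("'", "''") + "'"
end

# Hypothetical helper: turn a batch of task hashes into one multi-row INSERT.
def build_insert(data_list)
  rows = data_list.map { |data| "(#{quote(data[:name])}, #{quote(data[:pet])})" }
  "INSERT INTO pet_owners VALUES #{rows.join(', ')}"
end

sql = build_insert([{ name: 'Alice', pet: 'moa' }, { name: "O'Brien", pet: 'cassowary' }])
puts sql
# prints: INSERT INTO pet_owners VALUES ('Alice', 'moa'), ('O''Brien', 'cassowary')
```

With this helper, a batch block reduces to something like `execute_sql(build_insert(tasks))`, where `execute_sql` is again a placeholder for your own database call.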
Setting batch durations
TaskBatcher.default_batch_duration # returns 60, the initial default
mytask = 'task name 1'
TaskBatcher.set_batch_duration(mytask, 120) # 2 minutes
TaskBatcher.batch_duration(mytask) # returns 120
TaskBatcher.batch_duration('your task') # returns 60, the default
TaskBatcher.set_default_batch_duration(30)
TaskBatcher.batch_duration('another task') # returns 30
TaskBatcher.batch_duration('your task') # returns 30 -- default changed
TaskBatcher.batch_duration(mytask) # still returns 120
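The behavior above (explicit settings stick, while unset task names follow the current default) falls out naturally if the default is looked up at query time rather than copied when a task is first seen. A minimal sketch of that idea, assuming a hypothetical `DurationTable` class that is not part of the gem:

```ruby
# Hypothetical model of per-task batch durations with a mutable default.
class DurationTable
  attr_accessor :default_duration

  def initialize(default_duration = 60)
    @default_duration = default_duration
    @overrides = {} # taskname => seconds, only for explicitly set tasks
  end

  def set(taskname, seconds)
    @overrides[taskname] = seconds
  end

  # Explicit settings win; anything else falls back to the *current*
  # default, so changing the default affects every unset task at once.
  def duration(taskname)
    @overrides.fetch(taskname, @default_duration)
  end
end

table = DurationTable.new
table.set('task name 1', 120)
table.duration('your task')   # 60, the initial default
table.default_duration = 30
table.duration('your task')   # 30, follows the new default
table.duration('task name 1') # still 120
```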
Notes
- Batches are grouped by +taskname+. ('db-insert' in the examples above.)
- If no batch duration is given, the default batch duration is used. The default is initially 60 seconds, but clients can change it with TaskBatcher.set_default_batch_duration.
- Task parameters may be of any type, though hashes are the obvious choice. The batch block must accept an array in which each element is the parameters passed to a single call within the batch.
- The batch block can return a value of any type. If a callback is provided, it must accept whatever the block returns. Passing nil as the callback means the return value is discarded.
- TaskBatcher uses EventMachine. Event-driven programming is tricky, and EventMachine adds its own complexity. As a result, TaskBatcher can only guarantee that a batch is processed no sooner than the batch duration after its first task arrives; it may be processed later, for example when the EventMachine reactor is busy.
- Because of the limitations of Ruby's threading, TaskBatcher performs best when most or all of the client code is event-driven and uses EventMachine.