task_tempest
Framework for building threaded asynchronous job processors.
Description
task_tempest lets you build glorified loops. You set some configuration options and define some callbacks, and then it loops, reading messages from a queue and processing them.
You define the queue, how to read from it, how to write to it, and the classes that process its messages. The class that does the looping is called the tempest, and the classes that handle jobs are called tasks.
Defining a tempest
Defining a tempest is simple. Derive a class from TaskTempest::Engine, set some configuration options and callbacks, then instantiate the class and call run.
require "task_tempest"
require "yaml"
class MyTempest < TaskTempest::Engine
  process_name "my_tempest"
  queue do |logger|
    logger.debug "initializing queue"
    SuperCoolQueue.new(...)
  end
  dequeue do |queue, logger|
    message = queue.pop
    if message
      logger.info "message received"
      YAML.load(message.body)
    else
      nil
    end
  end
end
queue is given a logger object and must return an instance of your queue.
dequeue is given the queue object and a logger object and must return a tuple [task_id, task_class_name, *task_args].
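To make the dequeue contract concrete, here is a minimal sketch, independent of task_tempest itself, of a dequeue-style function that decodes a YAML message body into the expected tuple. FakeMessage, dequeue_tuple, and the plain array standing in for the queue are hypothetical names used only for illustration.

```ruby
require "yaml"

# Hypothetical stand-in for a queue message; only #body is assumed.
FakeMessage = Struct.new(:body)

# Takes one message off the queue and decodes it into
# [task_id, task_class_name, *task_args]; returns nil when the queue is empty.
def dequeue_tuple(queue)
  message = queue.shift
  return nil unless message
  YAML.load(message.body)
end

queue = [FakeMessage.new(YAML.dump([nil, "GreeterTask", "Christopher", "Hello"]))]
task_id, task_class_name, *task_args = dequeue_tuple(queue)
```

Here task_class_name is "GreeterTask" and task_args holds the remaining elements, which is the shape the tuple above describes.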
There are many more configuration options to set and callbacks to define; please see the rdocs.
TODO provide rdocs.
Running a tempest
Instantiate the tempest and call run.
MyTempest.new.run
If the tempest catches an Interrupt or SystemExit exception, or receives a SIGUSR2 signal, it attempts a graceful shutdown. If it receives a SIGTERM signal, it tries to exit immediately.
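As a rough illustration of what a graceful shutdown means for a processing loop, the sketch below finishes the item in progress and then stops once a stop has been requested. It is not task_tempest's implementation; SketchLoop and request_stop are hypothetical names, and wiring the stop request to SIGUSR2 with Signal.trap is an assumption about how such a loop could be hooked up.

```ruby
# Hypothetical loop wrapper, not part of task_tempest.
class SketchLoop
  attr_reader :processed

  def initialize
    @stop = false
    @processed = 0
  end

  # Asks the loop to stop before it starts the next item.
  def request_stop
    @stop = true
  end

  # Runs items one by one; an item that has started always finishes.
  def run(items)
    items.each do |item|
      break if @stop
      yield item
      @processed += 1
    end
  end
end

looper = SketchLoop.new
# Assumption: a SIGUSR2 handler only sets the flag, so work in progress is not cut off.
Signal.trap("USR2") { looper.request_stop }
```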
Running as a daemon
task_tempest contains no code for running as a daemon; that is left to you. It’s very easy with the Daemons gem, though.
Assuming your tempest is defined in my_tempest.rb, just put the following code at the bottom of the file.
if $0 == __FILE__
  require "daemons"
  Daemons.run_proc(MyTempest.settings.process_name, :log_output => true) do
    MyTempest.new.run
  end
end
Now you can run it as a daemon from the command line.
ruby my_tempest.rb start
ruby my_tempest.rb stop
ruby my_tempest.rb run # Run in foreground
See the rdoc for Daemons for more info.
Defining a task
A task is what handles messages pulled off the queue.
require "task_tempest"
class GreeterTask < TaskTempest::Task
  def start(person, greeting)
    logger.info "about to greet #{person}"
    puts "#{greeting}, #{person}!"
  end
end
start can take whatever arguments you want, but they must correspond to the arguments you put in the message.
Messages
A message is what the dequeue callback returns. Messages are simply arrays of the form [task_id, task_class_name, *task_args]. Note that if task_id is nil, one will be generated automatically.
A message like…
[nil, "GreeterTask", "Christopher", "Hello"]
…would cause our GreeterTask to puts “Hello, Christopher!”.
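The mapping from a message array to a task call can be sketched roughly as follows. This is an illustration, not the engine's actual dispatch code: SketchGreeterTask and dispatch are hypothetical, looking the class up with Object.const_get is an assumption, and SecureRandom is used only as one plausible way to generate a missing id.

```ruby
require "securerandom"

# Hypothetical stand-in for a task class; it returns the greeting instead of
# printing it so the result is easy to inspect.
class SketchGreeterTask
  def start(person, greeting)
    "#{greeting}, #{person}!"
  end
end

# Splits a message into its parts, fills in a missing id, looks up the task
# class by name, and calls #start with the remaining elements.
def dispatch(message)
  task_id, task_class_name, *task_args = message
  task_id ||= SecureRandom.hex(8) # assumption: any unique string would do
  task = Object.const_get(task_class_name).new
  [task_id, task.start(*task_args)]
end

id, result = dispatch([nil, "SketchGreeterTask", "Christopher", "Hello"])
```

Because the remaining array elements are splatted into start, a message with the wrong number of arguments would raise an ArgumentError in this sketch.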
Submitting tasks
You can push messages onto the queue however you like, but task_tempest provides a little convenience. Assuming our previous examples…
task = GreeterTask.new("Christopher", "Hello")
MyTempest.submit(task)
or
message = [nil, "GreeterTask", "Christopher", "Hello"]
MyTempest.submit(message)
For this to work, you need to define the enqueue callback in your tempest definition.
require "task_tempest"
class MyTempest < TaskTempest::Engine
  ...
  enqueue do |queue, message, logger, *args|
    logger.info "enqueuing message #{message.inspect}"
    queue.push(YAML.dump(message))
  end
  ...
end
The args argument is passed through via TaskTempest::Engine.submit. For example, if you called submit like…
MyTempest.submit(message, "one", "two")
Then args would be ["one", "two"]. This is useful if you are using a priority queue and you want to submit your task with a given priority.
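As a sketch of the priority use case, the code below treats the first extra argument as a priority when pushing onto a queue. Everything in it is hypothetical: SketchPriorityQueue is a toy in-memory queue, enqueue_with_priority only mirrors the shape of an enqueue callback (without the logger), and the default priority of 10 is an arbitrary choice.

```ruby
require "yaml"

# Hypothetical in-memory priority queue: a lower number means higher priority.
class SketchPriorityQueue
  def initialize
    @items = []
    @seq = 0 # insertion counter, keeps FIFO order among equal priorities
  end

  def push(body, priority)
    @items << [priority, @seq, body]
    @seq += 1
  end

  # Returns the body with the lowest priority number, or nil when empty.
  def pop
    entry = @items.min
    return nil unless entry
    @items.delete(entry)
    entry.last
  end
end

# Shaped like an enqueue callback; the first extra argument, if present,
# is used as the priority.
def enqueue_with_priority(queue, message, *args)
  priority = args.first || 10
  queue.push(YAML.dump(message), priority)
end

queue = SketchPriorityQueue.new
enqueue_with_priority(queue, [nil, "GreeterTask", "Christopher", "Hello"], 1)
```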
Complete example
See the examples directory.
Copyright
Copyright © 2010 Christopher J. Bottaro. See LICENSE for details.