task_tempest

Framework for building threaded asynchronous job processors.

Description

task_tempest lets you build glorified loops. You set some configuration options, define some callbacks, then it just loops, reading messages from a queue and processing them.

You define the queue, how to read from it, how to write to it, and the classes that process the queue messages. The class that does the looping is called the tempest, and the classes that handle jobs are called tasks.

Defining a tempest

Defining a tempest is simple. Derive a class from TaskTempest::Engine, set some configuration options and callbacks, then instantiate the class and call run.

require "task_tempest"
class MyTempest < TaskTempest::Engine
  process_name "my_tempest"

  queue do |logger|
    logger.debug "initializing queue"
    SuperCoolQueue.new(...)
  end

  dequeue do |queue, logger|
    message = queue.pop
    if message
      logger.info "message received"
      YAML.load(message.body)
    else
      nil
    end
  end
end

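The queue callback is given a logger object and must return an instance of your queue.

The dequeue callback is given the queue object and a logger object and must return a tuple of the form [task_id, task_class_name, *task_args]. In the example above, that tuple is whatever YAML.load produces from the message body, and the callback returns nil when no message is available.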
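To make the shape of that tuple concrete, here is a minimal sketch that runs without task_tempest. It assumes Ruby's built-in Queue as a stand-in for a real queue and uses a hypothetical helper, read_message, that mirrors the body of a dequeue callback; neither is part of the library.

```ruby
require "yaml"

# Stand-in for a real queue (assumption): Ruby's built-in thread-safe Queue.
queue = Queue.new

# A producer serializes the message tuple before pushing it.
queue.push(YAML.dump([nil, "GreeterTask", "Christopher", "Hello"]))

# Hypothetical helper mirroring the body of a dequeue callback.
# It pops without blocking and returns nil when the queue is empty.
def read_message(queue)
  payload = queue.pop(true) # non-blocking pop raises ThreadError when empty
  YAML.load(payload)
rescue ThreadError
  nil
end

# The decoded array splits into the three parts of the tuple.
task_id, task_class_name, *task_args = read_message(queue)
```

After this runs, task_id is nil, task_class_name is "GreeterTask", and task_args holds the two strings. A second call to read_message returns nil because the queue is empty.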

There are many more configuration options to set and callbacks to define. Please see the rdocs.

TODO provide rdocs.

Running a tempest

You simply instantiate the tempest and call run.

MyTempest.new.run

When the tempest catches an Interrupt or SystemExit exception, or receives a SIGUSR2 signal, it attempts a graceful shutdown. When it receives a SIGTERM signal, it tries to exit immediately.

Running as a daemon

There is no code in task_tempest to run as a daemon; that is left to you. It’s very easy with the Daemons gem though.

Assuming your tempest is defined in my_tempest.rb, just put the following code at the bottom of the file.

if $0 == __FILE__
  require "daemons"
  Daemons.run_proc(MyTempest.settings.process_name, :log_output => true) do
    MyTempest.new.run
  end
end

Now you can run it as a daemon from the command line.

ruby my_tempest.rb start
ruby my_tempest.rb stop
ruby my_tempest.rb run # Run in foreground

See the rdoc for Daemons for more info.

Defining a task

A task is what handles messages pulled off the queue.

require "task_tempest"
class GreeterTask < TaskTempest::Task
  def start(person, greeting)
    logger.info "about to greet #{person}"
    puts "#{greeting}, #{person}!"
  end
end

start can take whatever arguments you want, but they must correspond to the arguments you put in the message.

Messages

A message is what is returned by the dequeue callback. Messages are simply arrays of the form [task_id, task_class_name, *task_args]. Note that if task_id is nil, then one will be generated automatically.

A message like…

[nil, "GreeterTask", "Christopher", "Hello"]

…would cause our GreeterTask to puts “Hello, Christopher!”.
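The mapping from a message to a call on start can be sketched in plain Ruby. This illustrates the idea only and is not task_tempest's actual code: the dispatch helper is hypothetical, the ID scheme (SecureRandom.hex) is an assumption, and GreeterTask here is a plain stand-in class that returns the greeting instead of printing it.

```ruby
require "securerandom"

# Plain stand-in for a TaskTempest::Task subclass (assumption for this sketch).
class GreeterTask
  def start(person, greeting)
    "#{greeting}, #{person}!"
  end
end

# Hypothetical dispatcher: splits the message, fills in a missing ID,
# looks the class up by name, and splats the remaining elements into start.
def dispatch(message)
  task_id, task_class_name, *task_args = message
  task_id ||= SecureRandom.hex(8) # assumed ID scheme, not the library's
  task_class = Object.const_get(task_class_name)
  [task_id, task_class.new.start(*task_args)]
end

task_id, result = dispatch([nil, "GreeterTask", "Christopher", "Hello"])
```

Because the arguments are splatted into start, a message with too few or too many arguments raises ArgumentError in this sketch, which is why the message arguments have to match the signature of start.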

Submitting tasks

You can push messages on to the queue however you like, but task_tempest provides a little convenience. Assuming our previous examples…

task = GreeterTask.new("Christopher", "Hello")
MyTempest.submit(task)

or

message = [nil, "GreeterTask", "Christopher", "Hello"]
MyTempest.submit(message)

For this to work, you need to define the enqueue callback in your tempest definition.

require "task_tempest"
class MyTempest < TaskTempest::Engine
  ...
  enqueue do |queue, message, logger, *args|
    logger.info "enqueuing message #{message.inspect}"
    queue.push(YAML.dump(message))
  end
  ...
end

The args argument is passed through via TaskTempest::Engine.submit. For example, if you called submit like…

MyTempest.submit(message, "one", "two")

Then args would be ["one", "two"]. This is useful if you are using a priority queue and you want to submit your task with a given priority.
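As a rough illustration of the priority case, the sketch below uses a hypothetical TinyPriorityQueue and a lambda with the same parameter list as the enqueue callback, so it runs without task_tempest. The convention that a lower number means higher priority and the default priority of 10 are both assumptions.

```ruby
require "yaml"
require "logger"

# Hypothetical minimal priority queue: a lower number means higher priority.
class TinyPriorityQueue
  def initialize
    @items = []
    @sequence = 0
  end

  def push(payload, priority)
    @sequence += 1
    @items << [priority, @sequence, payload]
  end

  def pop
    # Arrays compare element-wise: priority first, then insertion order.
    item = @items.min
    return nil unless item
    @items.delete(item)
    item.last
  end
end

# Same parameters as the enqueue callback, written as a lambda so it can
# be called directly here.
enqueue = lambda do |queue, message, logger, *args|
  priority = args.first || 10 # assumed default priority
  logger.info "enqueuing #{message.inspect} with priority #{priority}"
  queue.push(YAML.dump(message), priority)
end

queue = TinyPriorityQueue.new
logger = Logger.new(nil) # nil log device discards output

enqueue.call(queue, [nil, "GreeterTask", "Alice", "Hi"], logger)
enqueue.call(queue, [nil, "GreeterTask", "Bob", "Hello"], logger, 1)

first = YAML.load(queue.pop)
```

Here the message for Bob is popped first even though it was pushed second, because the extra argument 1 was passed through as its priority.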

Complete example

See the examples directory.

Copyright © 2010 Christopher J. Bottaro. See LICENSE for details.