Kinetic

Kinetic is an AMQP worker framework designed in vein of microframeworks such as sinatra. Its goal is to provide an easy way to configure and run AMQP consumers in order to reduce developer overhead when working with AMQP. Kinetic follows the master/worker convention used in Unicorn, and its prefork code is mostly based on that of Unicorn's.

Installation

gem install kinetic

Basic Usage

Kinetic follows a pattern similar to that of Sinatra, so most of the conventions should be recognizable.

Example: simple_worker.rb

on 'message' do |message|
    puts "Received #{message}"
end

This code will create a queue on the amqp server listening for messages on the simple_worker.rb.direct exchange with the routing key 'message'. By default the message is expected to be in JSON format, however in the future other serializers may be available.

Configuration

Kinetic configuration follows a similar DSL to that of sinatra in that it allows the use of the set method to set configuration. It also allows a configuration file to be loaded by using config_file.

Using set

Simply pass in the key and value to the set method.

require 'kinetic'

set host: 'localhost'
set port: 5672

Using config_file

You can add support to your Kinetic app for file based configuration by defining a Yaml based config file. Note that the config_file directive accepts any path. Relative paths will be relative to the application file's root directory.

require 'kinetic'

config_file 'config/config.yml'

Defaults

{
  name:       File.basename($0),         # name of the application. This also translates to the name of the AMQP exchange
  root:       File.dirname($0),          # the application root directory
  app_file:   $0,                        # the application file
  workers:    1,                         # the number of worker processes to run
  log_file:   'kinetic.log',             # the path of the log file to be used
  host:       'localhost',               # the AMQP host
  port:       5672,                      # the AMQP port
  serializer: :JSON,                     # the message serializer to use
  timeout:    1,                         # the time (in seconds) to wait for various operations
  pid:        "#{File.basename($0)}.pid" # the process pid file
}

Accessing

Accessing configuration values by their method accessors will raise Kinetic::Errors::MissingConfigurationValue error. This can be used to ensure that specific values are set at runtime. To access a value without raising an exception use the hash key instead.

config.host   # raises an error if the config host value is not set
config[:host] # returns nil if the host value is not set

Changelog

0.0.4

  • removed get and get! methods as they are no longer needed.
  • added ability to correctly daemonize process

0.0.3

  • Fix bug where process shut down does not kill children
  • Add ability to set process name for master and workers
  • Add after fork block capability allowing code to be written to run after the worker forks