EM::Beanstalk
An attempt to wrap portions of the Beanstalk protocol with EventMachine. Every command returns a deferrable object, which succeeds or fails depending on the reply returned from Beanstalk. Every command also accepts a block, which is used as the succeed callback for that command.
The default errback is to print the error message received.
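The callback and errback flow described above can be illustrated with a small stand-alone model. This is only a sketch in plain Ruby, not EM::Beanstalk's implementation: ToyDeferrable and its methods are hypothetical names that imitate the general shape of a deferrable, and the default error handler that prints the message is an assumption based on the sentence above.

```ruby
# Toy model of a deferrable. Hypothetical; not the library's code.
class ToyDeferrable
  def initialize(&block)
    @callbacks = []
    @callbacks << block if block
    # Assumed default error handler: print the message received.
    @errback = ->(message) { puts "error: #{message}" }
  end

  # Register an additional success callback.
  def callback(&block)
    @callbacks << block
    self
  end

  # Replace the default error handler; returns self so calls can be chained.
  def on_error(&block)
    @errback = block
    self
  end

  # Called when the (simulated) server reply indicates success.
  def succeed(*args)
    @callbacks.each { |cb| cb.call(*args) }
  end

  # Called when the (simulated) server reply indicates failure.
  def fail(message)
    @errback.call(message)
  end
end

results = []

# A command whose reply is a success: the block passed in runs.
ok = ToyDeferrable.new { |id| results << "inserted #{id}" }
ok.succeed(42)

# A command whose reply is an error: only the error handler runs.
bad = ToyDeferrable.new { results << "never reached" }
bad.on_error { |message| results << "failed: #{message}" }
bad.fail("NOT_FOUND")

puts results
```

In the real library the success or failure is triggered by the reply from Beanstalk rather than by calling succeed or fail by hand.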
One thing to keep in mind: the Beanstalk protocol executes all commands serially. If you send a reserve command and there are no jobs, Beanstalk _won’t_ process any of the commands that come after the reserve until the reserve completes.
This is a bit of a gotcha when sending a series of commands (reserve, delete, etc.) while there are no available jobs.
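The stall described above can be seen in a toy model of a single connection that answers commands strictly in order. This is a sketch in plain Ruby that does not talk to Beanstalk; ToySerialConnection, send_command and job_arrives are hypothetical names, and the reply strings are made up for illustration.

```ruby
# Toy model of one connection whose commands are answered strictly in order.
# Hypothetical names; this does not talk to Beanstalk.
class ToySerialConnection
  attr_reader :replies

  def initialize
    @pending = []     # commands sent but not yet answered, oldest first
    @ready_jobs = []  # jobs available to be reserved
    @replies = []     # replies produced so far, in order
  end

  def send_command(name)
    @pending << name
    drain
  end

  # Simulates another client putting a job on the tube.
  def job_arrives(body)
    @ready_jobs << body
    drain
  end

  private

  # Answer commands from the front of the queue; stop at a reserve
  # that has no job to hand out.
  def drain
    until @pending.empty?
      if @pending.first == :reserve
        break if @ready_jobs.empty?
        @replies << "RESERVED #{@ready_jobs.shift}"
      else
        @replies << "OK #{@pending.first}"
      end
      @pending.shift
    end
  end
end

conn = ToySerialConnection.new
conn.send_command(:reserve)   # no jobs yet, so this stalls
conn.send_command(:stats)     # queued behind the reserve
before = conn.replies.dup     # nothing has been answered yet
conn.job_arrives("hello")     # unblocks the reserve, then stats
after = conn.replies.dup

puts "before: #{before.inspect}"
puts "after:  #{after.inspect}"
```

One way to sidestep the stall, if it matters for your application, is to keep reserve calls on a separate connection from the commands that must not wait.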
Dependencies
- eventmachine
For testing
- rspec
- em-spec
Examples
EM.run {
  jack = EM::Beanstalk.new
  jack.use('mytube') { |tube| puts "Using #{tube}" }
  jack.reserve do |job|
    puts job.id
    process(job)
    jack.delete(job) { puts "Successfully deleted" }
  end
  jack.put("my message", :ttr => 300) { |id| puts "put successful #{id}" }
  jack.stats { |stats| puts "Server up for #{stats['uptime']} seconds" }
  jack.stats(:tube, "mytube") { |stats| puts "Total jobs #{stats['total-jobs']}" }
  jack.list(:tubes) { |tubes| puts "There are #{tubes.length} tubes defined" }
}
If you need custom error handling, you can chain the error event handler onto the command.
Examples
EM.run {
  jack = EM::Beanstalk.new
  jack.delete(job_id) {
    puts "deleted job!"
  }.on_error { |message|
    puts "I couldn't delete the job because of #{message}"
  }
}
EM::Beanstalk#each_job is useful when the client is a job worker that is meant to process jobs continuously as they become available. Once the queue is empty, the client will block and wait for a new job.
If multiple workers connect to the queue, Beanstalkd will round-robin between the workers.
EM.run {
  jack = EM::Beanstalk.new
  jack.each_job do |job|
    puts "Got job ##{job.id}: #{job}"
    if process(job)
      jack.delete(job) { puts "*Deleted #{job}*" }
    else
      # something went horribly wrong!
    end
  end
  def process(job)
    # Some kind of job processing
  end
}