Class: RServiceBus::MQ_Beanstalk

Inherits:
MQ
  • Object
show all
Defined in:
lib/rservicebus/MQ/Beanstalk.rb

Overview

Beanstalk client implementation.

Instance Attribute Summary

Attributes inherited from MQ

#localQueueName

Instance Method Summary collapse

Methods inherited from MQ

get, #initialize

Constructor Details

This class inherits a constructor from RServiceBus::MQ

Instance Method Details

#ack ⇒ Object

“Commit” queue



63
64
65
66
# File 'lib/rservicebus/MQ/Beanstalk.rb', line 63

# "Commits" the current message: calls +delete+ on the job held in @job
# and then clears @job.
#
# @return [nil]
def ack
    finished = @job
    finished.delete
    @job = nil
end

#connect(host, port) ⇒ Object

Connect to the broker



12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/rservicebus/MQ/Beanstalk.rb', line 12

# Connects to the Beanstalk broker at "host:port", stores the connection
# pool in @beanstalk and the broker-reported 'max-job-size' in @max_job_size.
#
# A warning is printed when the reported max-job-size is below 4194304.
# If a StandardError is raised while connecting, diagnostics are printed and
# the process is aborted.
#
# @param host [Object] interpolated into the "host:port" address string
# @param port [Object, nil] broker port; 11300 is used when nil or false
# @return [nil]
def connect( host, port )
    port ||= 11300
    string = "#{host}:#{port}"

    @beanstalk = Beanstalk::Pool.new([string])

    @max_job_size = @beanstalk.stats['max-job-size']
    if @max_job_size < 4194304
        puts "***WARNING: Lowest recommended.max-job-size is 4m, current max-job-size, #{@max_job_size.to_f / (1024*1024)}m"
        puts '***WARNING: Set the job size with the -z switch, eg /usr/local/bin/beanstalkd -z 4194304'
    end
# Only StandardError is rescued: interrupts, exit requests and other
# non-StandardError exceptions must propagate instead of being reported as
# a connection failure.
rescue StandardError => e
    puts 'Error connecting to Beanstalk'
    puts "Host string, #{string}"
    if e.message == 'Beanstalk::NotConnected'
        puts '***Most likely, beanstalk is not running. Start beanstalk, and try running this again.'
        puts "***If you still get this error, check beanstalk is running at, #{string}"
    else
        puts e.message
        puts e.backtrace
    end
    abort
end

#pop ⇒ Object

Get next msg from queue



45
46
47
48
49
50
51
52
53
54
55
# File 'lib/rservicebus/MQ/Beanstalk.rb', line 45

# Gets the next message: reserves a job from @beanstalk (passing @timeout
# to +reserve+), stores it in @job and returns the job's body.
#
# @return [Object] the value of +body+ on the reserved job
# @raise [NoMsgToProcess] when +reserve+ fails with the message 'TIMED_OUT'
def pop
    begin
        @job = @beanstalk.reserve(@timeout)
    # Only StandardError is inspected; non-StandardError exceptions propagate
    # untouched and are never mistaken for an empty queue.
    rescue StandardError => e
        raise NoMsgToProcess if e.message == 'TIMED_OUT'

        # Anything other than a timeout is re-raised unchanged.
        raise
    end
    @job.body
end

#returnToQueue ⇒ Object



57
58
59
60
# File 'lib/rservicebus/MQ/Beanstalk.rb', line 57

# Calls +release+ on the job currently held in @job. @job itself is left
# untouched.
#
# @return [Object] whatever the job's +release+ call returns
def returnToQueue
    held = @job
    held.release
end

#send(queueName, msg) ⇒ Object

At least called in the Host rescue block, to ensure all network links are healthy



69
70
71
72
73
74
75
76
77
# File 'lib/rservicebus/MQ/Beanstalk.rb', line 69

# Puts msg on the named queue after checking it against @max_job_size.
#
# The size is measured in bytes (+bytesize+) rather than characters, because
# the broker's max-job-size limit is a byte count; a multibyte message can
# have fewer characters than bytes. Objects that do not respond to
# +bytesize+ are measured with +length+.
#
# NOTE(review): defining +send+ overrides Object#send for instances of this
# class; use +__send__+ if dynamic dispatch is needed on them.
#
# @param queueName [Object] queue name, passed to +use+ on @beanstalk
# @param msg [Object] message, passed to +put+ on @beanstalk
# @return [Object] whatever +put+ returns
# @raise [JobTooBigError] when the message size exceeds @max_job_size
def send( queueName, msg )
    msg_size = msg.respond_to?( :bytesize ) ? msg.bytesize : msg.length
    if msg_size > @max_job_size
        puts '***Attempting to send a msg which will not fit on queue.'
        puts "***Msg size, #{msg_size}, max msg size, #{@max_job_size}."
        raise JobTooBigError, "Msg size, #{msg_size}, max msg size, #{@max_job_size}"
    end
    @beanstalk.use( queueName )
    @beanstalk.put( msg )
end

#subscribe(queuename) ⇒ Object

Connect to the queue



40
41
42
# File 'lib/rservicebus/MQ/Beanstalk.rb', line 40

# Connects to a queue by calling +watch+ on @beanstalk with the given name.
#
# @param queuename [Object] queue name, passed straight to +watch+
# @return [Object] whatever +watch+ returns
def subscribe( queuename )
    broker = @beanstalk
    broker.watch( queuename )
end