Class: RServiceBus::MQ
- Inherits:
-
Object
- Object
- RServiceBus::MQ
- Defined in:
- lib/rservicebus/MQ.rb
Overview
Wrapper base class for Queue implementations available to the applications, allowing rservicebus to instatiate and configure queue implementations at startup
-
dependency injection.
Direct Known Subclasses
Instance Attribute Summary collapse
-
#localQueueName ⇒ Object
readonly
Returns the value of attribute localQueueName.
Class Method Summary collapse
Instance Method Summary collapse
-
#ack ⇒ Object
“Commit” the pop.
-
#connect(host, port) ⇒ Object
Connect to the broker.
-
#initialize(uri) ⇒ MQ
constructor
Resources are attached resources, and can be specified using the URI syntax.
-
#pop ⇒ Object
Get next msg from queue.
-
#send(queueName, msg) ⇒ Object
At least called in the Host rescue block, to ensure all network links are healthy.
-
#subscribe(queuename) ⇒ Object
Connect to the receiving queue.
Constructor Details
#initialize(uri) ⇒ MQ
Resources are attached resources, and can be specified using the URI syntax.
47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 |
# File 'lib/rservicebus/MQ.rb', line 47 def initialize( uri ) if uri.is_a? URI then @uri = uri else puts 'uri must be a valid URI' abort() end if uri.path == '' || uri.path == '/' then @localQueueName = RServiceBus.getValue( 'APPNAME', 'RServiceBus') else @localQueueName = uri.path @localQueueName[0] = '' end if @localQueueName == '' then puts "@localQueueName: #{@localQueueName}" puts 'Queue name must be supplied ' puts "*** uri, #{uri}, needs to contain a queue name" puts '*** the structure is scheme://host[:port]/queuename' abort() end @timeout = RServiceBus.getValue( 'QUEUE_TIMEOUT', '5').to_i self.connect(uri.host, uri.port) self.subscribe( @localQueueName ) end |
Instance Attribute Details
#localQueueName ⇒ Object (readonly)
Returns the value of attribute localQueueName.
14 15 16 |
# File 'lib/rservicebus/MQ.rb', line 14 def localQueueName @localQueueName end |
Class Method Details
.get ⇒ Object
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/rservicebus/MQ.rb', line 19 def MQ.get mqString = RServiceBus.getValue( 'RSBMQ', 'beanstalk://localhost'); uri = URI.parse( mqString ) case uri.scheme when 'beanstalk' require 'rservicebus/MQ/Beanstalk' mq = MQ_Beanstalk.new( uri ) when 'redis' require 'rservicebus/MQ/Redis' mq = MQ_Redis.new( uri ) when 'rabbitmq' require 'rservicebus/MQ/RabbitMq' mq = MQ_RabbitMq.new( uri ) else abort("Scheme, #{uri.scheme}, not recognised when configuring mq, #{string}"); end return mq end |
Instance Method Details
#ack ⇒ Object
“Commit” the pop
97 98 99 |
# File 'lib/rservicebus/MQ.rb', line 97 def ack raise 'Method, ack, needs to be implemented' end |
#connect(host, port) ⇒ Object
Connect to the broker
80 81 82 |
# File 'lib/rservicebus/MQ.rb', line 80 def connect( host, port ) raise 'Method, connect, needs to be implemented' end |
#pop ⇒ Object
Get next msg from queue
92 93 94 |
# File 'lib/rservicebus/MQ.rb', line 92 def pop raise 'Method, pop, needs to be implemented' end |
#send(queueName, msg) ⇒ Object
At least called in the Host rescue block, to ensure all network links are healthy
105 106 107 108 109 110 111 112 |
# File 'lib/rservicebus/MQ.rb', line 105 def send( queueName, msg ) begin @connection.close rescue puts 'AppResource. An error was raised while closing connection to, ' + @uri.to_s end end |
#subscribe(queuename) ⇒ Object
Connect to the receiving queue
87 88 89 |
# File 'lib/rservicebus/MQ.rb', line 87 def subscribe( queuename ) raise 'Method, subscribe, needs to be implemented' end |