Class: ActiveMessaging::Adapters::Jms::Connection
- Inherits:
-
Object
- Object
- ActiveMessaging::Adapters::Jms::Connection
- Includes:
- ActiveMessaging::Adapter
- Defined in:
- lib/activemessaging/adapters/jms.rb
Instance Attribute Summary collapse
-
#connection ⇒ Object
Returns the value of attribute connection.
-
#consumers ⇒ Object
Returns the value of attribute consumers.
-
#producers ⇒ Object
Returns the value of attribute producers.
-
#reliable ⇒ Object
Returns the value of attribute reliable.
-
#session ⇒ Object
Returns the value of attribute session.
Instance Method Summary collapse
- #close ⇒ Object
- #find_or_create_consumer(queue_name, headers = {}) ⇒ Object
- #find_or_create_destination(queue_name, headers = {}) ⇒ Object
- #find_or_create_producer(queue_name, headers = {}) ⇒ Object
-
#initialize(cfg = {}) ⇒ Connection
constructor
A new instance of Connection.
- #receive(queue_name = nil, headers = {}) ⇒ Object
- #receive_any ⇒ Object
- #received(message, headers = {}) ⇒ Object
- #send(queue_name, body, headers = {}) ⇒ Object
- #subscribe(queue_name, headers = {}) ⇒ Object
- #unreceive(message, headers = {}) ⇒ Object
- #unsubscribe(queue_name, headers = {}) ⇒ Object
Methods included from ActiveMessaging::Adapter
Constructor Details
#initialize(cfg = {}) ⇒ Connection
Returns a new instance of Connection.
22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
# File 'lib/activemessaging/adapters/jms.rb', line 22 def initialize cfg={} @url = cfg[:url] @login = cfg[:login] @passcode = cfg[:passcode] #initialize our connection factory if cfg.has_key? :connection_factory #this initialize is probably activemq specific. There might be a more generic #way of getting this without resorting to jndi lookup. eval <<-end_eval @connection_factory = Java::#{cfg[:connection_factory]}.new(@login, @passcode, @url) end_eval elsif cfg.has_key? :jndi @connection_factory = javax.naming.InitialContext.new().lookup(cfg[:jndi]) else raise "Either jndi or connection_factory has to be set in the config." end raise "Connection factory could not be initialized." if @connection_factory.nil? @connection = @connection_factory.create_connection() @session = @connection.createSession(false, 1) @destinations = [] @producers = {} @consumers = {} @connection.start end |
Instance Attribute Details
#connection ⇒ Object
Returns the value of attribute connection.
20 21 22 |
# File 'lib/activemessaging/adapters/jms.rb', line 20 def connection @connection end |
#consumers ⇒ Object
Returns the value of attribute consumers.
20 21 22 |
# File 'lib/activemessaging/adapters/jms.rb', line 20 def consumers @consumers end |
#producers ⇒ Object
Returns the value of attribute producers.
20 21 22 |
# File 'lib/activemessaging/adapters/jms.rb', line 20 def producers @producers end |
#reliable ⇒ Object
Returns the value of attribute reliable.
20 21 22 |
# File 'lib/activemessaging/adapters/jms.rb', line 20 def reliable @reliable end |
#session ⇒ Object
Returns the value of attribute session.
20 21 22 |
# File 'lib/activemessaging/adapters/jms.rb', line 20 def session @session end |
Instance Method Details
#close ⇒ Object
114 115 116 117 118 119 120 121 122 123 |
# File 'lib/activemessaging/adapters/jms.rb', line 114 def close @consumers.each {|k, c| c.stop } @connection.stop @session.close @connection.close @connection = nil @session = nil @consumers = {} @producers = {} end |
#find_or_create_consumer(queue_name, headers = {}) ⇒ Object
134 135 136 137 138 139 140 141 142 143 144 145 146 147 |
# File 'lib/activemessaging/adapters/jms.rb', line 134 def find_or_create_consumer queue_name, headers={} consumer = @consumers[queue_name] if consumer.nil? destination = find_or_create_destination queue_name, headers if headers.symbolize_keys.has_key? :selector consumer = @session.create_consumer destination, headers.symbolize_keys[:selector] else consumer = @session.create_consumer destination end @consumers[queue_name] = consumer end consumer end |
#find_or_create_destination(queue_name, headers = {}) ⇒ Object
149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 |
# File 'lib/activemessaging/adapters/jms.rb', line 149 def find_or_create_destination queue_name, headers={} destination = find_destination queue_name, headers[:destination_type] if destination.nil? if headers.symbolize_keys[:destination_type] == :topic destination = @session.create_topic(queue_name.to_s) @destinations << destination elsif headers.symbolize_keys[:destination_type] == :queue destination = @session.create_queue(queue_name.to_s) @destinations << destination else raise "headers[:destination_type] must be either :queue or :topic. was #{headers[:destination_type]}" end end destination end |
#find_or_create_producer(queue_name, headers = {}) ⇒ Object
125 126 127 128 129 130 131 132 |
# File 'lib/activemessaging/adapters/jms.rb', line 125 def find_or_create_producer queue_name, headers={} producer = @producers[queue_name] if producer.nil? destination = find_or_create_destination queue_name, headers producer = @session.create_producer destination end producer end |
#receive(queue_name = nil, headers = {}) ⇒ Object
95 96 97 98 99 100 101 102 103 104 |
# File 'lib/activemessaging/adapters/jms.rb', line 95 def receive queue_name=nil, headers={} if queue_name.nil? receive_any else consumer = subscribe queue_name, headers = consumer.receive(1) unsubscribe queue_name, headers end end |
#receive_any ⇒ Object
88 89 90 91 92 93 |
# File 'lib/activemessaging/adapters/jms.rb', line 88 def receive_any @consumers.find do |k, c| = c.receive(1) return () unless .nil? end end |
#received(message, headers = {}) ⇒ Object
106 107 108 |
# File 'lib/activemessaging/adapters/jms.rb', line 106 def received , headers={} #do nothing end |
#send(queue_name, body, headers = {}) ⇒ Object
62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 |
# File 'lib/activemessaging/adapters/jms.rb', line 62 def send queue_name, body, headers={} queue_name = check_destination_type queue_name, headers producer = find_or_create_producer queue_name, headers.symbolize_keys = @session. body headers.stringify_keys.each do |key, value| if ['id', 'message-id', 'JMSMessageID'].include? key .setJMSMessageID value.to_s elsif ['correlation-id', 'JMSCorrelationID'].include? key .setJMSCorrelationID value.to_s elsif ['expires', 'JMSExpiration'].include? key .setJMSExpiration value.to_i elsif ['persistent', 'JMSDeliveryMode'].include? key .setJMSDeliveryMode(value ? 2 : 1) elsif ['priority', 'JMSPriority'].include? key .setJMSPriority value.to_i elsif ['reply-to', 'JMSReplyTo'].include? key .setJMSReplyTo value.to_s elsif ['type', 'JMSType'].include? key .setJMSType value.to_s else #is this the most appropriate thing to do here? .set_string_property key, value.to_s end end producer.send end |
#subscribe(queue_name, headers = {}) ⇒ Object
48 49 50 51 |
# File 'lib/activemessaging/adapters/jms.rb', line 48 def subscribe queue_name, headers={} queue_name = check_destination_type queue_name, headers find_or_create_consumer queue_name, headers end |
#unreceive(message, headers = {}) ⇒ Object
110 111 112 |
# File 'lib/activemessaging/adapters/jms.rb', line 110 def unreceive , headers={} # do nothing end |
#unsubscribe(queue_name, headers = {}) ⇒ Object
53 54 55 56 57 58 59 60 |
# File 'lib/activemessaging/adapters/jms.rb', line 53 def unsubscribe queue_name, headers={} queue_name = check_destination_type queue_name, headers consumer = @consumers[queue_name] unless consumer.nil? consumer.close @consumers.delete queue_name end end |