Class: ActiveMessaging::Adapters::Jms::Connection
- Inherits:
- Object
- Inheritance hierarchy:
- Object
- ActiveMessaging::Adapters::Jms::Connection
- Includes:
- ActiveMessaging::Adapter
- Defined in:
- lib/activemessaging/adapters/jms.rb
Instance Attribute Summary
-
#connection ⇒ Object
Returns the value of attribute connection.
-
#consumers ⇒ Object
Returns the value of attribute consumers.
-
#producers ⇒ Object
Returns the value of attribute producers.
-
#reliable ⇒ Object
Returns the value of attribute reliable.
-
#session ⇒ Object
Returns the value of attribute session.
Instance Method Summary
- #close ⇒ Object
- #find_or_create_consumer(queue_name, headers = {}) ⇒ Object
- #find_or_create_destination(queue_name, headers = {}) ⇒ Object
- #find_or_create_producer(queue_name, headers = {}) ⇒ Object
-
#initialize(cfg = {}) ⇒ Connection
constructor
A new instance of Connection.
- #receive(options = {}) ⇒ Object
- #receive_message(queue_name = nil, headers = {}) ⇒ Object
- #received(message, headers = {}) ⇒ Object
- #send(queue_name, body, headers = {}) ⇒ Object
- #subscribe(queue_name, headers = {}) ⇒ Object
- #unreceive(message, headers = {}) ⇒ Object
- #unsubscribe(queue_name, headers = {}) ⇒ Object
Methods included from ActiveMessaging::Adapter
Constructor Details
#initialize(cfg = {}) ⇒ Connection
Returns a new instance of Connection.
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 |
# File 'lib/activemessaging/adapters/jms.rb', line 23
# Builds a JMS connection factory from +cfg+, creates a connection and a
# session from it, resets the destination/producer/consumer caches and starts
# the connection.
#
# @param cfg [Hash] adapter configuration. Reads :url, :login and :passcode,
#   plus either :connection_factory (a class path resolved under the +Java+
#   namespace and instantiated with login, passcode and url) or :jndi (a name
#   looked up through javax.naming.InitialContext).
# @raise [RuntimeError] if neither :connection_factory nor :jndi is present,
#   if :connection_factory is not a plain class path, or if the factory ends
#   up nil.
def initialize(cfg = {})
  @url = cfg[:url]
  @login = cfg[:login]
  @passcode = cfg[:passcode]

  # initialize our connection factory
  if cfg.has_key? :connection_factory
    # this initialize is probably activemq specific. There might be a more generic
    # way of getting this without resorting to jndi lookup.
    factory_path = cfg[:connection_factory].to_s
    # The path is interpolated into an eval'd expression, so refuse anything
    # that is not a bare constant/package path (Foo::Bar or foo.bar.Baz).
    unless factory_path =~ /\A[A-Za-z_]\w*(?:(?:::|\.)[A-Za-z_]\w*)*\z/
      raise "connection_factory must be a Java class path. was #{factory_path}"
    end
    @connection_factory = eval("Java::#{factory_path}").new(@login, @passcode, @url)
  elsif cfg.has_key? :jndi
    @connection_factory = javax.naming.InitialContext.new().lookup(cfg[:jndi])
  else
    raise "Either jndi or connection_factory has to be set in the config."
  end
  raise "Connection factory could not be initialized." if @connection_factory.nil?

  @connection = @connection_factory.create_connection()
  # Non-transacted session; acknowledge mode 1 is presumably
  # javax.jms.Session::AUTO_ACKNOWLEDGE - confirm against the JMS API.
  @session = @connection.createSession(false, 1)
  @destinations = []
  @producers = {}
  @consumers = {}
  @connection.start
end
Instance Attribute Details
#connection ⇒ Object
Returns the value of attribute connection.
21 22 23 |
# File 'lib/activemessaging/adapters/jms.rb', line 21
# Returns the value of attribute connection.
def connection
  @connection
end
#consumers ⇒ Object
Returns the value of attribute consumers.
21 22 23 |
# File 'lib/activemessaging/adapters/jms.rb', line 21
# Returns the value of attribute consumers.
def consumers
  @consumers
end
#producers ⇒ Object
Returns the value of attribute producers.
21 22 23 |
# File 'lib/activemessaging/adapters/jms.rb', line 21
# Returns the value of attribute producers.
def producers
  @producers
end
#reliable ⇒ Object
Returns the value of attribute reliable.
21 22 23 |
# File 'lib/activemessaging/adapters/jms.rb', line 21
# Returns the value of attribute reliable.
def reliable
  @reliable
end
#session ⇒ Object
Returns the value of attribute session.
21 22 23 |
# File 'lib/activemessaging/adapters/jms.rb', line 21
# Returns the value of attribute session.
def session
  @session
end
Instance Method Details
#close ⇒ Object
117 118 119 120 121 122 123 124 125 126 |
# File 'lib/activemessaging/adapters/jms.rb', line 117
# Shuts the adapter down: closes every cached consumer, stops the connection,
# closes the session and the connection, then clears all cached state.
#
# Calling it again after a successful close is a no-op, because the
# connection is nil and the consumer cache is empty by then.
#
# @return [Hash] the fresh, empty producers cache
def close
  # Consumers are released with #close, the same call #unsubscribe uses.
  @consumers.each_value { |consumer| consumer.close }
  unless @connection.nil?
    @connection.stop
    @session.close unless @session.nil?
    @connection.close
  end
  @connection = nil
  @session = nil
  @consumers = {}
  @producers = {}
end
#find_or_create_consumer(queue_name, headers = {}) ⇒ Object
137 138 139 140 141 142 143 144 145 146 147 148 149 150 |
# File 'lib/activemessaging/adapters/jms.rb', line 137
# Returns the consumer cached under +queue_name+, creating and caching one on
# the current session when none exists yet.
#
# @param queue_name [Object] key used for the consumer cache and passed on to
#   find_or_create_destination
# @param headers [Hash] passed on to find_or_create_destination; a :selector
#   entry (symbol or string key) is handed to the session as a second argument
# @return [Object] the cached or newly created consumer
def find_or_create_consumer(queue_name, headers = {})
  consumer = @consumers[queue_name]
  return consumer unless consumer.nil?

  destination = find_or_create_destination(queue_name, headers)
  options = headers.symbolize_keys
  # NOTE(review): the cache is keyed by queue_name only, so a later call with
  # a different :selector gets the consumer created by the first call.
  consumer =
    if options.has_key?(:selector)
      @session.create_consumer(destination, options[:selector])
    else
      @session.create_consumer(destination)
    end
  @consumers[queue_name] = consumer
end
#find_or_create_destination(queue_name, headers = {}) ⇒ Object
152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 |
# File 'lib/activemessaging/adapters/jms.rb', line 152
# Looks a destination up through find_destination and, when none is found,
# creates a topic or queue on the session and records it in @destinations.
#
# The destination type is read once from the symbolized headers, so the
# lookup, the branch taken and the error message all agree whether the caller
# used a symbol or a string key.
#
# @param queue_name [Object] destination name; converted with to_s on creation
# @param headers [Hash] must carry :destination_type of :queue or :topic when
#   the destination does not exist yet
# @return [Object] the found or newly created destination
# @raise [RuntimeError] if a destination must be created and the type is
#   neither :queue nor :topic
def find_or_create_destination(queue_name, headers = {})
  destination_type = headers.symbolize_keys[:destination_type]
  destination = find_destination(queue_name, destination_type)
  return destination unless destination.nil?

  destination =
    case destination_type
    when :topic then @session.create_topic(queue_name.to_s)
    when :queue then @session.create_queue(queue_name.to_s)
    else
      raise "headers[:destination_type] must be either :queue or :topic. was #{destination_type}"
    end
  @destinations << destination
  destination
end
#find_or_create_producer(queue_name, headers = {}) ⇒ Object
128 129 130 131 132 133 134 135 |
# File 'lib/activemessaging/adapters/jms.rb', line 128
# Returns the producer cached under +queue_name+, creating one on the current
# session when none exists yet.
#
# A newly created producer is stored in @producers, so repeated calls for the
# same queue reuse it instead of creating a new producer each time.
#
# @param queue_name [Object] key used for the producer cache and passed on to
#   find_or_create_destination
# @param headers [Hash] passed on to find_or_create_destination
# @return [Object] the cached or newly created producer
def find_or_create_producer(queue_name, headers = {})
  @producers[queue_name] ||= begin
    destination = find_or_create_destination(queue_name, headers)
    @session.create_producer(destination)
  end
end
#receive(options = {}) ⇒ Object
89 90 91 92 93 |
# File 'lib/activemessaging/adapters/jms.rb', line 89
# Option-hash front end for receive_message.
#
# @param options [Hash] reads :queue_name (may be absent) and :headers
#   (defaults to an empty hash)
# @return [Object] whatever receive_message returns
def receive(options = {})
  queue_name = options[:queue_name]
  headers = options[:headers] || {}
  receive_message(queue_name, headers)
end
#receive_message(queue_name = nil, headers = {}) ⇒ Object
95 96 97 98 99 100 101 102 103 104 105 106 107 |
# File 'lib/activemessaging/adapters/jms.rb', line 95
# Polls for one message.
#
# Without a queue name, each cached consumer is polled in turn and the first
# message found is returned. With a queue name, a consumer is obtained through
# subscribe, polled once and then released through unsubscribe.
#
# NOTE(review): the name of the helper that post-processes the message was
# missing from this listing; condition_message is assumed here - confirm
# against lib/activemessaging/adapters/jms.rb.
#
# @param queue_name [Object, nil] queue to poll, or nil to poll every
#   subscribed consumer
# @param headers [Hash] passed to subscribe and unsubscribe
# @return [Object, nil] the post-processed message; nil when polling all
#   consumers yields nothing
def receive_message(queue_name = nil, headers = {})
  if queue_name.nil?
    @consumers.each_value do |consumer|
      # The argument is presumably a timeout in milliseconds - confirm
      # against the JMS MessageConsumer API.
      message = consumer.receive(1)
      return condition_message(message) unless message.nil?
    end
    nil
  else
    consumer = subscribe(queue_name, headers)
    begin
      message = consumer.receive(1)
    ensure
      # Release the consumer even when the poll raises.
      unsubscribe(queue_name, headers)
    end
    condition_message(message)
  end
end
#received(message, headers = {}) ⇒ Object
109 110 111 |
# File 'lib/activemessaging/adapters/jms.rb', line 109
# Hook called for a received message; this adapter does nothing here.
#
# @param message [Object] ignored
# @param headers [Hash] ignored
# @return [nil]
def received(message, headers = {})
  # do nothing
end
#send(queue_name, body, headers = {}) ⇒ Object
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 |
# File 'lib/activemessaging/adapters/jms.rb', line 63
# Builds a message from +body+, copies +headers+ onto it and sends it through
# the producer for +queue_name+.
#
# Recognised header names are mapped onto the matching setJMS* call; every
# other header is stored as a string property.
#
# NOTE(review): identifiers were missing from this listing. The message local
# and the session call create_text_message are assumed - confirm against
# lib/activemessaging/adapters/jms.rb.
#
# @param queue_name [Object] destination name, normalised by
#   check_destination_type
# @param body [Object] payload handed to the session when creating the message
# @param headers [Hash] symbol- or string-keyed headers
# @return [Object] whatever the producer's send returns
def send(queue_name, body, headers = {})
  queue_name = check_destination_type(queue_name, headers)
  producer = find_or_create_producer(queue_name, headers.symbolize_keys)
  message = @session.create_text_message(body)
  headers.stringify_keys.each do |key, value|
    # NOTE(review): JMS providers may overwrite message id, expiration,
    # delivery mode and priority when the message is sent - confirm whether
    # these setters have any effect with the broker in use.
    case key
    when 'id', 'message-id', 'JMSMessageID'
      message.setJMSMessageID(value.to_s)
    when 'correlation-id', 'JMSCorrelationID'
      message.setJMSCorrelationID(value.to_s)
    when 'expires', 'JMSExpiration'
      message.setJMSExpiration(value.to_i)
    when 'persistent', 'JMSDeliveryMode'
      # Any truthy value selects 2, otherwise 1; presumably
      # javax.jms.DeliveryMode PERSISTENT / NON_PERSISTENT - confirm.
      message.setJMSDeliveryMode(value ? 2 : 1)
    when 'priority', 'JMSPriority'
      message.setJMSPriority(value.to_i)
    when 'reply-to', 'JMSReplyTo'
      # NOTE(review): this passes a String; the JMS API appears to expect a
      # Destination here - confirm.
      message.setJMSReplyTo(value.to_s)
    when 'type', 'JMSType'
      message.setJMSType(value.to_s)
    else
      # is this the most appropriate thing to do here?
      message.set_string_property(key, value.to_s)
    end
  end
  producer.send(message)
end
#subscribe(queue_name, headers = {}) ⇒ Object
49 50 51 52 |
# File 'lib/activemessaging/adapters/jms.rb', line 49
# Subscribes to a destination by making sure a consumer exists for it.
#
# @param queue_name [Object] destination name, normalised by
#   check_destination_type before use
# @param headers [Hash] passed to check_destination_type and
#   find_or_create_consumer
# @return [Object] the consumer returned by find_or_create_consumer
def subscribe(queue_name, headers = {})
  queue_name = check_destination_type(queue_name, headers)
  find_or_create_consumer(queue_name, headers)
end
#unreceive(message, headers = {}) ⇒ Object
113 114 115 |
# File 'lib/activemessaging/adapters/jms.rb', line 113
# Hook called to hand a message back; this adapter does nothing here.
#
# @param message [Object] ignored
# @param headers [Hash] ignored
# @return [nil]
def unreceive(message, headers = {})
  # do nothing
end
#unsubscribe(queue_name, headers = {}) ⇒ Object
54 55 56 57 58 59 60 61 |
# File 'lib/activemessaging/adapters/jms.rb', line 54
# Closes the consumer cached for a destination and removes it from the cache.
# Does nothing when no consumer is cached under that name.
#
# @param queue_name [Object] destination name, normalised by
#   check_destination_type before the cache lookup
# @param headers [Hash] passed to check_destination_type
# @return [Object, nil] the removed consumer, or nil when there was none
def unsubscribe(queue_name, headers = {})
  queue_name = check_destination_type(queue_name, headers)
  consumer = @consumers[queue_name]
  return if consumer.nil?

  # Close first so a failing close leaves the consumer in the cache.
  consumer.close
  @consumers.delete(queue_name)
end