Class: ActiveMessaging::Adapters::Stomp::Connection
- Inherits:
-
BaseConnection
- Object
- BaseConnection
- ActiveMessaging::Adapters::Stomp::Connection
- Defined in:
- lib/activemessaging/adapters/stomp.rb
Instance Attribute Summary collapse
-
#configuration ⇒ Object
Returns the value of attribute configuration.
-
#deadLetterQueue ⇒ Object
Returns the value of attribute deadLetterQueue.
-
#deadLetterQueuePrefix ⇒ Object
Returns the value of attribute deadLetterQueuePrefix.
-
#retryMax ⇒ Object
Returns the value of attribute retryMax.
-
#stomp_connection ⇒ Object
Returns the value of attribute stomp_connection.
Attributes inherited from BaseConnection
Instance Method Summary collapse
-
#add_dlq_prefix(destination) ⇒ Object
add the dead letter queue prefix to the destination.
-
#disconnect ⇒ Object
called to cleanly get rid of connection.
-
#initialize(cfg) ⇒ Connection
constructor
A new instance of Connection.
-
#receive(options = {}) ⇒ Object
receive a single message from any of the subscribed destinations check each destination once, then sleep for poll_interval.
- #received(message, headers = {}) ⇒ Object
-
#send(destination_name, message_body, message_headers = {}) ⇒ Object
destination_name string, body string, headers hash send a single message to a destination.
-
#stomp_publish(destination_name = "", message_body = "", message_headers = {}) ⇒ Object
send has been deprecated in latest stomp gem (as it should be).
-
#subscribe(destination_name, message_headers = {}) ⇒ Object
destination_name string, headers hash subscribe to listen on a destination.
-
#supports_dlq? ⇒ Boolean
Checks if the connection supports dead letter queues.
- #unreceive(message, headers = {}) ⇒ Object
-
#unsubscribe(destination_name, message_headers = {}) ⇒ Object
destination_name string, headers hash unsubscribe to listen on a destination.
Methods included from ActiveMessaging::Adapter
Constructor Details
#initialize(cfg) ⇒ Connection
Returns a new instance of Connection.
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
# File 'lib/activemessaging/adapters/stomp.rb', line 14 def initialize(cfg) @retryMax = cfg[:retryMax] || 0 @deadLetterQueue = cfg[:deadLetterQueue] || nil @deadLetterQueuePrefix = cfg[:deadLetterQueuePrefix] || nil cfg[:login] ||= "" cfg[:passcode] ||= "" cfg[:host] ||= "localhost" cfg[:port] ||= "61613" cfg[:reliable] = cfg[:reliable].nil? ? TRUE : cfg[:reliable] cfg[:reconnectDelay] ||= 5 cfg[:clientId] ||= nil # hold on to the config @configuration = cfg # create a new stomp connection connect_headers = {} connect_headers['client-id'] = cfg[:clientId] if cfg[:clientId] @stomp_connection = ::Stomp::Connection.new(cfg[:login],cfg[:passcode],cfg[:host],cfg[:port].to_i,cfg[:reliable],cfg[:reconnectDelay], connect_headers) end |
Instance Attribute Details
#configuration ⇒ Object
Returns the value of attribute configuration.
12 13 14 |
# File 'lib/activemessaging/adapters/stomp.rb', line 12 def configuration @configuration end |
#deadLetterQueue ⇒ Object
Returns the value of attribute deadLetterQueue.
12 13 14 |
# File 'lib/activemessaging/adapters/stomp.rb', line 12 def deadLetterQueue @deadLetterQueue end |
#deadLetterQueuePrefix ⇒ Object
Returns the value of attribute deadLetterQueuePrefix.
12 13 14 |
# File 'lib/activemessaging/adapters/stomp.rb', line 12 def deadLetterQueuePrefix @deadLetterQueuePrefix end |
#retryMax ⇒ Object
Returns the value of attribute retryMax.
12 13 14 |
# File 'lib/activemessaging/adapters/stomp.rb', line 12 def retryMax @retryMax end |
#stomp_connection ⇒ Object
Returns the value of attribute stomp_connection.
12 13 14 |
# File 'lib/activemessaging/adapters/stomp.rb', line 12 def stomp_connection @stomp_connection end |
Instance Method Details
#add_dlq_prefix(destination) ⇒ Object
add the dead letter queue prefix to the destination
42 43 44 45 46 47 48 49 |
# File 'lib/activemessaging/adapters/stomp.rb', line 42 def add_dlq_prefix(destination) if (ri = destination.rindex("/")) destination.clone.insert(ri + 1, @deadLetterQueuePrefix) else @deadLetterQueuePrefix + destination end end |
#disconnect ⇒ Object
called to cleanly get rid of connection
52 53 54 |
# File 'lib/activemessaging/adapters/stomp.rb', line 52 def disconnect @stomp_connection.disconnect end |
#receive(options = {}) ⇒ Object
receive a single message from any of the subscribed destinations check each destination once, then sleep for poll_interval
76 77 78 79 |
# File 'lib/activemessaging/adapters/stomp.rb', line 76 def receive(={}) m = @stomp_connection.receive Message.new(m) if m end |
#received(message, headers = {}) ⇒ Object
81 82 83 84 85 86 87 88 |
# File 'lib/activemessaging/adapters/stomp.rb', line 81 def received , headers={} #check to see if the ack mode for this subscription is auto or client # if the ack mode is client, send an ack if (headers[:ack] === 'client') ack_headers = .headers.has_key?(:transaction) ? { :transaction=>.headers[:transaction]} : {} @stomp_connection.ack(.headers['message-id'], ack_headers) end end |
#send(destination_name, message_body, message_headers = {}) ⇒ Object
destination_name string, body string, headers hash send a single message to a destination
70 71 72 |
# File 'lib/activemessaging/adapters/stomp.rb', line 70 def send destination_name, , ={} stomp_publish(destination_name, , ) end |
#stomp_publish(destination_name = "", message_body = "", message_headers = {}) ⇒ Object
send has been deprecated in latest stomp gem (as it should be)
91 92 93 94 95 96 97 |
# File 'lib/activemessaging/adapters/stomp.rb', line 91 def stomp_publish(destination_name="", ="", ={}) if @stomp_connection.respond_to?(:publish) @stomp_connection.publish(destination_name, , ) else @stomp_connection.send(destination_name, , ) end end |
#subscribe(destination_name, message_headers = {}) ⇒ Object
destination_name string, headers hash subscribe to listen on a destination
58 59 60 |
# File 'lib/activemessaging/adapters/stomp.rb', line 58 def subscribe destination_name, ={} @stomp_connection.subscribe(destination_name, ) end |
#supports_dlq? ⇒ Boolean
Checks if the connection supports dead letter queues
37 38 39 |
# File 'lib/activemessaging/adapters/stomp.rb', line 37 def supports_dlq? !@deadLetterQueue.nil? || !@deadLetterQueuePrefix.nil? end |
#unreceive(message, headers = {}) ⇒ Object
99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 |
# File 'lib/activemessaging/adapters/stomp.rb', line 99 def unreceive , headers={} retry_count = .headers['a13g-retry-count'].to_i || 0 transaction_id = "transaction-#{.headers['message-id']}-#{retry_count}" # start a transaction, send the message back to the original destination @stomp_connection.begin(transaction_id) begin if @retryMax > 0 retry_headers = .headers.stringify_keys retry_headers['transaction']= transaction_id content_type_header = retry_headers.delete('content-type') content_length_header = retry_headers.delete('content-length') # If the content-length header in the original message is nil # then we need to set the :suppress_content_length option so # that the stomp client does not set the content-length of the # retried message. This option will allow ActiveMQ to interpret the # message as a TextMessage. # This is somewhat of a hack because the setting of the :suppress_content_length # header is usually done in the messaging.rb and is removed by the time # the unreceive message is called. So I am making some assumptions here # on whether or not to set the option if content_type_header and content_type_header.include?('text/plain') && content_length_header.nil? retry_headers[:suppress_content_length] = true end retry_destination = retry_headers.delete('destination') retry_destination = headers[:destination] if headers[:destination] if retry_count < @retryMax # now send the message back to the destination # set the headers for message id, priginal message id, and retry count retry_headers['a13g-original-message-id'] = retry_headers['message-id'] unless retry_headers.has_key?('a13g-original-message-id') retry_headers.delete('message-id') retry_headers['a13g-original-timestamp'] = retry_headers['timestamp'] unless retry_headers.has_key?('a13g-original-timestamp') retry_headers.delete('timestamp') retry_headers['a13g-retry-count'] = retry_count + 1 # send the updated message to retry in the same transaction logger.warn "retrying message on #{retry_destination}" self.stomp_publish(retry_destination, .body, retry_headers) elsif retry_count >= @retryMax && supports_dlq? # send the 'poison pill' message to the dead letter queue - make it persistent by default retry_headers['a13g-original-destination'] = retry_destination #retry_headers.delete('destination') retry_headers['persistent'] = true retry_headers.delete('message-id') # If the prefix option is set then put the prefix after the /queue/ or /topic/ if (@deadLetterQueuePrefix) dlq = add_dlq_prefix(retry_destination) else dlq = @deadLetterQueue end logger.warn "putting message on DLQ: #{dlq}" self.stomp_publish(dlq, .body, retry_headers) end end #check to see if the ack mode is client, and if it is, ack it in this transaction if (headers[:ack] === 'client') # ack the original message @stomp_connection.ack(.headers['message-id'], .headers.stringify_keys.merge('transaction'=>transaction_id)) end # now commit the transaction @stomp_connection.commit transaction_id rescue Exception=>exc # if there is an error, try to abort the transaction, then raise the error @stomp_connection.abort transaction_id raise exc end end |
#unsubscribe(destination_name, message_headers = {}) ⇒ Object
destination_name string, headers hash unsubscribe to listen on a destination
64 65 66 |
# File 'lib/activemessaging/adapters/stomp.rb', line 64 def unsubscribe destination_name, ={} @stomp_connection.unsubscribe(destination_name, ) end |