Class: ActiveMessaging::Adapters::Stomp::Connection

Inherits:
BaseConnection show all
Defined in:
lib/activemessaging/adapters/stomp.rb

Instance Attribute Summary collapse

Attributes inherited from BaseConnection

#reliable

Instance Method Summary collapse

Methods included from ActiveMessaging::Adapter

included, #logger

Constructor Details

#initialize(cfg) ⇒ Connection

Returns a new instance of Connection.



14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/activemessaging/adapters/stomp.rb', line 14

def initialize(cfg)
  @retryMax = cfg[:retryMax] || 0
  @deadLetterQueue = cfg[:deadLetterQueue] || nil
  @deadLetterQueuePrefix = cfg[:deadLetterQueuePrefix] || nil

  cfg[:login] ||= ""
  cfg[:passcode] ||= ""
  cfg[:host] ||= "localhost"
  cfg[:port] ||= "61613"
  cfg[:reliable]  = cfg[:reliable].nil? ? TRUE : cfg[:reliable]
  cfg[:reconnectDelay] ||= 5
  cfg[:clientId] ||= nil

  # hold on to the config
  @configuration = cfg

  # create a new stomp connection
  connect_headers = {}
  connect_headers['client-id'] = cfg[:clientId] if cfg[:clientId]
  @stomp_connection = ::Stomp::Connection.new(cfg[:login],cfg[:passcode],cfg[:host],cfg[:port].to_i,cfg[:reliable],cfg[:reconnectDelay], connect_headers)
end

Instance Attribute Details

#configurationObject

Returns the value of attribute configuration.



12
13
14
# File 'lib/activemessaging/adapters/stomp.rb', line 12

def configuration
  @configuration
end

#deadLetterQueueObject

Returns the value of attribute deadLetterQueue.



12
13
14
# File 'lib/activemessaging/adapters/stomp.rb', line 12

def deadLetterQueue
  @deadLetterQueue
end

#deadLetterQueuePrefixObject

Returns the value of attribute deadLetterQueuePrefix.



12
13
14
# File 'lib/activemessaging/adapters/stomp.rb', line 12

def deadLetterQueuePrefix
  @deadLetterQueuePrefix
end

#retryMaxObject

Returns the value of attribute retryMax.



12
13
14
# File 'lib/activemessaging/adapters/stomp.rb', line 12

def retryMax
  @retryMax
end

#stomp_connectionObject

Returns the value of attribute stomp_connection.



12
13
14
# File 'lib/activemessaging/adapters/stomp.rb', line 12

def stomp_connection
  @stomp_connection
end

Instance Method Details

#add_dlq_prefix(destination) ⇒ Object

add the dead letter queue prefix to the destination



42
43
44
45
46
47
48
49
# File 'lib/activemessaging/adapters/stomp.rb', line 42

def add_dlq_prefix(destination)
    if (ri = destination.rindex("/"))
        destination.clone.insert(ri + 1, @deadLetterQueuePrefix)
    else
        @deadLetterQueuePrefix + destination
    end

end

#disconnectObject

called to cleanly get rid of connection



52
53
54
# File 'lib/activemessaging/adapters/stomp.rb', line 52

def disconnect
  @stomp_connection.disconnect
end

#receive(options = {}) ⇒ Object

receive a single message from any of the subscribed destinations check each destination once, then sleep for poll_interval



76
77
78
79
# File 'lib/activemessaging/adapters/stomp.rb', line 76

def receive(options={})
  m = @stomp_connection.receive
  Message.new(m) if m
end

#received(message, headers = {}) ⇒ Object



81
82
83
84
85
86
87
88
# File 'lib/activemessaging/adapters/stomp.rb', line 81

def received message, headers={}
  #check to see if the ack mode for this subscription is auto or client
  # if the ack mode is client, send an ack
  if (headers[:ack] === 'client')
    ack_headers = message.headers.has_key?(:transaction) ? { :transaction=>message.headers[:transaction]} : {}
    @stomp_connection.ack(message.headers['message-id'], ack_headers)
  end
end

#send(destination_name, message_body, message_headers = {}) ⇒ Object

destination_name string, body string, headers hash send a single message to a destination



70
71
72
# File 'lib/activemessaging/adapters/stomp.rb', line 70

def send destination_name, message_body, message_headers={}
  stomp_publish(destination_name, message_body, message_headers)
end

#stomp_publish(destination_name = "", message_body = "", message_headers = {}) ⇒ Object

send has been deprecated in latest stomp gem (as it should be)



91
92
93
94
95
96
97
# File 'lib/activemessaging/adapters/stomp.rb', line 91

def stomp_publish(destination_name="", message_body="", message_headers={})
  if @stomp_connection.respond_to?(:publish)
    @stomp_connection.publish(destination_name, message_body, message_headers)
  else
    @stomp_connection.send(destination_name, message_body, message_headers)
  end
end

#subscribe(destination_name, message_headers = {}) ⇒ Object

destination_name string, headers hash subscribe to listen on a destination



58
59
60
# File 'lib/activemessaging/adapters/stomp.rb', line 58

def subscribe destination_name, message_headers={}
  @stomp_connection.subscribe(destination_name, message_headers)
end

#supports_dlq?Boolean

Checks if the connection supports dead letter queues

Returns:

  • (Boolean)


37
38
39
# File 'lib/activemessaging/adapters/stomp.rb', line 37

def supports_dlq?
  !@deadLetterQueue.nil? || !@deadLetterQueuePrefix.nil?
end

#unreceive(message, headers = {}) ⇒ Object



99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
# File 'lib/activemessaging/adapters/stomp.rb', line 99

def unreceive message, headers={}
  retry_count = message.headers['a13g-retry-count'].to_i || 0
  transaction_id = "transaction-#{message.headers['message-id']}-#{retry_count}"
  # start a transaction, send the message back to the original destination
  @stomp_connection.begin(transaction_id)
  begin

    if @retryMax > 0
      retry_headers = message.headers.stringify_keys
      retry_headers['transaction']= transaction_id
      content_type_header = retry_headers.delete('content-type')
      content_length_header = retry_headers.delete('content-length')
      # If the content-length header in the original message is nil 
      # then we need to set the :suppress_content_length option so 
      # that the stomp client does not set the content-length of the 
      # retried message. This option will allow ActiveMQ to interpret the 
      # message as a TextMessage.
      # This is somewhat of a hack because the setting of the :suppress_content_length 
      # header is usually done in the messaging.rb and is removed by the time 
      # the unreceive message is called. So I am making some assumptions here
      # on whether or not to set the option
      if content_type_header and content_type_header.include?('text/plain') && content_length_header.nil?
          retry_headers[:suppress_content_length] = true
      end
                   
      retry_destination = retry_headers.delete('destination')
      retry_destination = headers[:destination] if headers[:destination]

      if retry_count < @retryMax
        # now send the message back to the destination
        #  set the headers for message id, priginal message id, and retry count
        retry_headers['a13g-original-message-id'] = retry_headers['message-id'] unless retry_headers.has_key?('a13g-original-message-id')
        retry_headers.delete('message-id')

        retry_headers['a13g-original-timestamp'] = retry_headers['timestamp'] unless retry_headers.has_key?('a13g-original-timestamp')
        retry_headers.delete('timestamp')

        retry_headers['a13g-retry-count'] = retry_count + 1

        # send the updated message to retry in the same transaction
        logger.warn "retrying message on #{retry_destination}"
        self.stomp_publish(retry_destination, message.body, retry_headers)

      elsif retry_count >= @retryMax && supports_dlq?
        # send the 'poison pill' message to the dead letter queue - make it persistent by default
        retry_headers['a13g-original-destination'] = retry_destination #retry_headers.delete('destination')
        retry_headers['persistent'] = true
        retry_headers.delete('message-id')
        
        # If the prefix option is set then put the prefix after the /queue/ or /topic/
        if (@deadLetterQueuePrefix)
            dlq = add_dlq_prefix(retry_destination)
        else
            dlq = @deadLetterQueue
        end
        logger.warn "putting message on DLQ: #{dlq}"
        self.stomp_publish(dlq, message.body, retry_headers)
      end

    end

    #check to see if the ack mode is client, and if it is, ack it in this transaction
    if (headers[:ack] === 'client')
      # ack the original message
      @stomp_connection.ack(message.headers['message-id'], message.headers.stringify_keys.merge('transaction'=>transaction_id))
    end

    # now commit the transaction
    @stomp_connection.commit transaction_id
  rescue Exception=>exc
    # if there is an error, try to abort the transaction, then raise the error
    @stomp_connection.abort transaction_id
    raise exc
  end

end

#unsubscribe(destination_name, message_headers = {}) ⇒ Object

destination_name string, headers hash unsubscribe to listen on a destination



64
65
66
# File 'lib/activemessaging/adapters/stomp.rb', line 64

def unsubscribe destination_name, message_headers={}
  @stomp_connection.unsubscribe(destination_name, message_headers)
end