Class: Messagebus::Consumer

Inherits:
Connection show all
Defined in:
lib/messagebus/consumer.rb

Overview

Consumer client class. Provides a single access thread for all messagebus servers. Takes in a list of messagebus servers to receive from, opens connections to all of them, and does round-robin receives across all servers.

parameters:
dest      (String, required value, name of the queue/topic)
host_params      (list<string>, required value,  eg. '[localhost:61613]')
options : A hash map for optional values.
  user     (String,  default : '')
  passwd  (String,  default : '')
  ack_type      (String, required value: Messagebus::ACK_TYPE_AUTO_CLIENT OR Messagebus::ACK_TYPE_CLIENT)
                autoClient: module acks internally automatically for each receive.
                client: User should explicit ack *each* message.
  conn_lifetime_sec      (Int, default:300 secs)
  subscription_id      (String, required for topic, Each subscription is identified by a unique Id,
                               for a topic different subscriptions means each subscription gets copy of
                               message each, same subscription_id across multiple Consumers means load-balancing
                               messages for that subscription.)
  enable_dynamic_serverlist_fetch:  (Boolean, Enable the consumer to fetch the list of brokers actively default: true)
  dynamic_serverlist_fetch_url_override      (String, The override url to fetch the list of brokers dynamically.)
  dynamic_fetch_timeout_ms  (Integer, milliseconds to wait for http response of dynamic serverlist fetch)
  receipt_wait_timeout_ms (Int, optional value, default: 5 seconds)

Constant Summary

Constants inherited from Connection

Messagebus::Connection::STARTED, Messagebus::Connection::STOPPED

Instance Attribute Summary collapse

Attributes inherited from Connection

#host_params, #options

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from Connection

#do_with_timeout, #start_server, #started?, #stop_server, #stopped?

Methods included from Validations

#valid_host?, #validate_connection_config, #validate_destination_config

Constructor Details

#initialize(host_params, options = {}) ⇒ Consumer

Returns a new instance of Consumer.



75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/messagebus/consumer.rb', line 75

# Builds a consumer for the given broker address(es).
#
# Option precedence, lowest to highest: the built-in defaults
# (:ack_type => ACK_TYPE_CLIENT, :enable_dynamic_serverlist_fetch => true),
# then the caller's options, then any :cluster_defaults found in the merged
# options (assuming DottableHash#merge! overwrites like Hash#merge! — TODO confirm).
#
# The destination and connection settings are validated after the parent
# constructor has run.
#
# @param host_params [Array<String>, String] broker address(es), forwarded to Connection
# @param options [Hash] optional settings; see the class overview
def initialize(host_params, options = {})
  defaults = { :ack_type => Messagebus::ACK_TYPE_CLIENT, :enable_dynamic_serverlist_fetch => true }
  options = DottableHash.new(defaults).merge(options)
  cluster_defaults = options.cluster_defaults
  options.merge!(cluster_defaults) if cluster_defaults

  super(host_params, options)

  validate_destination_config(@options.destination_name, true, options)
  validate_connection_config(@host_params, options)

  # A dedicated log file is optional; without it #logger falls back lazily.
  log_file = options[:log_file]
  @logger = Logger.new(log_file) if log_file
  @received_messages = Queue.new
  @servers_running = {}
end

Instance Attribute Details

#received_messagesObject

Returns the value of attribute received_messages.



60
61
62
# File 'lib/messagebus/consumer.rb', line 60

# Reader for the queue of received entries. The constructor initialises it
# to an empty Queue; #insert_sentinel_value pushes onto it and the receive_*
# methods check it for emptiness.
#
# @return [Queue] the underlying queue object
def received_messages
  @received_messages
end

#servers_runningObject

Returns the value of attribute servers_running.



60
61
62
# File 'lib/messagebus/consumer.rb', line 60

# Reader for the table of running broker clients. The constructor
# initialises it to an empty Hash; other methods look clients up in it by
# host_param key.
#
# @return [Hash] host_param => client
def servers_running
  @servers_running
end

#stateObject

Returns the value of attribute state.



60
61
62
# File 'lib/messagebus/consumer.rb', line 60

# Reader for the consumer's lifecycle state. #start sets it to STARTED and
# #stop sets it to STOPPED; it is nil until one of them has been called.
#
# @return [Object, nil] STARTED, STOPPED, or nil
def state
  @state
end

Class Method Details

.start(host_params, options = {}) ⇒ Object



62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/messagebus/consumer.rb', line 62

# Convenience constructor: builds a consumer and starts it.
#
# Without a block the started consumer is returned and the caller is
# responsible for calling #stop. With a block the consumer is yielded and
# #stop is always called afterwards, even if the block raises.
#
# @param host_params broker address(es), forwarded to .new
# @param options [Hash] forwarded to .new
# @yield [consumer] the started consumer
# @return the consumer instance (already stopped when a block was given)
def self.start(host_params, options={})
  consumer = new(host_params, options)
  consumer.start
  return consumer unless block_given?

  begin
    yield consumer
  ensure
    consumer.stop
  end
  consumer
end

Instance Method Details

#ack(safe_mode = false) ⇒ Object

Ack the last received message. The message broker will keep resending a message (after retry_wait and up to retry_max_times) until it sees an ack for it.



170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
# File 'lib/messagebus/consumer.rb', line 170

# Acknowledge the last received message.
#
# Default mode: the ack is handed to the broker client and the method
# returns immediately.
#
# Safe mode: a callback is passed to the client's acknowledge call, and the
# method then polls (via do_with_timeout, up to
# @options.receipt_wait_timeout_ms) until the callback has seen a reply.
#
# @param safe_mode [Boolean] wait for the broker's reply before returning
# @return [true]  the ack was sent (default mode) or confirmed (safe mode)
# @return [false] the ack could not be sent, or no reply arrived in time
# @return [nil]   there is no pending message to acknowledge
# @raise [RuntimeError] in safe mode, when the broker replied with an ERROR
def ack(safe_mode = false)
  return nil if @last_received.nil?

  logger.info("Sending ack() for message with id:#{@last_received[:decoded_msg].message_id}")

  unless safe_mode
    begin
      @servers_running[@last_received[:host_param]].acknowledge(@last_received[:msg])
      @last_received = nil
      return true
    rescue => e
      logger.error("Failed to ack message. Was the connection removed? #{e.message} #{e.backtrace.join("|")}")
      # Report the failure explicitly. Previously the method's value here was
      # whatever logger.error returned (true for ::Logger), so a failed ack
      # looked like a successful one.
      return false
    end
  end

  receipt_received = false
  errors_received = nil
  @servers_running[@last_received[:host_param]].acknowledge(@last_received[:msg]) do |msg|
    if msg.command == 'ERROR'
      # Remember the error so the polling loop below can surface it.
      errors_received = msg
      raise "Failed to ack message with Error: #{msg.body.to_s} #{caller}"
    else
      receipt_received = true
      @last_received = nil
    end
  end

  # wait for receipt up to given timeout.
  do_with_timeout(@options.receipt_wait_timeout_ms) do
    if errors_received
      raise "Failed to ack message in safe mode with Error: " + errors_received.body.to_s
    end

    return true if receipt_received

    sleep 0.005
  end

  # No reply within the timeout: the ack is unconfirmed.
  false
end

#creditObject

Send consumer credit back for last received message.



155
156
157
158
159
160
161
162
163
164
165
# File 'lib/messagebus/consumer.rb', line 155

# Send consumer credit back to the broker for the last received message.
# Does nothing when no message is pending. The pending message is left in
# place (it is not cleared here).
#
# Only NameError (which includes NoMethodError, e.g. when the client for the
# message's host is no longer in @servers_running) is caught and logged; any
# other error raised by the client propagates to the caller.
#
# @return the client's credit result, the logger's return value after a
#   rescued failure, or nil when nothing is pending
def credit()
  return if @last_received.nil?

  logger.info("Sending consumer credit for message with id:#{@last_received[:decoded_msg].message_id}")

  client = @servers_running[@last_received[:host_param]]
  client.credit(@last_received[:msg])
rescue NameError => e
  logger.error("Failed to credit message. Was the connection removed?. #{e.message} #{e.backtrace.join("|")}")
end

#delete_subscriptionObject



275
276
277
278
279
280
281
282
283
284
# File 'lib/messagebus/consumer.rb', line 275

# Unsubscribe this consumer's destination on every running broker client.
#
# Iterates over a snapshot of the host keys, so the client table is looked
# up by key on each step rather than iterated directly.
#
# @return [Array] the host_param keys that were visited
def delete_subscription()
  @servers_running.keys.each do |host_param|
    destination = @options.destination_name
    logger.info("Unsubscribing #{destination} consumer client for #{host_param}")
    @servers_running[host_param].unsubscribe(destination)
  end
end

#fetch_serverlistObject



238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
# File 'lib/messagebus/consumer.rb', line 238

# Fetch the current broker list from the dynamic server list URL.
#
# The URL is @options.dynamic_serverlist_fetch_url_override when set,
# otherwise it is derived from @host_params. The response is expected to be
# a comma-separated list; spaces are removed and every entry must match
# SERVER_REGEX.
#
# Fetch and validation failures are logged and reported as nil. An error
# while deriving the URL is not rescued here.
#
# @return [Array<String>, nil] the server list, or nil on failure
def fetch_serverlist
  url = @options.dynamic_serverlist_fetch_url_override || get_dynamic_fetch_url(@host_params)

  logger.info("trying to fetch dynamic url #{url}")
  begin
    data = fetch_uri(url).gsub(' ', '')
    serverlist = data.split(',')
    malformed = serverlist.any? { |server| SERVER_REGEX.match(server).nil? }
    raise "bad data returned from dynamic url: #{data}" if malformed

    serverlist
  rescue => e
    logger.error("Failed to fetch server list from url:#{url} with exception: #{e.message}, #{e.backtrace.join("|")}")
    nil
  end
end

#get_dynamic_fetch_url(host_params) ⇒ Object



263
264
265
266
267
268
269
270
271
272
273
# File 'lib/messagebus/consumer.rb', line 263

# Build the JMX URL used to ask a broker for its current list of brokers.
#
# A broker is picked at random when an Array is given; a String is used as
# is. Only the host part of "host:port" is used — the URL always targets
# port 8081.
#
# @param host_params [Array<String>, String] broker address(es) as "host:port"
# @return [String] the URL to fetch the broker list from
# @raise [ArgumentError] when no host can be derived (empty list, nil,
#   unsupported type or empty host)
def get_dynamic_fetch_url(host_params)
  host_param =
    case host_params
    when Array then host_params.sample
    when String then host_params
    end

  host = host_param.to_s.split(':').first
  if host.nil? || host.empty?
    raise ArgumentError, "cannot derive a broker host from host_params: #{host_params.inspect}"
  end

  "http://#{host}:8081/jmx?command=get_attribute&args=org.hornetq%3Amodule%3DCore%2Ctype%3DServer%20ListOfBrokers"
end

#insert_sentinel_value(final_message = nil) ⇒ Object

This is used to insert an unblock message into the consumer. A use case is when you’re using a blocking receive, and you want to unblock a separate thread or tell a consumer to unblock from a signal handler. See also Messagebus::Swarm::Drone#stop

en.wikipedia.org/wiki/Sentinel_value



118
119
120
121
122
# File 'lib/messagebus/consumer.rb', line 118

# Push a "stop processing" marker onto the received-messages queue so that a
# receiver currently blocked waiting for a message wakes up, sees the marker
# and does no further processing.
#
# @param final_message optional payload stored under :msg in the marker
# @return the received-messages queue
def insert_sentinel_value(final_message=nil)
  sentinel = { :stop_processing_sentinel => true, :msg => final_message }
  @received_messages << sentinel
end

#keepaliveObject



226
227
228
229
230
231
232
233
234
235
# File 'lib/messagebus/consumer.rb', line 226

# Send a keepalive to every running broker client. A failure on one client
# is logged and does not stop the remaining clients from being pinged.
#
# @return [Hash] the table of running clients
def keepalive
  @servers_running.each_pair do |host_param, client|
    begin
      client.keepalive
      # NOTE(review): a successful keepalive also forgets the last received
      # message, so a later ack/nack/credit becomes a no-op. Confirm this is
      # intended and not a leftover copied from #nack.
      @last_received = nil
    rescue StandardError
      logger.error("Failed to send keepalive to #{host_param}")
    end
  end
end

#loggerObject



89
90
91
# File 'lib/messagebus/consumer.rb', line 89

# Logger used by this consumer, memoized in @logger. When the constructor
# was given a :log_file option it has already set @logger; otherwise the
# first call falls back to Client.logger.
def logger
  @logger ||= Client.logger
end

#nackObject



213
214
215
216
217
218
219
220
221
222
223
224
# File 'lib/messagebus/consumer.rb', line 213

# Negatively acknowledge the last received message and forget it.
# Does nothing when no message is pending. Any StandardError raised while
# sending the nack is logged and swallowed; the pending message is kept in
# that case.
#
# @return nil on success or when nothing is pending; the logger's return
#   value after a rescued failure
def nack
  return if @last_received.nil?

  logger.info("Sending nack() for message with id:#{@last_received[:decoded_msg].message_id}")

  client = @servers_running[@last_received[:host_param]]
  client.nack(@last_received[:msg])
  @last_received = nil
rescue => e
  logger.error("Failed to nack message. Was the connection removed? #{e.message} #{e.backtrace.join("|")}")
end

#receiveObject

Blocking receive: blocks until a message is available. Returns the received message (Messagebus::Message), or blocks indefinitely if none arrives.



126
127
128
# File 'lib/messagebus/consumer.rb', line 126

# Blocking receive: delegates to receive_internal with non_blocking = false.
#
# @return whatever receive_internal returns
def receive
  receive_internal(false)
end

#receive_immediateObject

Non-Blocking receive. Returns the message(Messagebus::Message) received or nil immediately.



146
147
148
149
150
151
152
# File 'lib/messagebus/consumer.rb', line 146

# Non-blocking receive: returns nil straight away when the queue of
# received messages is empty, otherwise delegates to receive_internal with
# non_blocking = true.
#
# @return the received message, or nil when none is queued
def receive_immediate()
  return nil if @received_messages.empty?

  receive_internal(true)
end

#receive_timeout(timeout_ms = 1000) ⇒ Object

Blocking receive with timeout: block till a value is available for passed timeout. Returns the message(Messagebus::Message) received or raise MessageReceiveTimeout(“timeout”)



132
133
134
135
136
137
138
139
140
141
142
# File 'lib/messagebus/consumer.rb', line 132

# Receive with a timeout: polls the queue of received messages inside
# do_with_timeout, sleeping 10 ms between checks while it is empty.
#
# @param timeout_ms [Integer] timeout handed to do_with_timeout
# @return the received message
# @raise [MessageReceiveTimeout] when do_with_timeout returns without a
#   message having been received
def receive_timeout(timeout_ms=1000)
  do_with_timeout(timeout_ms) do
    return receive_internal(true) unless @received_messages.empty?

    sleep 0.01
  end

  raise MessageReceiveTimeout, "receive timeout(#{timeout_ms}) while waiting for message to arrive."
end

#refresh_serversObject



286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
# File 'lib/messagebus/consumer.rb', line 286

# Reconcile the running broker clients with the current broker list.
#
# When dynamic server list fetching is enabled and the fetch yields a list,
# that list is used; otherwise the currently running hosts are used. Clients
# are started for hosts that are not running yet. Hosts missing from the new
# list are not stopped here.
#
# Also records the time of the refresh in @refresh_time.
#
# @return the result of start_servers, or nil when nothing was added
def refresh_servers
  logger.info("refreshing consumer threads.")
  @refresh_time = Time.new
  hosts = @servers_running.keys

  if @options.enable_dynamic_serverlist_fetch
    # Fetch the server list from the dynamic server list fetch url.
    fetched = begin
      fetch_serverlist
    rescue => e
      logger.error "Error in refresh server #{e} \n Stack Trace: #{e.backtrace.join("|")}"
      nil
    end
    hosts = fetched unless fetched.nil?
  end

  logger.info("refreshing servers current_list:#{@servers_running.keys.inspect} new list:#{hosts.inspect}")

  servers_added = hosts - @servers_running.keys
  return nil unless servers_added && !servers_added.empty?

  # start new servers
  logger.info("Adding new servers in:#{servers_added.inspect}")
  start_servers(servers_added)
end

#startObject

Start the consumers and all connections. The block form is provided by the class method Consumer.start: it yields the started consumer and automatically closes the connections after the block finishes.



96
97
98
99
100
101
# File 'lib/messagebus/consumer.rb', line 96

# Start the consumer: mark it STARTED, start a client for each configured
# host via start_servers, then run refresh_servers, which may start clients
# for further hosts.
#
# This instance method does not take a block; the block form lives on the
# class method Consumer.start.
def start
  @state = STARTED
  logger.info("Starting consumers with host_params:#{@host_params.inspect} for destination:#{@options.destination_name}")
  # NOTE(review): the meaning of the second argument (true) is defined by
  # start_servers, which is not visible here.
  start_servers(@host_params, true)
  refresh_servers
end

#stopObject

Close the consumers and all connections



104
105
106
107
108
109
# File 'lib/messagebus/consumer.rb', line 104

# Stop the consumer: mark it STOPPED and hand every currently running host
# to stop_servers.
#
# @return whatever stop_servers returns
def stop
  @state = STOPPED
  running_hosts = @servers_running.keys
  logger.info("Stopping consumers for running servers:#{running_hosts.inspect}")

  stop_servers(running_hosts)
end