Class: Messagebus::Consumer
- Inherits:
-
Connection
- Object
- Connection
- Messagebus::Consumer
- Defined in:
- lib/messagebus/consumer.rb
Overview
Consumer client class. Provides a single access thread for all messagebus servers. Takes in a list of messagebus servers to receive from, open connections to all servers, does round robin receives across all servers.
parameters:
dest (String, required value, name of the queue/topic)
host_params (list<string>, required value, eg. '[localhost:61613]')
options : A hash map for optional values.
user (String, default : '')
passwd (String, default : '')
ack_type (String, required value: Messagebus::ACK_TYPE_AUTO_CLIENT OR Messagebus::ACK_TYPE_CLIENT)
autoClient: module acks internally automatically for each receive.
client: User should explicit ack *each* message.
conn_lifetime_sec (Int, default:300 secs)
subscription_id (String, required for topic, Each subscription is identified by a unique Id,
for a topic different subscriptions means each subscription gets copy of
message each, same subscription_id across multiple Consumers means load-balancing
messages for that subscription.)
enable_dynamic_serverlist_fetch: (Boolean, Enable the consumer to fetch the list of brokers actively default: true)
dynamic_serverlist_fetch_url_override (String, The override url to fetch the list of consumers dynamically.)
dynamic_fetch_timeout_ms (Integer, milliseconds to wait for http response of dynamic serverlist fetch)
receipt_wait_timeout_ms (Int, optoional value, default: 5 seconds)
Constant Summary
Constants inherited from Connection
Messagebus::Connection::STARTED, Messagebus::Connection::STOPPED
Instance Attribute Summary collapse
-
#received_messages ⇒ Object
Returns the value of attribute received_messages.
-
#servers_running ⇒ Object
Returns the value of attribute servers_running.
-
#state ⇒ Object
Returns the value of attribute state.
Attributes inherited from Connection
Class Method Summary collapse
Instance Method Summary collapse
-
#ack(safe_mode = false) ⇒ Object
Ack the last received message.
-
#credit ⇒ Object
Send consumer credit back for last received message.
- #delete_subscription ⇒ Object
- #fetch_serverlist ⇒ Object
- #get_dynamic_fetch_url(host_params) ⇒ Object
-
#initialize(host_params, options = {}) ⇒ Consumer
constructor
A new instance of Consumer.
-
#insert_sentinel_value(final_message = nil) ⇒ Object
This is used to insert an unblock message into the consumer.
- #keepalive ⇒ Object
- #logger ⇒ Object
- #nack ⇒ Object
-
#receive ⇒ Object
Blocking receive: block till a value is available.
-
#receive_immediate ⇒ Object
Non-Blocking receive.
-
#receive_timeout(timeout_ms = 1000) ⇒ Object
Blocking receive with timeout: block till a value is available for passed timeout.
- #refresh_servers ⇒ Object
-
#start ⇒ Object
Start the consumers and all connections.
-
#stop ⇒ Object
Close the consumers and all connections.
Methods inherited from Connection
#do_with_timeout, #start_server, #started?, #stop_server, #stopped?
Methods included from Validations
#valid_host?, #validate_connection_config, #validate_destination_config
Constructor Details
#initialize(host_params, options = {}) ⇒ Consumer
Returns a new instance of Consumer.
75 76 77 78 79 80 81 82 83 84 85 86 87 |
# File 'lib/messagebus/consumer.rb', line 75 def initialize(host_params, = {}) = DottableHash.new({:ack_type => Messagebus::ACK_TYPE_CLIENT, :enable_dynamic_serverlist_fetch => true }).merge() .merge!(.cluster_defaults) if .cluster_defaults super(host_params, ) validate_destination_config(@options.destination_name, true, ) validate_connection_config(@host_params, ) @received_messages = Queue.new @servers_running = {} @logger = Logger.new([:log_file]) if [:log_file] end |
Instance Attribute Details
#received_messages ⇒ Object
Returns the value of attribute received_messages.
60 61 62 |
# File 'lib/messagebus/consumer.rb', line 60 def @received_messages end |
#servers_running ⇒ Object
Returns the value of attribute servers_running.
60 61 62 |
# File 'lib/messagebus/consumer.rb', line 60 def servers_running @servers_running end |
#state ⇒ Object
Returns the value of attribute state.
60 61 62 |
# File 'lib/messagebus/consumer.rb', line 60 def state @state end |
Class Method Details
.start(host_params, options = {}) ⇒ Object
62 63 64 65 66 67 68 69 70 71 72 73 |
# File 'lib/messagebus/consumer.rb', line 62 def self.start(host_params, ={}) consumer = new(host_params, ) consumer.start if block_given? begin yield consumer ensure consumer.stop end end consumer end |
Instance Method Details
#ack(safe_mode = false) ⇒ Object
Ack the last received message. Message broker will keep resending messages (after retry_wait and upto retry_max_times) till it sees an ack for the message.
170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 |
# File 'lib/messagebus/consumer.rb', line 170 def ack(safe_mode = false) if not @last_received.nil? begin logger.info("Sending ack() for message with id:#{@last_received[:decoded_msg].}") if not safe_mode begin @servers_running[@last_received[:host_param]].acknowledge(@last_received[:msg]) @last_received = nil return true rescue => e logger.error("Failed to ack message. Was the connection removed? #{e.} #{e.backtrace.join("|")}") end else receipt_received = false errors_received = nil @servers_running[@last_received[:host_param]].acknowledge(@last_received[:msg]) do |msg| if msg.command == 'ERROR' errors_received = msg raise "Failed to ack message with Error: #{msg.body.to_s} #{caller}" else receipt_received = true @last_received = nil end end # wait for receipt up to given timeout. do_with_timeout(@options.receipt_wait_timeout_ms) do if errors_received raise "Failed to ack message in safe mode with Error: " + errors_received.body.to_s end if not receipt_received sleep 0.005 else return true end end end end end end |
#credit ⇒ Object
Send consumer credit back for last received message.
155 156 157 158 159 160 161 162 163 164 165 |
# File 'lib/messagebus/consumer.rb', line 155 def credit() if not @last_received.nil? begin logger.info("Sending consumer credit for message with id:#{@last_received[:decoded_msg].}") @servers_running[@last_received[:host_param]].credit(@last_received[:msg]) rescue NameError => e logger.error("Failed to credit message. Was the connection removed?. #{e.} #{e.backtrace.join("|")}") end end end |
#delete_subscription ⇒ Object
275 276 277 278 279 280 281 282 283 284 |
# File 'lib/messagebus/consumer.rb', line 275 def delete_subscription() host_params = @servers_running.keys if not host_params.nil? host_params.each do |host_param| logger.info("Unsubscribing #{@options.destination_name} consumer client for #{host_param}") client = @servers_running[host_param] client.unsubscribe(@options.destination_name) end end end |
#fetch_serverlist ⇒ Object
238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 |
# File 'lib/messagebus/consumer.rb', line 238 def fetch_serverlist if @options.dynamic_serverlist_fetch_url_override dynamic_serverlist_fetch_url = @options.dynamic_serverlist_fetch_url_override else dynamic_serverlist_fetch_url = get_dynamic_fetch_url(@host_params) end logger.info("trying to fetch dynamic url #{dynamic_serverlist_fetch_url}") begin data = fetch_uri(dynamic_serverlist_fetch_url) data = data.gsub(' ', '') serverlist = data.split(',') serverlist.each do |server| if SERVER_REGEX.match(server).nil? raise "bad data returned from dynamic url: #{data}" end end return serverlist rescue => e logger.error("Failed to fetch server list from url:#{dynamic_serverlist_fetch_url} with exception: #{e.}, #{e.backtrace.join("|")}") return nil end end |
#get_dynamic_fetch_url(host_params) ⇒ Object
263 264 265 266 267 268 269 270 271 272 273 |
# File 'lib/messagebus/consumer.rb', line 263 def get_dynamic_fetch_url(host_params) case host_params when Array host_param = host_params[rand(host_params.length)] when String host_param = host_params end host, port = host_param.split(':') return 'http://' + host + ':8081/jmx?command=get_attribute&args=org.hornetq%3Amodule%3DCore%2Ctype%3DServer%20ListOfBrokers' end |
#insert_sentinel_value(final_message = nil) ⇒ Object
This is used to insert an unblock message into the consumer. A use case is when you’re using a blocking receive, and you want to unblock a separate thread or tell a consumer to unblock from a signal handler. See also Messagebus::Swarm::Drone#stop
118 119 120 121 122 |
# File 'lib/messagebus/consumer.rb', line 118 def insert_sentinel_value(=nil) # push a message onto our consumer so that if we're currently blocking on waiting for a message # we'll see this and do no further processing @received_messages.push({:stop_processing_sentinel => true, :msg => }) end |
#keepalive ⇒ Object
226 227 228 229 230 231 232 233 234 235 |
# File 'lib/messagebus/consumer.rb', line 226 def keepalive @servers_running.each do |host_param, client| begin client.keepalive() @last_received = nil rescue => e logger.error("Failed to send keepalive to #{host_param}") end end end |
#logger ⇒ Object
89 90 91 |
# File 'lib/messagebus/consumer.rb', line 89 def logger @logger ||= Client.logger end |
#nack ⇒ Object
213 214 215 216 217 218 219 220 221 222 223 224 |
# File 'lib/messagebus/consumer.rb', line 213 def nack if not @last_received.nil? begin logger.info("Sending nack() for message with id:#{@last_received[:decoded_msg].}") @servers_running[@last_received[:host_param]].nack(@last_received[:msg]) @last_received = nil rescue => e logger.error("Failed to nack message. Was the connection removed? #{e.} #{e.backtrace.join("|")}") end end end |
#receive ⇒ Object
Blocking receive: block till a value is available. Returns the message(Messagebus::Message) received or block indefinately.
126 127 128 |
# File 'lib/messagebus/consumer.rb', line 126 def receive return receive_internal(non_blocking=false) end |
#receive_immediate ⇒ Object
Non-Blocking receive. Returns the message(Messagebus::Message) received or nil immediately.
146 147 148 149 150 151 152 |
# File 'lib/messagebus/consumer.rb', line 146 def receive_immediate() if not @received_messages.empty? return receive_internal(non_blocking=true) else return nil end end |
#receive_timeout(timeout_ms = 1000) ⇒ Object
Blocking receive with timeout: block till a value is available for passed timeout. Returns the message(Messagebus::Message) received or raise MessageReceiveTimeout(“timeout”)
132 133 134 135 136 137 138 139 140 141 142 |
# File 'lib/messagebus/consumer.rb', line 132 def receive_timeout(timeout_ms=1000) do_with_timeout(timeout_ms) { if @received_messages.empty? sleep 0.01 else return receive_internal(non_blocking=true) end } raise MessageReceiveTimeout, "receive timeout(" + timeout_ms.to_s + ") while waiting for message to arrive." end |
#refresh_servers ⇒ Object
286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 |
# File 'lib/messagebus/consumer.rb', line 286 def refresh_servers logger.info("refreshing consumer threads.") @refresh_time = Time.new() hosts = @servers_running.keys if @options.enable_dynamic_serverlist_fetch # Fetch the server list from the dynamic server list fetch url. begin updated_server_list = fetch_serverlist rescue => e logger.error "Error in refresh server #{e} \n Stack Trace: #{e.backtrace.join("|")}" end if not updated_server_list.nil? hosts = updated_server_list end end logger.info("refreshing servers current_list:#{@servers_running.keys.inspect} new list:#{hosts.inspect}") servers_added = hosts - @servers_running.keys # start new servers if servers_added and not servers_added.empty? logger.info("Adding new servers in:#{servers_added.inspect}") if not servers_added.empty?() start_servers(servers_added) end end end |
#start ⇒ Object
Start the consumers and all connections. Optionally takes a block to which it yields self. When the block is passed, it will auto close the connections after the block finishes.
96 97 98 99 100 101 |
# File 'lib/messagebus/consumer.rb', line 96 def start @state = STARTED logger.info("Starting consumers with host_params:#{@host_params.inspect} for destination:#{@options.destination_name}") start_servers(@host_params, true) refresh_servers end |
#stop ⇒ Object
Close the consumers and all connections
104 105 106 107 108 109 |
# File 'lib/messagebus/consumer.rb', line 104 def stop @state = STOPPED logger.info("Stopping consumers for running servers:#{@servers_running.keys.inspect}") stop_servers(@servers_running.keys) end |