Class: StompServer::QueueManager
- Inherits:
-
Object
- Object
- StompServer::QueueManager
- Defined in:
- lib/stomp_server_ng/queue_manager.rb
Instance Method Summary collapse
-
#ack(connection, frame) ⇒ Object
Client ack.
-
#dequeue(dest, session_id) ⇒ Object
dequeue: remove a message from a queue.
-
#disconnect(connection) ⇒ Object
Client disconnect.
-
#enqueue(frame) ⇒ Object
enqueue: add a message to a queue.
-
#initialize(qstore) ⇒ QueueManager
constructor
Queue manager initialization.
-
#send_a_backlog(connection) ⇒ Object
send_a_backlog.
-
#send_destination_backlog(dest, user) ⇒ Object
send_destination_backlog.
-
#send_to_user(frame, user) ⇒ Object
send_to_user.
-
#sendmsg(frame) ⇒ Object
sendmsg.
-
#stop(session_id) ⇒ Object
Server stop / shutdown.
-
#subscribe(dest, connection, use_ack = false, subid = nil) ⇒ Object
Client subscribe for a destination.
-
#unsubscribe(dest, connection) ⇒ Object
Client unsubscribe.
Constructor Details
#initialize(qstore) ⇒ QueueManager
Queue manager initialization.
43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
# File 'lib/stomp_server_ng/queue_manager.rb', line 43 def initialize(qstore) @@log = Logger.new(STDOUT) @@log.level = StompServer::LogHelper.get_loglevel() @@log.debug("QM QueueManager initialize comletes") # @qstore = qstore @queues = Hash.new { Array.new } @pending = Hash.new if $STOMP_SERVER monitor = StompServer::QueueMonitor.new(@qstore,@queues) monitor.start @@log.debug "QM monitor started by QM initialization" end end |
Instance Method Details
#ack(connection, frame) ⇒ Object
Client ack.
Called from the protocol handler (ack method).
200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 |
# File 'lib/stomp_server_ng/queue_manager.rb', line 200 def ack(connection, frame) @@log.debug "#{connection.session_id} QM ACK." @@log.debug "#{connection.session_id} QM ACK for frame: #{frame.inspect}" unless @pending[connection] @@log.debug "#{connection.session_id} QM No message pending for connection!" return end msgid = frame.headers['message-id'] p_msgid = @pending[connection].headers['message-id'] if p_msgid != msgid @@log.debug "#{connection.session_id} QM ACK Invalid message-id (received /#{msgid}/ != /#{p_msgid}/)" # We don't know what happened, we requeue # (probably a client connecting to a restarted server) frame = @pending[connection] @qstore.requeue(frame.headers['destination'],frame) end @pending.delete connection # We are free to work now, look if there's something for us send_a_backlog(connection) end |
#dequeue(dest, session_id) ⇒ Object
dequeue: remove a message from a queue.
289 290 291 |
# File 'lib/stomp_server_ng/queue_manager.rb', line 289 def dequeue(dest, session_id) @qstore.dequeue(dest, session_id) end |
#disconnect(connection) ⇒ Object
Client disconnect.
Called from the protocol handler (unbind method).
225 226 227 228 229 230 231 232 233 234 235 236 237 238 |
# File 'lib/stomp_server_ng/queue_manager.rb', line 225 def disconnect(connection) @@log.debug("#{connection.session_id} QM DISCONNECT.") frame = @pending[connection] @@log.debug("#{connection.session_id} QM DISCONNECT pending frame: #{frame.inspect}") if frame @qstore.requeue(frame.headers['destination'],frame) @pending.delete connection end # @queues.each do |dest, queue| queue.delete_if { |qu| qu.connection == connection } @queues.delete(dest) if queue.empty? end end |
#enqueue(frame) ⇒ Object
enqueue: add a message to a queue.
295 296 297 298 299 |
# File 'lib/stomp_server_ng/queue_manager.rb', line 295 def enqueue(frame) frame.command = "MESSAGE" dest = frame.headers['destination'] @qstore.enqueue(dest,frame) end |
#send_a_backlog(connection) ⇒ Object
send_a_backlog
Send at most one frame to a connection. Used when use_ack == true. Called from the ack method.
81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 |
# File 'lib/stomp_server_ng/queue_manager.rb', line 81 def send_a_backlog(connection) @@log.debug "#{connection.session_id} QM send_a_backlog starts" # # lookup queues with data for this connection # # :stopdoc: # 1.9 compatability # # The Hash#select method returns: # # * An Array (of Arrays) in Ruby 1.8 # * A Hash in Ruby 1.9 # # Watch the code in this method. It is a bit ugly because of that # difference. # :startdoc: possible_queues = @queues.select{ |destination, users| @qstore.(destination, connection.session_id) && users.detect{|u| u.connection == connection} } if possible_queues.empty? @@log.debug "#{connection.session_id} QM s_a_b nothing to send" return end # # Get a random one (avoid artificial priority between queues # without coding a whole scheduler, which might be desirable later) # # Select a random destination from those possible # :stopdoc: # Told ya' this would get ugly. A quote from the Pickaxe. I am: # # 'abandoning the benefits of polymorphism, and bringing the gods of refactoring down around my ears' # # :-) # :startdoc: # The following log call results in an exception using 1.9.2p180. I cannot # recreate this using IRB. It has something to do with 'Struct's I think. # @@log.debug("#{connection.session_id} possible_queues: #{possible_queues.inspect}") case possible_queues when Hash # possible_queues _is_ a Hash dests_possible = possible_queues.keys # Get keys of a Hash of destination / queues dest_index = rand(dests_possible.size) # Random index dest = dests_possible[dest_index] # Select a destination / queue # The selected destination has (possibly) multiple users. # Select a random user from those possible user_index = rand(possible_queues[dest].size) # Random index user = possible_queues[dest][user_index] # Array entry from Hash table entry # when Array # possible_queues _is_ an Array dest_index = rand(possible_queues.size) # Random index dest_data = possible_queues[dest_index] # Select a destination + user array dest = dest_data[0] # Select a destination / queue # The selected destination has (possibly) multiple users. # Select a random user from those possible user_index = rand(dest_data[1].size) # Random index user = dest_data[1][user_index] # Array entry from Hash table entry else raise "#{connection.session_id} something is very not right : #{RUBY_VERSION}" end # @@log.debug "#{connection.session_id} QM s_a_b chosen -> dest: #{dest}" # Ditto for this log statement using 1.9.2p180. # @@log.debug "#{connection.session_id} QM s_a_b chosen -> user: #{user}" # frame = @qstore.dequeue(dest, connection.session_id) send_to_user(frame, user) end |
#send_destination_backlog(dest, user) ⇒ Object
send_destination_backlog
Called from the subscribe method.
167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 |
# File 'lib/stomp_server_ng/queue_manager.rb', line 167 def send_destination_backlog(dest,user) @@log.debug "#{user.connection.session_id} QM send_destination_backlog for #{dest}" if user.ack # Only send one message, then wait for client ACK. frame = @qstore.dequeue(dest, user.connection.session_id) if frame send_to_user(frame, user) @@log.debug("#{user.connection.session_id} QM s_d_b single frame sent") end else # Send all available messages. while frame = @qstore.dequeue(dest, user.connection.session_id) send_to_user(frame, user) end end end |
#send_to_user(frame, user) ⇒ Object
send_to_user
242 243 244 245 246 247 248 249 250 251 252 253 |
# File 'lib/stomp_server_ng/queue_manager.rb', line 242 def send_to_user(frame, user) @@log.debug("#{user.connection.session_id} QM send_to_user") connection = user.connection frame.headers['subscription'] = user.subid if user.subid if user.ack # raise on internal logic error. raise "#{user.connection.session_id} other connection's end already busy" if @pending[connection] # A maximum of one frame can be pending ACK. @pending[connection] = frame end connection.stomp_send_data(frame) end |
#sendmsg(frame) ⇒ Object
sendmsg
Called from the protocol handler (sendmsg method, process_frame method).
259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 |
# File 'lib/stomp_server_ng/queue_manager.rb', line 259 def sendmsg(frame) # @@log.debug("#{frame.headers['session']} QM client SEND Processing, #{frame}") frame.command = "MESSAGE" dest = frame.headers['destination'] # Lookup a user willing to handle this destination available_users = @queues[dest].reject{|user| @pending[user.connection]} if available_users.empty? @qstore.enqueue(dest,frame) return end # # Look for a user with ack (we favor reliability) # reliable_user = available_users.find{|u| u.ack} # if reliable_user # give it a message-id @qstore.assign_id(frame, dest) send_to_user(frame, reliable_user) else random_user = available_users[rand(available_users.length)] # Note message-id header isn't set but we won't need it anyway # <TODO> could break some clients: fix this send_to_user(frame, random_user) end end |
#stop(session_id) ⇒ Object
Server stop / shutdown.
60 61 62 |
# File 'lib/stomp_server_ng/queue_manager.rb', line 60 def stop(session_id) @qstore.stop(session_id) if (@qstore.methods.include?('stop') || @qstore.methods.include?(:stop)) end |
#subscribe(dest, connection, use_ack = false, subid = nil) ⇒ Object
Client subscribe for a destination.
Called from the protocol handler (subscribe method).
68 69 70 71 72 73 |
# File 'lib/stomp_server_ng/queue_manager.rb', line 68 def subscribe(dest, connection, use_ack=false, subid = nil) @@log.debug "#{connection.session_id} QM subscribe to #{dest}, ack => #{use_ack}, connection: #{connection}, subid: #{subid}" user = Struct::QueueUser.new(connection, use_ack, subid) @queues[dest] += [user] send_destination_backlog(dest,user) unless dest == '/queue/monitor' end |
#unsubscribe(dest, connection) ⇒ Object
Client unsubscribe.
Called from the protocol handler (unsubscribe method).
188 189 190 191 192 193 194 |
# File 'lib/stomp_server_ng/queue_manager.rb', line 188 def unsubscribe(dest, connection) @@log.debug "#{connection.session_id} QM unsubscribe from #{dest}, connection #{connection}" @queues.each do |d, queue| queue.delete_if { |qu| qu.connection == connection and d == dest} end @queues.delete(dest) if @queues[dest].empty? end |