Class: StompServer::Queue
- Inherits:
-
Object
- Object
- StompServer::Queue
- Defined in:
- lib/stomp_server_ng/queue.rb
Overview
Queue
Instance Attribute Summary collapse
-
#checkpoint_interval ⇒ Object
The checkpoint interval.
Instance Method Summary collapse
-
#assign_id(frame, dest) ⇒ Object
assign_id.
-
#close_queue(dest, session_id) ⇒ Object
close_queue.
-
#dequeue(dest, session_id) ⇒ Object
dequeue.
-
#enqueue(dest, frame) ⇒ Object
enqueue.
-
#initialize(directory = '.stompserver', delete_empty = true) ⇒ Queue
constructor
initialize.
-
#message_for?(dest, session_id) ⇒ Boolean
message_for?.
-
#monitor ⇒ Object
monitor.
-
#open_queue(dest, session_id) ⇒ Object
open_queue.
-
#readframe(dest, msgid, session_id) ⇒ Object
readframe.
-
#requeue(dest, frame) ⇒ Object
requeue.
-
#save_queue_state(session_id) ⇒ Object
save_queue_state.
-
#stop(session_id) ⇒ Object
stop.
-
#writeframe(dest, frame, msgid) ⇒ Object
writeframe.
Constructor Details
#initialize(directory = '.stompserver', delete_empty = true) ⇒ Queue
initialize
13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
# File 'lib/stomp_server_ng/queue.rb', line 13
#
# Set up logging, make sure the storage directory exists, and restore any
# queue state previously saved in "<directory>/qinfo".
#
# directory    - path of the directory holding the saved state
#                (created when missing; the parent must already exist).
# delete_empty - stored in @delete_empty; close_queue consults it before
#                removing a drained queue.
def initialize(directory='.stompserver', delete_empty=true)
  @@log = Logger.new(STDOUT)
  @@log.level = StompServer::LogHelper.get_loglevel()
  @@log.debug("Q #{self} initialization starts")
  @stompid = StompServer::StompId.new
  @delete_empty = delete_empty
  @directory = directory
  Dir.mkdir(@directory) unless File.directory?(@directory)
  qinfo_path = "#{@directory}/qinfo"
  # File.exists? was deprecated for years and removed in Ruby 3.2;
  # File.exist? behaves the same on every Ruby version.
  if File.exist?(qinfo_path)
    # NOTE: Marshal.load can instantiate arbitrary objects. This is only
    # acceptable because qinfo is expected to be a file this server wrote
    # itself; never point `directory` at untrusted content.
    qinfo = File.open(qinfo_path, "rb") { |f| Marshal.load(f.read) }
    @queues = qinfo[:queues]
    @frames = qinfo[:frames]
  else
    @queues = Hash.new
    @frames = Hash.new
  end
  @queues.keys.each do |dest|
    @@log.debug "Q #{self} dest=#{dest} size=#{@queues[dest][:size]} enqueued=#{@queues[dest][:enqueued]} dequeued=#{@queues[dest][:dequeued]}"
  end
  @@log.debug("Q #{self} initialized in #{@directory}")
  #
  # Cleanup dead queues and save the state of the queues every so often.
  # Alternatively we could save the queue state every X number
  # of frames that are put in the queue.
  # Should probably also read it after saving it to confirm integrity.
  #
  # Removed: this badly corrupts the queue when stopping with messages
  #
  # EventMachine::add_periodic_timer 1800, proc {@queues.keys.each
  #   {|dest| close_queue(dest)};save_queue_state }
  #
end
Instance Attribute Details
#checkpoint_interval ⇒ Object
The checkpoint interval.
10 11 12 |
# File 'lib/stomp_server_ng/queue.rb', line 10 def checkpoint_interval @checkpoint_interval end |
Instance Method Details
#assign_id(frame, dest) ⇒ Object
assign_id
229 230 231 232 233 |
# File 'lib/stomp_server_ng/queue.rb', line 229
#
# Stamp +frame+ with a 'message-id' header for destination +dest+.
#
# The id is looked up through @stompid using the destination's current
# :msgid counter, or 1 when the destination has no queue record yet.
# Returns the id that was stored in the header.
def assign_id(frame, dest)
  @@log.debug "#{frame.headers['session']} assign_id, frame: #{frame}, dest: #{dest}"
  queue = @queues[dest]
  sequence =
    if queue.nil?
      1
    else
      queue[:msgid]
    end
  frame.headers['message-id'] = @stompid[sequence]
end
#close_queue(dest, session_id) ⇒ Object
close_queue
94 95 96 97 98 99 100 101 102 |
# File 'lib/stomp_server_ng/queue.rb', line 94
#
# Remove the queue for +dest+, but only when it is fully drained (size
# counter at zero and no pending message ids) and @delete_empty is set.
# Otherwise the queue is left untouched.
#
# dest       - destination whose queue should be considered for removal.
# session_id - used only as a prefix for log lines.
def close_queue(dest, session_id)
  @@log.debug "#{session_id} close_queue"
  queue = @queues[dest]
  return unless queue[:size] == 0 && queue[:frames].empty? && @delete_empty

  # Let the storage hook release its resources before the bookkeeping goes.
  _close_queue(dest)
  @queues.delete(dest)
  @frames.delete(dest)
  @@log.debug "#{session_id} Queue #{dest} removed."
end
#dequeue(dest, session_id) ⇒ Object
dequeue
185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 |
# File 'lib/stomp_server_ng/queue.rb', line 185
#
# Take the oldest pending frame off the queue for +dest+.
#
# dest       - destination to read from.
# session_id - used for the availability check and for log lines.
#
# Returns false when no message is pending for +dest+, otherwise the frame
# obtained from readframe.
def dequeue(dest, session_id)
  @@log.debug "#{session_id} dequeue, dest: #{dest}"
  # The guard must call message_for?; without it the statement is not valid
  # Ruby and an empty or unknown destination would not be rejected.
  return false unless message_for?(dest, session_id)
  # update queues ... dest .... :frames here
  msgid = @queues[dest][:frames].shift
  frame = readframe(dest,msgid,session_id)
  @@log.debug("#{frame.headers['session']} Dequeue for message: #{msgid} Client: #{frame.headers['client-id'] if frame.headers['client-id']}")
  # update queues (queues[dest])
  # :size, :frames, :msgid, :enqueued, :dequeued, :exceptions
  @queues[dest][:size] -= 1
  # :frames - see above
  # NOTE(review): enqueue increments :msgid and this decrements it, so a
  # later enqueue can be handed a counter value that was already used.
  # Confirm that StompId still produces unique ids in that case.
  @queues[dest][:msgid] -= 1
  # :enqueued - no change
  @queues[dest][:dequeued] += 1
  # :exceptions - no change
  # NOTE(review): the queue record is keyed by symbols, so deleting msgid
  # from it looks like a no-op -- verify what this was meant to clean up.
  @queues[dest].delete(msgid)
  close_queue(dest, frame.headers['session'])
  save_queue_state(frame.headers['session'])
  return frame
end
#enqueue(dest, frame) ⇒ Object
enqueue
161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 |
# File 'lib/stomp_server_ng/queue.rb', line 161
#
# Append +frame+ to the queue for +dest+, opening the queue first when it
# does not exist yet.
#
# The frame receives a message id, is handed to writeframe, and the
# destination's counters (:size, :frames, :msgid, :enqueued) are advanced.
# :dequeued and :exceptions are not touched here. Always returns true.
def enqueue(dest,frame)
  @@log.debug "#{frame.headers['session']} enqueue"
  open_queue(dest, frame.headers['session']) unless @queues.key?(dest)
  msgid = assign_id(frame, dest)
  @@log.debug("#{frame.headers['session']} Enqueue for message: #{msgid} Client: #{frame.headers['client-id'] if frame.headers['client-id']}")
  writeframe(dest, frame, msgid)

  # Bookkeeping for this destination.
  stats = @queues[dest]
  stats[:size] += 1
  stats[:frames] << msgid
  stats[:msgid] += 1
  stats[:enqueued] += 1

  # Per-frame record for this dest / frame / msgid.
  new_frames_entry(dest, frame, msgid)
  save_queue_state(frame.headers['session'])
  true
end
#message_for?(dest, session_id) ⇒ Boolean
messsage_for?
210 211 212 213 214 |
# File 'lib/stomp_server_ng/queue.rb', line 210
#
# Report whether at least one message is pending for +dest+.
#
# dest       - destination to check.
# session_id - used only as a prefix for the log line.
#
# Returns true when a queue exists for +dest+ and its list of pending
# message ids is not empty, false otherwise.
def message_for?(dest, session_id)
  retval = @queues.key?(dest) && !@queues[dest][:frames].empty?
  @@log.debug "#{session_id} message_for?, dest: #{dest}, #{retval}"
  retval
end
#monitor ⇒ Object
monitor
79 80 81 82 83 84 85 86 87 88 89 90 91 |
# File 'lib/stomp_server_ng/queue.rb', line 79
#
# Build a snapshot of the per-destination counters.
#
# Returns a Hash keyed by destination; each value is a Hash with the string
# keys 'size', 'enqueued', 'dequeued' and 'exceptions' copied from the
# destination's queue record.
def monitor
  @@log.debug "#{self} monitor"
  @queues.each_with_object({}) do |(dest, queue), stats|
    stats[dest] = {
      'size' => queue[:size],
      'enqueued' => queue[:enqueued],
      'dequeued' => queue[:dequeued],
      'exceptions' => queue[:exceptions]
    }
  end
end
#open_queue(dest, session_id) ⇒ Object
open_queue
105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 |
# File 'lib/stomp_server_ng/queue.rb', line 105
#
# Create fresh bookkeeping for +dest+: a queue record with zeroed counters,
# an empty list of pending message ids, a :msgid counter starting at 1, and
# an empty per-frame table. Any existing record for +dest+ is replaced.
# The storage hook _open_queue is invoked afterwards.
#
# session_id - used only as a prefix for the first log line.
def open_queue(dest, session_id)
  @@log.debug "#{session_id} open_queue"
  # Keys: :size, :frames, :msgid, :enqueued, :dequeued, :exceptions
  @queues[dest] = {
    :size => 0,
    :frames => [],
    :msgid => 1,
    :enqueued => 0,
    :dequeued => 0,
    :exceptions => 0
  }
  @frames[dest] = {}
  _open_queue(dest)
  @@log.debug "Created queue #{dest}"
end
#readframe(dest, msgid, session_id) ⇒ Object
readframe
223 224 225 226 |
# File 'lib/stomp_server_ng/queue.rb', line 223
#
# Log the request, then fetch the frame stored under +msgid+ for +dest+ by
# delegating to the storage hook _readframe.
#
# session_id - used only as a prefix for the log line.
#
# Returns whatever _readframe returns.
def readframe(dest,msgid, session_id)
  trace = "#{session_id} readframe, dest: #{dest}, msgid: #{msgid}"
  @@log.debug(trace)
  _readframe(dest, msgid)
end
#requeue(dest, frame) ⇒ Object
requeue
124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 |
# File 'lib/stomp_server_ng/queue.rb', line 124
#
# Put +frame+ back at the head of the queue for +dest+ and count the
# redelivery as an exception, both on the queue and on the frame record.
#
# When the frame carries a 'max-exceptions' header and the exceptions
# already recorded for it have reached that limit, the frame is enqueued on
# "/queue/deadletter" instead and nil is returned. Otherwise returns true.
def requeue(dest,frame)
  @@log.debug "#{frame.headers['session']} requeue, for #{dest}, frame: #{frame.inspect}"
  open_queue(dest, frame.headers['session']) unless @queues.has_key?(dest)
  msgid = frame.headers['message-id']
  #
  # Note: frame.headers['max-exceptions'] is currently _never_ set any where!
  #
  max_exceptions = frame.headers['max-exceptions']
  if max_exceptions
    # The frame record may be absent here (this method itself creates it
    # further down when missing), so do not dereference it blindly.
    # A missing record is treated as zero recorded exceptions --
    # presumably what new_frames_entry starts from; TODO confirm.
    entry = @frames[dest][msgid]
    recorded = entry ? entry[:exceptions] : 0
    if recorded >= max_exceptions.to_i
      enqueue("/queue/deadletter",frame)
      return
    end
  end
  #
  writeframe(dest,frame,msgid)
  # update queues (queues[dest])
  # :size, :frames, :msgid, :enqueued, :dequeued, :exceptions
  @queues[dest][:size] += 1
  # unshift: the requeued message goes back to the front of the queue.
  @queues[dest][:frames].unshift(msgid)
  # no :msgid here
  # no :enqueued here
  # no :dequeued here
  @queues[dest][:exceptions] += 1
  # update frames
  #
  # Is this _always_ the case in this method ?????
  unless @frames[dest][msgid]
    new_frames_entry(dest, frame, msgid)
  end
  #
  @frames[dest][msgid][:exceptions] += 1
  @frames[dest][msgid][:requeued] += 1
  save_queue_state(frame.headers['session'])
  return true
end
#save_queue_state(session_id) ⇒ Object
save_queue_state
64 65 66 67 68 69 70 71 72 73 74 75 76 |
# File 'lib/stomp_server_ng/queue.rb', line 64
#
# Persist @queues and @frames to "<directory>/qinfo", at most once per
# checkpoint interval.
#
# The first call always saves. After a save, further calls are ignored
# until checkpoint_interval has elapsed. An unset (nil) interval is treated
# as 0, i.e. every call saves, instead of raising TypeError when the next
# deadline is computed.
#
# session_id - used only as a prefix for log lines.
#
# Returns nil when the save was skipped, otherwise the next save deadline.
def save_queue_state(session_id)
  @@log.debug "#{session_id} save_queue_state"
  now = Time.now
  @next_save ||= now
  return unless now >= @next_save

  @@log.debug "#{session_id} saving state"
  qinfo = {:queues => @queues, :frames => @frames}
  # write then rename to make sure this is atomic
  File.open("#{@directory}/qinfo.new", "wb") { |f| f.write Marshal.dump(qinfo)}
  File.rename("#{@directory}/qinfo.new","#{@directory}/qinfo")
  @next_save = now + (checkpoint_interval || 0)
end
#stop(session_id) ⇒ Object
stop
53 54 55 56 57 58 59 60 61 |
# File 'lib/stomp_server_ng/queue.rb', line 53
#
# Shut the queues down: log each destination's counters, give close_queue
# the chance to remove drained queues, then persist the final state.
#
# session_id - passed through to close_queue / save_queue_state and used as
#              a prefix for log lines.
def stop(session_id)
  @@log.debug "#{session_id} Shutting down Queues, queue count: #{@queues.size}"
  #
  # Iterate over a copy of the keys: close_queue may delete entries.
  @queues.keys.each do |dest|
    @@log.debug "#{session_id}: Queue #{dest}: size=#{@queues[dest][:size]} enqueued=#{@queues[dest][:enqueued]} dequeued=#{@queues[dest][:dequeued]}"
    close_queue(dest, session_id)
  end
  # save_queue_state skips the write while the checkpoint deadline in
  # @next_save has not passed. Clear the deadline so the state written at
  # shutdown always reflects the latest enqueues and dequeues.
  @next_save = nil
  save_queue_state(session_id)
end
#writeframe(dest, frame, msgid) ⇒ Object
writeframe
217 218 219 220 |
# File 'lib/stomp_server_ng/queue.rb', line 217
#
# Log the request, then store +frame+ under +msgid+ for +dest+ by
# delegating to the storage hook _writeframe.
#
# Returns whatever _writeframe returns.
def writeframe(dest,frame,msgid)
  trace = "#{frame.headers['session']} writeframe, dest: #{dest}, frame: #{frame}, msgid: #{msgid}"
  @@log.debug(trace)
  _writeframe(dest, frame, msgid)
end