Class: StompServer::MemoryQueue
- Inherits:
-
Object
- Object
- StompServer::MemoryQueue
- Defined in:
- lib/stomp_server_ng/queue/memory_queue.rb
Instance Attribute Summary collapse
-
#checkpoint_interval ⇒ Object
Returns the value of attribute checkpoint_interval.
Instance Method Summary collapse
- #assign_id(frame, dest) ⇒ Object
- #dequeue(dest, session_id) ⇒ Object
- #enqueue(dest, frame) ⇒ Object
-
#initialize ⇒ MemoryQueue
constructor
A new instance of MemoryQueue.
- #message_for?(dest, session_id) ⇒ Boolean
- #monitor ⇒ Object
- #requeue(dest, frame) ⇒ Object
- #stop(session_id) ⇒ Object
Constructor Details
#initialize ⇒ MemoryQueue
Returns a new instance of MemoryQueue.
6 7 8 9 10 11 12 13 14 15 16 |
# File 'lib/stomp_server_ng/queue/memory_queue.rb', line 6 def initialize @@log = Logger.new(STDOUT) @@log.level = StompServer::LogHelper.get_loglevel() @frame_index =0 @stompid = StompServer::StompId.new @stats = Hash.new @messages = Hash.new { Array.new } @@log.debug "MemoryQueue initialized" end |
Instance Attribute Details
#checkpoint_interval ⇒ Object
Returns the value of attribute checkpoint_interval.
4 5 6 |
# File 'lib/stomp_server_ng/queue/memory_queue.rb', line 4 def checkpoint_interval @checkpoint_interval end |
Instance Method Details
#assign_id(frame, dest) ⇒ Object
60 61 62 |
# File 'lib/stomp_server_ng/queue/memory_queue.rb', line 60 def assign_id(frame, dest) frame.headers['message-id'] = @stompid[@frame_index] end |
#dequeue(dest, session_id) ⇒ Object
30 31 32 33 34 35 36 37 |
# File 'lib/stomp_server_ng/queue/memory_queue.rb', line 30 def dequeue(dest, session_id) if frame = @messages[dest].shift @stats[dest][:dequeued] += 1 return frame else return false end end |
#enqueue(dest, frame) ⇒ Object
39 40 41 42 43 44 45 46 47 48 49 50 |
# File 'lib/stomp_server_ng/queue/memory_queue.rb', line 39 def enqueue(dest,frame) @frame_index += 1 if @stats[dest] @stats[dest][:enqueued] += 1 else @stats[dest] = Hash.new @stats[dest][:enqueued] = 1 @stats[dest][:dequeued] = 0 end assign_id(frame, dest) requeue(dest, frame) end |
#message_for?(dest, session_id) ⇒ Boolean
56 57 58 |
# File 'lib/stomp_server_ng/queue/memory_queue.rb', line 56 def (dest, session_id) !@messages[dest].empty? end |
#monitor ⇒ Object
22 23 24 25 26 27 28 |
# File 'lib/stomp_server_ng/queue/memory_queue.rb', line 22 def monitor stats = Hash.new @messages.keys.each do |dest| stats[dest] = {'size' => @messages[dest].size, 'enqueued' => @stats[dest][:enqueued], 'dequeued' => @stats[dest][:dequeued]} end stats end |
#requeue(dest, frame) ⇒ Object
52 53 54 |
# File 'lib/stomp_server_ng/queue/memory_queue.rb', line 52 def requeue(dest,frame) @messages[dest] += [frame] end |
#stop(session_id) ⇒ Object
18 19 20 |
# File 'lib/stomp_server_ng/queue/memory_queue.rb', line 18 def stop(session_id) @@log.debug("#{session_id} memory queue shutdown") end |