Class: StompServer::MemoryQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/stomp_server_ng/queue/memory_queue.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeMemoryQueue

Returns a new instance of MemoryQueue.



6
7
8
9
10
11
12
13
14
15
16
# File 'lib/stomp_server_ng/queue/memory_queue.rb', line 6

def initialize

  @@log = Logger.new(STDOUT)
  @@log.level = StompServer::LogHelper.get_loglevel()

  @frame_index =0
  @stompid = StompServer::StompId.new
  @stats = Hash.new
  @messages = Hash.new { Array.new }
  @@log.debug "MemoryQueue initialized"
end

Instance Attribute Details

#checkpoint_intervalObject

Returns the value of attribute checkpoint_interval.



4
5
6
# File 'lib/stomp_server_ng/queue/memory_queue.rb', line 4

def checkpoint_interval
  @checkpoint_interval
end

Instance Method Details

#assign_id(frame, dest) ⇒ Object



60
61
62
# File 'lib/stomp_server_ng/queue/memory_queue.rb', line 60

def assign_id(frame, dest)
  frame.headers['message-id'] = @stompid[@frame_index]
end

#dequeue(dest, session_id) ⇒ Object



30
31
32
33
34
35
36
37
# File 'lib/stomp_server_ng/queue/memory_queue.rb', line 30

def dequeue(dest, session_id)
  if frame = @messages[dest].shift
    @stats[dest][:dequeued] += 1
    return frame
  else
    return false
  end
end

#enqueue(dest, frame) ⇒ Object



39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/stomp_server_ng/queue/memory_queue.rb', line 39

def enqueue(dest,frame)
  @frame_index += 1
  if @stats[dest]
    @stats[dest][:enqueued] += 1
  else
    @stats[dest] = Hash.new
    @stats[dest][:enqueued] = 1
    @stats[dest][:dequeued] = 0
  end
  assign_id(frame, dest)
  requeue(dest, frame)
end

#message_for?(dest, session_id) ⇒ Boolean

Returns:

  • (Boolean)


56
57
58
# File 'lib/stomp_server_ng/queue/memory_queue.rb', line 56

def message_for?(dest, session_id)
  !@messages[dest].empty?
end

#monitorObject



22
23
24
25
26
27
28
# File 'lib/stomp_server_ng/queue/memory_queue.rb', line 22

def monitor
  stats = Hash.new
  @messages.keys.each do |dest|
   stats[dest] = {'size' => @messages[dest].size, 'enqueued' => @stats[dest][:enqueued], 'dequeued' => @stats[dest][:dequeued]}
  end
  stats
end

#requeue(dest, frame) ⇒ Object



52
53
54
# File 'lib/stomp_server_ng/queue/memory_queue.rb', line 52

def requeue(dest,frame)
  @messages[dest] += [frame]
end

#stop(session_id) ⇒ Object



18
19
20
# File 'lib/stomp_server_ng/queue/memory_queue.rb', line 18

def stop(session_id)
  @@log.debug("#{session_id} memory queue shutdown")
end