Class: StompServer::QueueManager

Inherits:
Object
  • Object
show all
Defined in:
lib/stomp_server_ng/queue_manager.rb

Instance Method Summary collapse

Constructor Details

#initialize(qstore) ⇒ QueueManager

Queue manager initialization.



43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/stomp_server_ng/queue_manager.rb', line 43

def initialize(qstore)
  @@log = Logger.new(STDOUT)
  @@log.level = StompServer::LogHelper.get_loglevel()
  @@log.debug("QM QueueManager initialize comletes")
  #
  @qstore = qstore
  @queues = Hash.new { Array.new }
  @pending = Hash.new
  if $STOMP_SERVER
    monitor = StompServer::QueueMonitor.new(@qstore,@queues)
    monitor.start
    @@log.debug "QM monitor started by QM initialization"
  end
end

Instance Method Details

#ack(connection, frame) ⇒ Object

Client ack.

Called from the protocol handler (ack method).



200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
# File 'lib/stomp_server_ng/queue_manager.rb', line 200

def ack(connection, frame)
  @@log.debug "#{connection.session_id} QM ACK."
  @@log.debug "#{connection.session_id} QM ACK for frame: #{frame.inspect}"
  unless @pending[connection]
    @@log.debug "#{connection.session_id} QM No message pending for connection!"
    return
  end
  msgid = frame.headers['message-id']
  p_msgid = @pending[connection].headers['message-id']
  if p_msgid != msgid
    @@log.debug "#{connection.session_id} QM ACK Invalid message-id (received /#{msgid}/ != /#{p_msgid}/)"
    # We don't know what happened, we requeue
    # (probably a client connecting to a restarted server)
    frame = @pending[connection]
    @qstore.requeue(frame.headers['destination'],frame)
  end
  @pending.delete connection
  # We are free to work now, look if there's something for us
  send_a_backlog(connection)
end

#dequeue(dest, session_id) ⇒ Object

dequeue: remove a message from a queue.



289
290
291
# File 'lib/stomp_server_ng/queue_manager.rb', line 289

def dequeue(dest, session_id)
  @qstore.dequeue(dest, session_id)
end

#disconnect(connection) ⇒ Object

Client disconnect.

Called from the protocol handler (unbind method).



225
226
227
228
229
230
231
232
233
234
235
236
237
238
# File 'lib/stomp_server_ng/queue_manager.rb', line 225

def disconnect(connection)
  @@log.debug("#{connection.session_id} QM DISCONNECT.")
  frame = @pending[connection]
  @@log.debug("#{connection.session_id} QM DISCONNECT pending frame: #{frame.inspect}")
  if frame
    @qstore.requeue(frame.headers['destination'],frame)
    @pending.delete connection
  end
  #
  @queues.each do |dest, queue|
    queue.delete_if { |qu| qu.connection == connection }
    @queues.delete(dest) if queue.empty?
  end
end

#enqueue(frame) ⇒ Object

enqueue: add a message to a queue.



295
296
297
298
299
# File 'lib/stomp_server_ng/queue_manager.rb', line 295

def enqueue(frame)
  frame.command = "MESSAGE"
  dest = frame.headers['destination']
  @qstore.enqueue(dest,frame)
end

#send_a_backlog(connection) ⇒ Object

send_a_backlog

Send at most one frame to a connection. Used when use_ack == true. Called from the ack method.



81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
# File 'lib/stomp_server_ng/queue_manager.rb', line 81

def send_a_backlog(connection)
  @@log.debug "#{connection.session_id} QM send_a_backlog starts"
  #
  # lookup queues with data for this connection
  #

  # :stopdoc:

  # 1.9 compatability
  #
  # The Hash#select method returns:
  #
  # * An Array (of Arrays) in Ruby 1.8
  # * A Hash in Ruby 1.9
  #
  # Watch the code in this method.  It is a bit ugly because of that
  # difference.

  # :startdoc:

  possible_queues = @queues.select{ |destination, users|
    @qstore.message_for?(destination, connection.session_id) &&
      users.detect{|u| u.connection == connection}
  }
  if possible_queues.empty?
    @@log.debug "#{connection.session_id} QM  s_a_b nothing to send"
    return
  end
  #
  # Get a random one (avoid artificial priority between queues
  # without coding a whole scheduler, which might be desirable later)
  #
  # Select a random destination from those possible

  # :stopdoc:

  # Told ya' this would get ugly.  A quote from the Pickaxe.  I am:
  #
  # 'abandoning the benefits of polymorphism, and bringing the gods of refactoring down around my ears'
  #
  # :-)

  # :startdoc:

# The following log call results in an exception using 1.9.2p180.  I cannot
# recreate this using IRB.  It has something to do with 'Struct's I think.
#    @@log.debug("#{connection.session_id} possible_queues: #{possible_queues.inspect}")


  case possible_queues
    when Hash
      #  possible_queues _is_ a Hash
      dests_possible = possible_queues.keys     # Get keys of a Hash of destination / queues
      dest_index = rand(dests_possible.size)    # Random index
      dest = dests_possible[dest_index]         # Select a destination / queue
      # The selected destination has (possibly) multiple users.
      # Select a random user from those possible
      user_index = rand(possible_queues[dest].size) # Random index
      user = possible_queues[dest][user_index]  # Array entry from Hash table entry
      #
    when Array
      # possible_queues _is_ an Array
      dest_index = rand(possible_queues.size)    # Random index
      dest_data = possible_queues[dest_index]    # Select a destination + user array
      dest = dest_data[0]                        # Select a destination / queue
      # The selected destination has (possibly) multiple users.
      # Select a random user from those possible
      user_index = rand(dest_data[1].size)     # Random index
      user = dest_data[1][user_index]  # Array entry from Hash table entry
    else
      raise "#{connection.session_id} something is very not right : #{RUBY_VERSION}"
  end

  #
  @@log.debug "#{connection.session_id} QM s_a_b chosen -> dest: #{dest}"
# Ditto for this log statement using 1.9.2p180.
#    @@log.debug "#{connection.session_id} QM s_a_b chosen -> user: #{user}"
  #
  frame = @qstore.dequeue(dest, connection.session_id)
  send_to_user(frame, user)
end

#send_destination_backlog(dest, user) ⇒ Object

send_destination_backlog

Called from the subscribe method.



167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
# File 'lib/stomp_server_ng/queue_manager.rb', line 167

def send_destination_backlog(dest,user)
  @@log.debug "#{user.connection.session_id} QM send_destination_backlog for #{dest}"
  if user.ack
    # Only send one message, then wait for client ACK.
    frame = @qstore.dequeue(dest, user.connection.session_id)
    if frame
      send_to_user(frame, user)
      @@log.debug("#{user.connection.session_id} QM s_d_b single frame sent")
    end
  else
    # Send all available messages.
    while frame = @qstore.dequeue(dest, user.connection.session_id)
      send_to_user(frame, user)
    end
  end
end

#send_to_user(frame, user) ⇒ Object

send_to_user



242
243
244
245
246
247
248
249
250
251
252
253
# File 'lib/stomp_server_ng/queue_manager.rb', line 242

def send_to_user(frame, user)
  @@log.debug("#{user.connection.session_id} QM send_to_user")
  connection = user.connection
  frame.headers['subscription'] = user.subid if user.subid
  if user.ack
    # raise on internal logic error.
    raise "#{user.connection.session_id} other connection's end already busy" if @pending[connection]
    # A maximum of one frame can be pending ACK.
    @pending[connection] = frame
  end
  connection.stomp_send_data(frame)
end

#sendmsg(frame) ⇒ Object

sendmsg

Called from the protocol handler (sendmsg method, process_frame method).



259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
# File 'lib/stomp_server_ng/queue_manager.rb', line 259

def sendmsg(frame)
  #
  @@log.debug("#{frame.headers['session']} QM client SEND Processing, #{frame}")
  frame.command = "MESSAGE"
  dest = frame.headers['destination']
  # Lookup a user willing to handle this destination
  available_users = @queues[dest].reject{|user| @pending[user.connection]}
  if available_users.empty?
    @qstore.enqueue(dest,frame)
    return
  end
  #
  # Look for a user with ack (we favor reliability)
  #
  reliable_user = available_users.find{|u| u.ack}
  #
  if reliable_user
    # give it a message-id
    @qstore.assign_id(frame, dest)
    send_to_user(frame, reliable_user)
  else
    random_user = available_users[rand(available_users.length)]
    # Note message-id header isn't set but we won't need it anyway
    # <TODO> could break some clients: fix this
    send_to_user(frame, random_user)
  end
end

#stop(session_id) ⇒ Object

Server stop / shutdown.



60
61
62
# File 'lib/stomp_server_ng/queue_manager.rb', line 60

def stop(session_id)
  @qstore.stop(session_id) if (@qstore.methods.include?('stop') || @qstore.methods.include?(:stop))
end

#subscribe(dest, connection, use_ack = false, subid = nil) ⇒ Object

Client subscribe for a destination.

Called from the protocol handler (subscribe method).



68
69
70
71
72
73
# File 'lib/stomp_server_ng/queue_manager.rb', line 68

def subscribe(dest, connection, use_ack=false, subid = nil)
  @@log.debug "#{connection.session_id} QM subscribe to #{dest}, ack => #{use_ack}, connection: #{connection}, subid: #{subid}"
  user = Struct::QueueUser.new(connection, use_ack, subid)
  @queues[dest] += [user]
  send_destination_backlog(dest,user) unless dest == '/queue/monitor'
end

#unsubscribe(dest, connection) ⇒ Object

Client unsubscribe.

Called from the protocol handler (unsubscribe method).



188
189
190
191
192
193
194
# File 'lib/stomp_server_ng/queue_manager.rb', line 188

def unsubscribe(dest, connection)
  @@log.debug "#{connection.session_id} QM unsubscribe from #{dest}, connection #{connection}"
  @queues.each do |d, queue|
    queue.delete_if { |qu| qu.connection == connection and d == dest}
  end
  @queues.delete(dest) if @queues[dest].empty?
end