Class: StompServer::Protocols::Stomp
- Inherits:
-
EventMachine::Connection
- Object
- EventMachine::Connection
- StompServer::Protocols::Stomp
- Defined in:
- lib/stomp_server_ng/protocols/stomp.rb
Overview
Stomp Protocol Handler.
Instance Attribute Summary collapse
-
#session_id ⇒ Object
readonly
Returns the value of attribute session_id.
Instance Method Summary collapse
-
#abort(frame, trans = nil) ⇒ Object
Stomp Protocol - ABORT.
-
#ack(frame) ⇒ Object
Stomp Protocol - ACK.
-
#begin(frame, trans = nil) ⇒ Object
Stomp Protocol - BEGIN.
-
#commit(frame, trans = nil) ⇒ Object
Stomp Protocol - COMMIT.
-
#connect(frame) ⇒ Object
Stomp Protocol - CONNECT.
-
#disconnect(frame) ⇒ Object
Stomp Protocol - DISCONNECT.
-
#handle_transaction(frame, trans, cmd) ⇒ Object
handle_transaction.
-
#initialize(*args) ⇒ Stomp
constructor
Protocol handler initialization.
-
#post_init ⇒ Object
EM::Connection.post_init()
. -
#process_frame(frame) ⇒ Object
process_frame.
-
#process_frames ⇒ Object
process_frames.
-
#receive_data(data) ⇒ Object
EM::Connection.receive_data(data)
. -
#send(frame) ⇒ Object
Stomp Protocol - SEND.
-
#send_data(data) ⇒ Object
EM::Connection.send_data(data)
. -
#send_error(msg) ⇒ Object
send_error.
-
#send_frame(command, headers = {}, body = '') ⇒ Object
send_frame.
-
#send_receipt(id) ⇒ Object
send_receipt.
-
#stomp_receive_data(data) ⇒ Object
:startdoc:.
-
#stomp_send_data(frame) ⇒ Object
stomp_send_data.
-
#subscribe(frame) ⇒ Object
Stomp Protocol - SUBSCRIBE.
-
#unbind ⇒ Object
EM::Connection.unbind()
. -
#unsubscribe(frame) ⇒ Object
Stomp Protocol - UNSUBSCRIBE.
Constructor Details
#initialize(*args) ⇒ Stomp
Protocol handler initialization
24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
# File 'lib/stomp_server_ng/protocols/stomp.rb', line 24 def initialize(*args) super(*args) # @@log = Logger.new(STDOUT) @@log.level = StompServer::LogHelper.get_loglevel() # @@options = (Hash === args.last) ? args.pop : {} # Arguments are passed from EventMachine::start_server @@auth_required = args[0] @@queue_manager = args[1] @@topic_manager = args[2] @@stompauth = args[3] # # N.B.: The session ID is an instance variable! # if @@options[:session_cache] == 0 lt = Time.now @session_id = "ssng_#{lt.to_f}" else @session_id = StompServer::SessionIDManager.get_cache_id(@@options[:session_cache]) end @@log.debug("#{@session_id} #{self} Session ID assigned") # @@log.warn("#{@session_id} #{self} Protocol initialization complete") end |
Instance Attribute Details
#session_id ⇒ Object (readonly)
Returns the value of attribute session_id.
21 22 23 |
# File 'lib/stomp_server_ng/protocols/stomp.rb', line 21 def session_id @session_id end |
Instance Method Details
#abort(frame, trans = nil) ⇒ Object
Stomp Protocol - ABORT
184 185 186 187 188 |
# File 'lib/stomp_server_ng/protocols/stomp.rb', line 184 def abort(frame, trans=nil) raise "#{@session_id} Missing transaction" unless trans raise "#{@session_id} transaction does not exist" unless @transactions.has_key?(trans) @transactions.delete(trans) end |
#ack(frame) ⇒ Object
Stomp Protocol - ACK
Delegated to the queue manager.
194 195 196 |
# File 'lib/stomp_server_ng/protocols/stomp.rb', line 194 def ack(frame) @@queue_manager.ack(self, frame) end |
#begin(frame, trans = nil) ⇒ Object
Stomp Protocol - BEGIN
200 201 202 203 204 |
# File 'lib/stomp_server_ng/protocols/stomp.rb', line 200 def begin(frame, trans=nil) raise "#{@session_id} Missing transaction" unless trans raise "#{@session_id} transaction exists" if @transactions.has_key?(trans) @transactions[trans] = [] end |
#commit(frame, trans = nil) ⇒ Object
Stomp Protocol - COMMIT
208 209 210 211 212 213 214 215 216 217 |
# File 'lib/stomp_server_ng/protocols/stomp.rb', line 208 def commit(frame, trans=nil) raise "#{@session_id} Missing transaction" unless trans raise "#{@session_id} transaction does not exist" unless @transactions.has_key?(trans) # (@transactions[trans]).each do |frame| frame.headers.delete('transaction') process_frame(frame) end @transactions.delete(trans) end |
#connect(frame) ⇒ Object
Stomp Protocol - CONNECT
221 222 223 224 225 226 227 228 229 230 231 232 |
# File 'lib/stomp_server_ng/protocols/stomp.rb', line 221 def connect(frame) if @@auth_required unless frame.headers['login'] and frame.headers['passcode'] and @@stompauth.[frame.headers['login']] == frame.headers['passcode'] raise "#{@session_id} {self} Invalid Login" end end @@log.warn "#{@session_id} Connecting" response = StompServer::StompFrame.new("CONNECTED", {'session' => @session_id}) # stomp_send_data(response) @connected = true end |
#disconnect(frame) ⇒ Object
Stomp Protocol - DISCONNECT
236 237 238 239 |
# File 'lib/stomp_server_ng/protocols/stomp.rb', line 236 def disconnect(frame) @@log.warn "#{@session_id} Polite disconnect" close_connection_after_writing end |
#handle_transaction(frame, trans, cmd) ⇒ Object
handle_transaction
360 361 362 363 364 365 366 367 |
# File 'lib/stomp_server_ng/protocols/stomp.rb', line 360 def handle_transaction(frame, trans, cmd) if [:begin, :commit, :abort].include?(cmd) __send__(cmd, frame, trans) # Object#send alias call else raise "#{@session_id} transaction does not exist" unless @transactions.has_key?(trans) @transactions[trans] << frame end end |
#post_init ⇒ Object
EM::Connection.post_init()
Protocol handler post initialization.
100 101 102 103 104 105 |
# File 'lib/stomp_server_ng/protocols/stomp.rb', line 100 def post_init @sfr = StompServer::StompFrameRecognizer.new @transactions = {} @connected = false @@log.debug("#{@session_id} protocol post_init complete") end |
#process_frame(frame) ⇒ Object
process_frame
Process and individual stomp frame.
339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 |
# File 'lib/stomp_server_ng/protocols/stomp.rb', line 339 def process_frame(frame) cmd = frame.command.downcase.to_sym raise "#{@session_id} #{self} Unhandled frame: #{cmd}" unless VALID_COMMANDS.include?(cmd) raise "#{@session_id} #{self} Not connected" if !@connected && cmd != :connect @@log.debug("#{@session_id} process_frame: #{frame.command}") # Add session ID to the frame headers frame.headers['session'] = @session_id # Send receipt first if required send_receipt(frame.headers['receipt']) if frame.headers['receipt'] # if trans = frame.headers['transaction'] # Handle transactional frame if required. handle_transaction(frame, trans, cmd) else # Otherwise, just route the non-transactional frame. __send__(cmd, frame) # Object#send alias call end end |
#process_frames ⇒ Object
process_frames
Handle all stomp frames currently in the recognizer’s accumulated array of frames.
330 331 332 333 |
# File 'lib/stomp_server_ng/protocols/stomp.rb', line 330 def process_frames frame = nil process_frame(frame) while frame = @sfr.frames.shift end |
#receive_data(data) ⇒ Object
EM::Connection.receive_data(data)
Delegate to stomp_receive_data helper.
121 122 123 |
# File 'lib/stomp_server_ng/protocols/stomp.rb', line 121 def receive_data(data) stomp_receive_data(data) end |
#send(frame) ⇒ Object
Stomp Protocol - SEND
The stomp SEND verb is by routing through:
-
receive_data(data)
-
stomp_receive_data
-
process_frames
-
process_frame
-
use Object#__send__ to call this method
251 252 253 254 255 256 257 258 259 |
# File 'lib/stomp_server_ng/protocols/stomp.rb', line 251 def send(frame) # set message id if frame.dest.match(%r|^/queue|) @@queue_manager.sendmsg(frame) else frame.headers['message-id'] = "msg-#stompcma-#{@@topic_manager.next_index}" @@topic_manager.sendmsg(frame) end end |
#send_data(data) ⇒ Object
EM::Connection.send_data(data)
Just calls super.
138 139 140 |
# File 'lib/stomp_server_ng/protocols/stomp.rb', line 138 def send_data(data) super(data) end |
#send_error(msg) ⇒ Object
send_error
Send a single error frame.
373 374 375 |
# File 'lib/stomp_server_ng/protocols/stomp.rb', line 373 def send_error(msg) send_frame("ERROR",{'message' => 'See below'},msg) end |
#send_frame(command, headers = {}, body = '') ⇒ Object
send_frame
Send an individual stomp frame.
381 382 383 384 385 |
# File 'lib/stomp_server_ng/protocols/stomp.rb', line 381 def send_frame(command, headers={}, body='') headers['content-length'] = body.size.to_s response = StompServer::StompFrame.new(command, headers, body) stomp_send_data(response) end |
#send_receipt(id) ⇒ Object
send_receipt
Send a single receipt frame.
391 392 393 |
# File 'lib/stomp_server_ng/protocols/stomp.rb', line 391 def send_receipt(id) send_frame("RECEIPT", { 'receipt-id' => id}) end |
#stomp_receive_data(data) ⇒ Object
:startdoc:
stomp_receive_data
Called from EM::Connection.receive_data(data)
. This is where we begin processing a set of data fresh off the wire.
308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 |
# File 'lib/stomp_server_ng/protocols/stomp.rb', line 308 def stomp_receive_data(data) begin # Limit log message length. logdata = data logdata = data[0..256] + "...truncated..." if data.length > 256 @@log.debug "#{@session_id} receive_data: #{logdata.inspect}" # Append all data to the recognizer buffer. @sfr << data # Process any stomp frames in this set of data. process_frames rescue Exception => e @@log.error "#{@session_id} err: #{e} #{e.backtrace.join("\n")}" send_error(e.to_s) close_connection_after_writing end end |
#stomp_send_data(frame) ⇒ Object
stomp_send_data
397 398 399 400 |
# File 'lib/stomp_server_ng/protocols/stomp.rb', line 397 def stomp_send_data(frame) @@log.debug "#{@session_id} Sending frame #{frame.to_s}" send_data(frame.to_s) end |
#subscribe(frame) ⇒ Object
Stomp Protocol - SUBSCRIBE
Delegated to the queue or topic manager.
266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 |
# File 'lib/stomp_server_ng/protocols/stomp.rb', line 266 def subscribe(frame) use_ack = false use_ack = true if frame.headers['ack'] == 'client' # if frame.headers['id'] subid = frame.headers['id'] elsif frame.headers[:id] subid = frame.headers[:id] else subid = nil end # if frame.dest =~ %r|^/queue| @@queue_manager.subscribe(frame.dest, self, use_ack, subid) else @@topic_manager.subscribe(frame.dest, self) end end |
#unbind ⇒ Object
EM::Connection.unbind()
Unbind the connection.
169 170 171 172 173 174 |
# File 'lib/stomp_server_ng/protocols/stomp.rb', line 169 def unbind() @@log.warn "#{@session_id} Unbind called" @connected = false @@queue_manager.disconnect(self) @@topic_manager.disconnect(self) end |
#unsubscribe(frame) ⇒ Object
Stomp Protocol - UNSUBSCRIBE
Delegated to the queue or topic manager.
289 290 291 292 293 294 295 |
# File 'lib/stomp_server_ng/protocols/stomp.rb', line 289 def unsubscribe(frame) if frame.dest =~ %r|^/queue| @@queue_manager.unsubscribe(frame.dest,self) else @@topic_manager.unsubscribe(frame.dest,self) end end |