Class: StompMq::Connection
- Inherits:
-
Object
- Object
- StompMq::Connection
- Defined in:
- lib/stompmq/connection.rb
Overview
Represents a connection to a STOMP message broker.
Instance Method Summary collapse
-
#initialize(opts = { :broker => 'localhost', :port => 61613, :username => nil, :passcode => nil }) ⇒ Connection
constructor
Connect to a STOMP broker and log in.
-
#receive_message(timeout = nil) ⇒ Object
Receive a STOMP message.
-
#send_message(opts = { :queue => nil, :header => {}, :content => {} }) ⇒ Object
Send a STOMP message to a queue.
-
#subscribe(opts = { :queue => nil, :selector => nil }) ⇒ Object
Subscribe to a STOMP queue.
Constructor Details
#initialize(opts = { :broker => 'localhost', :port => 61613, :username => nil, :passcode => nil }) ⇒ Connection
Connect to a STOMP broker and log in.
15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
# File 'lib/stompmq/connection.rb', line 15 def initialize(opts = { :broker => 'localhost', :port => 61613, :username => nil, :passcode => nil }) @buffer = "" @broker = opts[:broker] || 'localhost' @port = opts[:port] || 61613 @username = opts[:username] @passcode = opts[:passcode] @sock = TCPSocket.new @broker, @port send_frame :connect, {:login => @username, :passcode => @passcode} frame = receive_frame(15) # some brokers add a \n after null-terminators, including after CONNECTED frames. @buffer = "" raise ProtocolSequenceViolation unless frame.cmd == 'CONNECTED' self end |
Instance Method Details
#receive_message(timeout = nil) ⇒ Object
Receive a STOMP message.
50 51 52 53 54 55 56 |
# File 'lib/stompmq/connection.rb', line 50 def (timeout = nil) frame = receive_frame(timeout) return nil if !frame raise StompErrorReceived.new(frame.content) if frame.cmd == 'ERROR' raise ProtocolSequenceViolation unless frame.cmd == 'MESSAGE' StompMq::Message.new(frame.header, frame.content) end |
#send_message(opts = { :queue => nil, :header => {}, :content => {} }) ⇒ Object
Send a STOMP message to a queue.
40 41 42 43 44 45 46 47 |
# File 'lib/stompmq/connection.rb', line 40 def (opts = { :queue => nil, :header => {}, :content => {} }) queue = opts[:queue] content = opts[:content] || {} header = opts[:header] || {} header = {:destination => queue}.merge(header) send_frame :send, header, content end |
#subscribe(opts = { :queue => nil, :selector => nil }) ⇒ Object
Subscribe to a STOMP queue.
31 32 33 34 35 36 37 |
# File 'lib/stompmq/connection.rb', line 31 def subscribe(opts = { :queue => nil, :selector => nil }) if opts[:selector] send_frame :subscribe, {:destination => opts[:queue], :selector => opts[:selector]} else send_frame :subscribe, {:destination => opts[:queue]} end end |