Class: StompMq::Connection

Inherits:
Object
  • Object
show all
Defined in:
lib/stompmq/connection.rb

Overview

Represents a connection to a STOMP message broker.

Instance Method Summary collapse

Constructor Details

#initialize(opts = { :broker => 'localhost', :port => 61613, :username => nil, :passcode => nil }) ⇒ Connection

Connect to a STOMP broker and log in.



15
16
17
18
19
20
21
22
23
24
25
26
27
28
# File 'lib/stompmq/connection.rb', line 15

# Connect to a STOMP broker and log in.
#
# Opens a TCP socket to the broker, sends a CONNECT frame carrying the
# credentials and waits for the reply frame.
#
# @param opts [Hash] connection options
# @option opts [String] :broker ('localhost') broker host
# @option opts [Integer] :port (61613) broker port
# @option opts [String, nil] :username login sent in the CONNECT frame
# @option opts [String, nil] :passcode passcode sent in the CONNECT frame
# @raise [ProtocolSequenceViolation] if the reply is missing or is not a
#   CONNECTED frame
def initialize(opts = { :broker => 'localhost', :port => 61613, :username => nil, :passcode => nil })
  @buffer   = ""
  @broker   = opts[:broker]   || 'localhost'
  @port     = opts[:port]     || 61613
  @username = opts[:username]
  @passcode = opts[:passcode]
  @sock = TCPSocket.new @broker, @port

  logged_in = false
  begin
    send_frame :connect, {:login => @username, :passcode => @passcode}
    frame = receive_frame(15)
    # some brokers add a \n after null-terminators, including after CONNECTED frames.
    @buffer = ""
    # receive_frame can hand back nothing (receive_message guards for that too);
    # treat a missing reply like any other unexpected one instead of failing
    # with NoMethodError on nil.
    raise ProtocolSequenceViolation unless frame && frame.cmd == 'CONNECTED'
    logged_in = true
  ensure
    # The half-built object is never returned to the caller when the handshake
    # fails, so nobody else could close the socket: release it here.
    @sock.close unless logged_in
  end
end

Instance Method Details

#receive_message(timeout = nil) ⇒ Object

Receive a STOMP message.

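Raises:

  • (StompErrorReceived) — if the broker sends an ERROR frame
  • (ProtocolSequenceViolation) — if the received frame is neither MESSAGE nor ERROR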



50
51
52
53
54
55
56
# File 'lib/stompmq/connection.rb', line 50

# Receive a STOMP message.
#
# Reads one frame via receive_frame, passing +timeout+ straight through, and
# turns a MESSAGE frame into a StompMq::Message.
#
# @param timeout [Object, nil] forwarded unchanged to receive_frame
# @return [StompMq::Message, nil] the message, or nil when receive_frame
#   yields no frame
# @raise [StompErrorReceived] when the frame command is ERROR (carries the
#   frame content)
# @raise [ProtocolSequenceViolation] when the frame command is anything other
#   than MESSAGE or ERROR
def receive_message(timeout = nil)
  incoming = receive_frame(timeout)
  return unless incoming

  case incoming.cmd
  when 'ERROR'
    raise StompErrorReceived.new(incoming.content)
  when 'MESSAGE'
    StompMq::Message.new(incoming.header, incoming.content)
  else
    raise ProtocolSequenceViolation
  end
end

#send_message(opts = { :queue => nil, :header => {}, :content => {} }) ⇒ Object

Send a STOMP message to a queue.



40
41
42
43
44
45
46
47
# File 'lib/stompmq/connection.rb', line 40

# Send a STOMP message to a queue.
#
# Builds the header for a SEND frame from +opts+ and hands it, together with
# the content, to send_frame.
#
# @param opts [Hash] message options
# @option opts [Object] :queue value used for the :destination header
# @option opts [Hash] :header extra header entries; they are merged over the
#   default, so a :destination given here replaces the one taken from :queue
# @option opts [Object] :content message content (defaults to an empty hash)
# @return [Object] whatever send_frame returns
def send_message(opts = { :queue => nil, :header => {}, :content => {} })
  payload       = opts[:content] || {}
  extra_headers = opts[:header] || {}
  frame_headers = { :destination => opts[:queue] }.merge(extra_headers)

  send_frame(:send, frame_headers, payload)
end

#subscribe(opts = { :queue => nil, :selector => nil }) ⇒ Object

Subscribe to a STOMP queue.



31
32
33
34
35
36
37
# File 'lib/stompmq/connection.rb', line 31

# Subscribe to a STOMP queue.
#
# Sends a SUBSCRIBE frame through send_frame. The :selector header is only
# included when a truthy selector was supplied.
#
# @param opts [Hash] subscription options
# @option opts [Object] :queue value used for the :destination header
# @option opts [Object, nil] :selector optional value for the :selector header
# @return [Object] whatever send_frame returns
def subscribe(opts = { :queue => nil, :selector => nil })
  selector = opts[:selector]
  frame_headers = { :destination => opts[:queue] }
  frame_headers[:selector] = selector if selector

  send_frame(:subscribe, frame_headers)
end