Module: EventMachine::Protocols::Stomp

Includes:
LineText2
Defined in:
lib/em/protocols/stomp.rb

Overview

Implements the client side of the STOMP messaging protocol (docs.codehaus.org/display/STOMP/Protocol).

Usage example

# Example handler: logs in, subscribes once the broker answers with
# CONNECTED, and prints every other frame it receives.
module StompClient
  include EM::Protocols::Stomp

  # EventMachine callback; sends the CONNECT frame with the credentials.
  def connection_completed
    connect(:login => 'guest', :passcode => 'guest')
  end

  # Called for every incoming frame.
  def receive_msg(msg)
    # The CONNECTED frame triggers the subscription; nothing is printed for it.
    return subscribe('/some/topic') if msg.command == "CONNECTED"

    p ['got a message', msg]
    puts msg.body
  end
end

# Run the reactor and connect to localhost:61613 using StompClient as handler.
EM.run do
  EM.connect('localhost', 61613, StompClient)
end

Defined Under Namespace

Classes: Message

Constant Summary

Constants included from LineText2

LineText2::MaxBinaryLength, LineText2::MaxLineLength

Instance Method Summary

Methods included from LineText2

#receive_data, #receive_end_of_binary_data, #set_binary_mode, #set_delimiter, #set_line_mode, #set_text_mode, #unbind

Instance Method Details

#ack(msgid) ⇒ Object

ACK command, for acknowledging receipt of messages

# Example handler that subscribes in ack mode and acknowledges each
# MESSAGE frame before printing its body.
module StompClient
  include EM::P::Stomp

  # EventMachine callback; logs in and subscribes right away.
  def connection_completed
    connect(:login => 'guest', :passcode => 'guest')
    # subscribe with ack mode (second argument true)
    subscribe('/some/topic', true)
  end

  # Called for every incoming frame; only MESSAGE frames are handled.
  def receive_msg(msg)
    return unless msg.command == "MESSAGE"

    ack(msg.headers['message-id'])
    puts msg.body
  end
end


193
194
195
# File 'lib/em/protocols/stomp.rb', line 193

# Sends an ACK frame for a received message.
#
# msgid - value written to the frame's 'message-id' header.
#
# Returns whatever send_frame returns.
def ack(msgid)
  ack_headers = { 'message-id' => msgid }
  send_frame("ACK", ack_headers)
end

#connect(parms = {}) ⇒ Object

CONNECT command, for authentication

connect :login => 'guest', :passcode => 'guest'


154
155
156
# File 'lib/em/protocols/stomp.rb', line 154

# Sends a CONNECT frame.
#
# parms - Hash of headers for the frame (e.g. :login and :passcode);
#         defaults to an empty Hash.
#
# Returns whatever send_frame returns.
def connect(parms = {})
  send_frame("CONNECT", parms)
end

#init_message_reader ⇒ Object



136
137
138
139
140
141
# File 'lib/em/protocols/stomp.rb', line 136

# Resets the parser state so the next incoming line starts a new frame:
# marks the reader as initialized, switches back to newline-delimited line
# mode and installs a fresh Message to accumulate into.
#
# Returns the new Message.
def init_message_reader
  @stomp_initialized = true
  set_delimiter("\n")
  set_line_mode
  @stomp_message = Message.new
end

#receive_binary_data(data) ⇒ Object



130
131
132
133
134
# File 'lib/em/protocols/stomp.rb', line 130

# Receives a fixed-size body, stores it on the message being built,
# dispatches that message and resets the reader for the next frame.
#
# data - the received payload String.
def receive_binary_data(data)
  # Everything except the final character becomes the body; presumably the
  # dropped character is the frame's trailing NUL terminator (confirm against
  # how the text-mode length is chosen in Message).
  @stomp_message.body = data[0...-1]
  receive_msg(@stomp_message) if respond_to?(:receive_msg)
  init_message_reader
end

#receive_line(line) ⇒ Object



116
117
118
119
120
121
122
123
124
125
126
127
128
# File 'lib/em/protocols/stomp.rb', line 116

# Feeds one received line to the message being assembled and reacts to the
# outcome the message yields.
#
# line - the received line String.
def receive_line(line)
  # Lazily set up parser state on the first line.
  init_message_reader unless @stomp_initialized

  @stomp_message.consume_line(line) do |outcome|
    case outcome.first
    when :sized_text
      # outcome[1] is handed to set_text_mode (presumably the body length).
      set_text_mode(outcome[1])
    when :unsized_text
      # Switch the line delimiter to NUL for the remainder of this frame.
      set_delimiter("\0")
    when :dispatch
      receive_msg(@stomp_message) if respond_to?(:receive_msg)
      init_message_reader
    end
  end
end

#receive_msg(msg) ⇒ Object

Invoked with an incoming Stomp::Message received from the STOMP server



146
147
148
# File 'lib/em/protocols/stomp.rb', line 146

# Hook called with each complete incoming message.
#
# msg - the received message.
def receive_msg(msg)
  # Deliberately empty: define receive_msg in your own handler to process
  # incoming messages.
end

#send(destination, body, parms = {}) ⇒ Object

SEND command, for publishing messages to a topic

send '/topic/name', 'some message here'


162
163
164
# File 'lib/em/protocols/stomp.rb', line 162

# Sends a SEND frame publishing +body+ to +destination+.
#
# Note: because this method is named +send+, it replaces Ruby's Object#send
# in classes that include this module; __send__ is unaffected.
#
# destination - value for the :destination header (overrides any
#               :destination key already present in parms).
# body        - payload; converted with to_s.
# parms       - Hash of extra headers; it is not modified.
#
# Returns whatever send_frame returns.
def send(destination, body, parms = {})
  frame_headers = parms.merge(:destination => destination)
  send_frame("SEND", frame_headers, body.to_s)
end

#send_frame(verb, headers = {}, body = "") ⇒ Object

Builds a STOMP frame from the verb, headers and body and writes it to the connection.



105
106
107
108
109
110
111
112
113
114
# File 'lib/em/protocols/stomp.rb', line 105

# Serializes a STOMP frame and writes it to the connection with send_data.
#
# The frame consists of the verb line, one "key:value" line per header, a
# content-length and a content-type header (always appended), a blank line,
# the body and a terminating NUL byte.
#
# verb    - frame command String, e.g. "SEND" or "SUBSCRIBE".
# headers - Hash of header names to values (default: {}).
# body    - payload; converted with to_s (default: "").
#
# Returns whatever send_data returns.
def send_frame verb, headers={}, body=""
  payload = body.to_s
  ary = [verb, "\n"]
  headers.each { |k, v| ary << "#{k}:#{v}\n" }
  # content-length is a count of bytes, not characters: String#length would
  # under-report multibyte (e.g. UTF-8) bodies and cause the receiver to
  # truncate them, so bytesize is used instead.
  ary << "content-length: #{payload.bytesize}\n"
  ary << "content-type: text/plain; charset=UTF-8\n"
  ary << "\n"
  ary << payload
  ary << "\0"
  send_data ary.join
end

#subscribe(dest, ack = false) ⇒ Object

SUBSCRIBE command, for subscribing to topics

subscribe '/topic/name', false


170
171
172
# File 'lib/em/protocols/stomp.rb', line 170

# Sends a SUBSCRIBE frame for +dest+.
#
# dest - value for the :destination header.
# ack  - when truthy the :ack header is "client", otherwise "auto"
#        (default: false).
#
# Returns whatever send_frame returns.
def subscribe(dest, ack = false)
  ack_mode =
    if ack
      "client"
    else
      "auto"
    end
  send_frame("SUBSCRIBE", { :destination => dest, :ack => ack_mode })
end