Class: Typhon::StompClient
- Inherits:
-
EM::Connection
- Object
- EM::Connection
- Typhon::StompClient
- Includes:
- EM::Protocols::Stomp
- Defined in:
- lib/typhon/stompclient.rb
Instance Method Summary collapse
- #connected? ⇒ Boolean
- #connection_completed ⇒ Object
-
#initialize(params = {}) ⇒ StompClient
constructor
A new instance of StompClient.
- #publish(topic, message, param = {}) ⇒ Object
- #unbind ⇒ Object
Constructor Details
#initialize(params = {}) ⇒ StompClient
Returns a new instance of StompClient.
5 6 7 8 9 |
# File 'lib/typhon/stompclient.rb', line 5
#
# Prepares client state before the connection is established.
#
# @param params [Hash, nil] optional overrides for the default options
#   (:auto_reconnect, :timeout, :max_queue_size). Keys supplied here win over
#   the defaults; nil is treated as "no overrides".
def initialize(params = {})
  # Flipped to true by #connection_completed and back to false by #unbind.
  @connected = false

  # Previously `params` was accepted but silently ignored; merge it so callers
  # can actually tune the reconnect delay and queue bound.
  @options = {:auto_reconnect => true, :timeout => 2, :max_queue_size => 500}.merge(params || {})

  # Buffer for messages published while the client is not connected.
  @queue = EM::Queue.new
end
Instance Method Details
#connected? ⇒ Boolean
28 29 30 |
# File 'lib/typhon/stompclient.rb', line 28
#
# Tells whether the client currently considers itself connected.
#
# @return [Boolean] truthy only when the connected flag is set and `error?`
#   reports no error; otherwise the falsy flag value (or false) is returned.
def connected?
  # Short-circuit: a client that never connected (or was unbound) is not up.
  return @connected unless @connected

  # `error?` is not defined in this listing; presumably inherited from
  # EM::Connection -- confirm.
  !error?
end
#connection_completed ⇒ Object
11 12 13 14 15 16 |
# File 'lib/typhon/stompclient.rb', line 11
#
# Callback run once the connection is established: issues the STOMP login
# using the credentials from Config[:stomp], logs the endpoint, and marks the
# client as connected.
#
# @return [true]
def connection_completed
  stomp = Config[:stomp]

  connect(:login => stomp[:user], :passcode => stomp[:pass])
  Log.debug("Authenticated to %s:%d" % [stomp[:server], stomp[:port]])

  # From here on #connected? may report true.
  @connected = true
end
#publish(topic, message, param = {}) ⇒ Object
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
# File 'lib/typhon/stompclient.rb', line 32
#
# Publishes a message, or buffers it while the client is disconnected.
#
# When connected, the given message is sent first and then every message that
# was buffered earlier is flushed (so the backlog goes out after the current
# message). When not connected, the message is queued unless the queue has
# already reached @options[:max_queue_size], in which case it is dropped
# without notice.
#
# @param topic [Object] destination handed to `send`
# @param message [Object] message body handed to `send`
# @param param [Hash] extra parameters handed to `send`
def publish(topic, message, param = {})
  if connected?
    # `send` is expected to be the STOMP send from EM::Protocols::Stomp
    # (included by this class), not Object#send -- confirm.
    send(topic, message, param)

    # Drain the backlog; relies on the queue's pop yielding each item before
    # the next empty? check.
    until @queue.empty?
      @queue.pop do |queued|
        send(queued[:topic], queued[:message], queued[:param])
      end
    end
  elsif @queue.size < @options[:max_queue_size]
    @queue.push({:topic => topic, :message => message, :param => param})
  end
end
#unbind ⇒ Object
18 19 20 21 22 23 24 25 26 |
# File 'lib/typhon/stompclient.rb', line 18
#
# Callback run when the connection is lost or could not be made: logs the
# failure, marks the client as disconnected and, unless auto-reconnect has
# been switched off, schedules a reconnect after @options[:timeout] seconds.
#
# @return [Object, nil] whatever EM.add_timer returns, or nil when
#   auto-reconnect is disabled
def unbind
  Log.error("Connection to %s:%d failed" % [ Config[:stomp][:server], Config[:stomp][:port] ])
  @connected = false

  # The :auto_reconnect option was declared but never consulted; honour it.
  # A missing key keeps the historical behaviour (always reconnect).
  return unless @options.fetch(:auto_reconnect, true)

  EM.add_timer(@options[:timeout]) do
    # Config is read again when the timer fires, not when it is scheduled.
    Log.debug("Connecting to Stomp Server %s:%d" % [ Config[:stomp][:server], Config[:stomp][:port] ])
    reconnect Config[:stomp][:server], Config[:stomp][:port]
  end
end