Class: Stomp::Connection
- Inherits: Object
  - Object
  - Stomp::Connection
- Defined in: lib/stomp/connection.rb
Overview
Low-level connection which maps commands and supports synchronous receives.
Class Method Summary
- .open(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false, reconnect_delay = 5, connect_headers = {}) ⇒ Object
  Syntactic sugar for 'Connection.new'. See 'initialize' for usage.
Instance Method Summary
- #__old_receive ⇒ Object
  Receive a frame, blocking until the frame is received.
- #abort(name, headers = {}) ⇒ Object
  Abort a transaction by name.
- #ack(message_id, headers = {}) ⇒ Object
  Acknowledge a message. Used when a subscription has specified client acknowledgement ( connection.subscribe "/queue/a", :ack => 'client' ).
- #begin(name, headers = {}) ⇒ Object
  Begin a transaction. Requires a name for the transaction.
- #closed? ⇒ Boolean
  Is this connection closed?
- #commit(name, headers = {}) ⇒ Object
  Commit a transaction by name.
- #disconnect(headers = {}) ⇒ Object
  Close this connection.
- #initialize(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false, reconnect_delay = 5, connect_headers = {}) ⇒ Connection (constructor)
  Stomp URL: a Stomp URL must begin with 'stomp://' and can take one of the forms listed under Constructor Details.
- #open? ⇒ Boolean
  Is this connection open?
- #poll ⇒ Object
  Return a pending message if one is available, otherwise return nil.
- #receive ⇒ Object
- #send(destination, message, headers = {}) ⇒ Object
  Send message to destination.
- #socket ⇒ Object
- #subscribe(name, headers = {}, subId = nil) ⇒ Object
  Subscribe to a destination. A name must be specified.
- #unsubscribe(name, headers = {}, subId = nil) ⇒ Object
  Unsubscribe from a destination. A name must be specified.
Constructor Details
#initialize(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false, reconnect_delay = 5, connect_headers = {}) ⇒ Connection
Stomp URL :
A Stomp URL must begin with 'stomp://' and can be in one of the following forms:
stomp://host:port
stomp://host.domain.tld:port
stomp://user:pass@host:port
stomp://user:pass@host.domain.tld:port
# File 'lib/stomp/connection.rb', line 28
def initialize(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false, reconnect_delay = 5, connect_headers = {})
  @host = host
  @port = port
  @login = login
  @passcode = passcode
  @transmit_semaphore = Mutex.new
  @read_semaphore = Mutex.new
  @socket_semaphore = Mutex.new
  @reliable = reliable
  @reconnect_delay = reconnect_delay
  @connect_headers = connect_headers
  @closed = false
  @subscriptions = {}
  @failure = nil
  socket
end
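The URL forms listed above share one shape: an optional user:pass@ prefix followed by host:port. As a rough illustration only, they could be split into the constructor's positional arguments with a regular expression. `STOMP_URL` and `parse_stomp_url` are hypothetical names introduced here and are not part of the library; the constructor shown above takes positional arguments, not a URL string.

```ruby
# Hypothetical helper, not part of the stomp library.
# Matches stomp://[user:pass@]host:port and captures each part.
STOMP_URL = %r{\Astomp://(?:([^:@/]+):([^:@/]+)@)?([\w.\-]+):(\d+)\z}

def parse_stomp_url(url)
  m = STOMP_URL.match(url)
  return nil unless m # not a Stomp URL in one of the listed forms

  { login: m[1] || '', passcode: m[2] || '', host: m[3], port: m[4].to_i }
end

parts = parse_stomp_url('stomp://user:pass@host.domain.tld:61613')
# The parts could then be passed on positionally, for example:
#   Stomp::Connection.new(parts[:login], parts[:passcode], parts[:host], parts[:port])
```

Missing credentials fall back to empty strings, which matches the constructor defaults for login and passcode.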
Class Method Details
.open(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false, reconnect_delay = 5, connect_headers = {}) ⇒ Object
Syntactic sugar for 'Connection.new'. See 'initialize' for usage.
# File 'lib/stomp/connection.rb', line 46
def Connection.open(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false, reconnect_delay = 5, connect_headers = {})
  Connection.new(login, passcode, host, port, reliable, reconnect_delay, connect_headers)
end
Instance Method Details
#__old_receive ⇒ Object
Receive a frame, block until the frame is received
# File 'lib/stomp/connection.rb', line 161
def __old_receive
  # The receive may fail, so we may need to retry.
  while true
    begin
      s = socket
      return _receive(s)
    rescue
      @failure = $!
      raise unless @reliable
      # Interpolate the exception; String + Exception would raise a TypeError.
      $stderr.print "receive failed: #{$!}"
    end
  end
end
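The retry behaviour of `__old_receive` follows a common pattern: loop, attempt, and re-raise unless the connection is in reliable mode. A minimal standalone sketch of that pattern follows. `with_retry` is a hypothetical helper, and `max_attempts` is an assumption added so that the example terminates; the listing above loops without a cap.

```ruby
# Hypothetical helper illustrating the "retry only when reliable" loop.
# max_attempts is an assumption of this sketch, not a library setting.
def with_retry(reliable:, max_attempts: 5)
  attempts = 0
  loop do
    attempts += 1
    begin
      return yield(attempts)
    rescue StandardError => e
      # Non-reliable callers see the error immediately.
      raise unless reliable && attempts < max_attempts
      $stderr.puts "receive failed: #{e.message}"
    end
  end
end

# Simulated receive that fails twice, then yields a frame.
frame = with_retry(reliable: true) do |n|
  raise IOError, 'socket closed' if n < 3
  "frame after #{n} attempts"
end
```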
#abort(name, headers = {}) ⇒ Object
Abort a transaction by name
# File 'lib/stomp/connection.rb', line 110
def abort(name, headers = {})
  headers[:transaction] = name
  transmit("ABORT", headers)
end
#ack(message_id, headers = {}) ⇒ Object
Acknowledge a message. Used when a subscription has specified client acknowledgement ( connection.subscribe "/queue/a", :ack => 'client' ).
Accepts a transaction header ( :transaction => ‘some_transaction_id’ )
# File 'lib/stomp/connection.rb', line 98
def ack(message_id, headers = {})
  headers['message-id'] = message_id
  transmit("ACK", headers)
end
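To see which headers a client-acknowledged subscription and its ACK end up carrying, the sketch below records frames in memory instead of writing to a socket. `FrameRecorder` and the message id `'ID:msg-1'` are hypothetical; only the `subscribe` and `ack` bodies mirror the listings on this page.

```ruby
# Hypothetical stand-in that records frames instead of transmitting them.
class FrameRecorder
  attr_reader :frames

  def initialize
    @frames = []
  end

  def transmit(command, headers = {}, body = '')
    @frames << [command, headers.dup, body]
  end

  def subscribe(name, headers = {})
    headers[:destination] = name
    transmit('SUBSCRIBE', headers)
  end

  def ack(message_id, headers = {})
    headers['message-id'] = message_id
    transmit('ACK', headers)
  end
end

conn = FrameRecorder.new
conn.subscribe('/queue/a', :ack => 'client')
# 'ID:msg-1' stands in for the message-id header of a received message.
conn.ack('ID:msg-1', :transaction => 'tx1')
```

Both methods write into the hash they are given, so a headers hash reused across calls keeps the keys added by earlier calls.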
#begin(name, headers = {}) ⇒ Object
Begin a transaction, requires a name for the transaction
# File 'lib/stomp/connection.rb', line 89
def begin(name, headers = {})
  headers[:transaction] = name
  transmit("BEGIN", headers)
end
#closed? ⇒ Boolean
Is this connection closed?
# File 'lib/stomp/connection.rb', line 84
def closed?
  @closed
end
#commit(name, headers = {}) ⇒ Object
Commit a transaction by name
# File 'lib/stomp/connection.rb', line 104
def commit(name, headers = {})
  headers[:transaction] = name
  transmit("COMMIT", headers)
end
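`begin`, `commit` and `abort` are typically used together around one or more sends. The following sketch shows one possible shape of that flow against an in-memory stand-in. `FrameRecorder` is hypothetical, and the rescue-then-abort structure is an assumption of this example, not something the library enforces.

```ruby
# Hypothetical stand-in that records frames instead of transmitting them.
class FrameRecorder
  attr_reader :frames

  def initialize
    @frames = []
  end

  def transmit(command, headers = {}, body = '')
    @frames << [command, headers.dup, body]
  end

  # BEGIN, COMMIT and ABORT differ only in the command name.
  %w[begin commit abort].each do |verb|
    define_method(verb) do |name, headers = {}|
      headers[:transaction] = name
      transmit(verb.upcase, headers)
    end
  end

  def send(destination, message, headers = {})
    headers[:destination] = destination
    transmit('SEND', headers, message)
  end
end

conn = FrameRecorder.new
conn.begin('tx1')
begin
  conn.send('/queue/a', 'hello', :transaction => 'tx1')
  conn.commit('tx1')
rescue StandardError
  conn.abort('tx1') # roll back if anything inside the transaction failed
  raise
end
commands = conn.frames.map(&:first)
```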
#disconnect(headers = {}) ⇒ Object
Close this connection
# File 'lib/stomp/connection.rb', line 146
def disconnect(headers = {})
  transmit("DISCONNECT", headers)
  @closed = true
end
#open? ⇒ Boolean
Is this connection open?
# File 'lib/stomp/connection.rb', line 79
def open?
  !@closed
end
#poll ⇒ Object
Return a pending message if one is available, otherwise return nil
# File 'lib/stomp/connection.rb', line 153
def poll
  @read_semaphore.synchronize do
    return nil if @socket.nil? || !@socket.ready?
    return receive
  end
end
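Because `poll` returns nil when nothing is pending, a caller can collect whatever has already arrived without blocking. A minimal sketch, assuming only that contract: `drain` and `FakeConnection` are hypothetical names, and the fake stands in for a real connection so the example runs without a broker.

```ruby
# Hypothetical helper: collect every message that is already pending.
def drain(conn)
  messages = []
  while (msg = conn.poll)
    messages << msg
  end
  messages
end

# Stand-in with the same poll contract: a message, or nil when none is pending.
class FakeConnection
  def initialize(pending)
    @pending = pending.dup
  end

  def poll
    @pending.shift
  end
end

pending = drain(FakeConnection.new(%w[first second]))
```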
#receive ⇒ Object
# File 'lib/stomp/connection.rb', line 175
def receive
  super_result = __old_receive()
  if super_result.nil? && @reliable
    $stderr.print "connection.receive returning EOF as nil - resetting connection.\n"
    @socket = nil
    super_result = __old_receive()
  end
  return super_result
end
#send(destination, message, headers = {}) ⇒ Object
Send message to destination
Accepts a transaction header ( :transaction => ‘some_transaction_id’ )
# File 'lib/stomp/connection.rb', line 140
def send(destination, message, headers = {})
  headers[:destination] = destination
  transmit("SEND", headers, message)
end
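`send` hands a command, headers and a body to `transmit`, whose source is not shown on this page. In the STOMP protocol a frame consists of the command line, one `key:value` line per header, a blank line, the body, and a terminating NUL byte. The sketch below serializes such a frame; `build_frame` is a hypothetical name, and this is not necessarily how `transmit` is implemented.

```ruby
# Hypothetical serializer for a STOMP frame:
# COMMAND, header lines, blank line, body, NUL terminator.
def build_frame(command, headers = {}, body = '')
  header_lines = headers.map { |key, value| "#{key}:#{value}\n" }.join
  "#{command}\n#{header_lines}\n#{body}\0"
end

headers = { :destination => '/queue/a' }
frame = build_frame('SEND', headers, 'hello')
```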
#socket ⇒ Object
# File 'lib/stomp/connection.rb', line 50
def socket
  # Need to look into why the following synchronize does not work.
  #@read_semaphore.synchronize do
    s = @socket
    while s.nil? || !@failure.nil?
      @failure = nil
      begin
        s = TCPSocket.open @host, @port
        headers = @connect_headers.clone
        headers[:login] = @login
        headers[:passcode] = @passcode
        _transmit(s, "CONNECT", headers)
        @connect = _receive(s)
        # replay any subscriptions.
        @subscriptions.each { |k,v| _transmit(s, "SUBSCRIBE", v) }
      rescue
        @failure = $!
        s = nil
        raise unless @reliable
        # Interpolate the exception; String + Exception would raise a TypeError.
        $stderr.print "connect failed: #{$!} will retry in #{@reconnect_delay}\n"
        sleep(@reconnect_delay)
      end
    end
    @socket = s
    return s
  #end
end
#subscribe(name, headers = {}, subId = nil) ⇒ Object
Subscribe to a destination, must specify a name
# File 'lib/stomp/connection.rb', line 116
def subscribe(name, headers = {}, subId = nil)
  headers[:destination] = name
  transmit("SUBSCRIBE", headers)

  # Store the sub so that we can replay if we reconnect.
  if @reliable
    subId = name if subId.nil?
    @subscriptions[subId] = headers
  end
end
#unsubscribe(name, headers = {}, subId = nil) ⇒ Object
Unsubscribe from a destination, must specify a name
# File 'lib/stomp/connection.rb', line 128
def unsubscribe(name, headers = {}, subId = nil)
  headers[:destination] = name
  transmit("UNSUBSCRIBE", headers)
  if @reliable
    subId = name if subId.nil?
    @subscriptions.delete(subId)
  end
end
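In reliable mode, `subscribe` and `unsubscribe` maintain a table keyed by `subId` (or by the destination name when no `subId` is given), and `socket` replays that table after a reconnect. The sketch below isolates that bookkeeping. `SubscriptionLog` and its `replay` method are hypothetical names; frames are recorded in an array and never sent anywhere.

```ruby
# Hypothetical model of the reliable-mode subscription table.
class SubscriptionLog
  attr_reader :sent

  def initialize(reliable)
    @reliable = reliable
    @subscriptions = {}
    @sent = []
  end

  def subscribe(name, headers = {}, sub_id = nil)
    headers[:destination] = name
    @sent << ['SUBSCRIBE', headers]
    # Remember the subscription so it can be replayed after a reconnect.
    @subscriptions[sub_id || name] = headers if @reliable
  end

  def unsubscribe(name, headers = {}, sub_id = nil)
    headers[:destination] = name
    @sent << ['UNSUBSCRIBE', headers]
    @subscriptions.delete(sub_id || name) if @reliable
  end

  # Simulates the replay step that follows a reconnect.
  def replay
    @subscriptions.each_value { |headers| @sent << ['SUBSCRIBE', headers] }
  end
end

log = SubscriptionLog.new(true)
log.subscribe('/queue/a')
log.subscribe('/queue/b', {}, 'b-sub')
log.unsubscribe('/queue/b', {}, 'b-sub')
log.sent.clear
log.replay
```

One consequence of keying by `subId`: unsubscribing without the `subId` that was used to subscribe leaves the stored entry in place, so it would be replayed on reconnect.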