Class: StompOut::Client
- Inherits: Object
  - Object
  - StompOut::Client
- Defined in: lib/stomp_out/client.rb
Overview
Abstract base class for a STOMP client for use with an existing server connection, such as a WebSocket. Derived classes are responsible for supplying the following functions:
send_data(data) - send data over connection
on_connected(frame, session_id, server_name) - handle notification that now connected
on_message(frame, destination, message, content_type, message_id, ack_id) - handle message received from server
on_receipt(frame, receipt_id) - handle notification that a request was successfully handled by server
on_error(frame, error, details, receipt_id) - handle notification from server that a request failed and that the connection should be closed
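A derived class might supply these hooks roughly as follows. This is only a sketch: `StandInClient` is a hypothetical stand-in that mimics the unimplemented hooks of the base class so the example runs without the gem, and `RecordingClient` with its `sent` and `events` accessors is invented for illustration. With the gem loaded, the derived class would presumably inherit from StompOut::Client instead.

```ruby
# Hypothetical stand-in for StompOut::Client: each hook raises until a
# derived class overrides it, mirroring the base class described here.
class StandInClient
  def send_data(data)
    raise "Not implemented"
  end

  def on_connected(frame, session_id, server_name)
    raise "Not implemented"
  end

  def on_message(frame, destination, message, content_type, message_id, ack_id)
    raise "Not implemented"
  end

  def on_receipt(frame, receipt_id)
    raise "Not implemented"
  end

  def on_error(frame, error, details, receipt_id)
    raise "Not implemented"
  end
end

# Illustrative derived class that records activity instead of using a real
# connection. A real client would write `data` to its WebSocket here.
class RecordingClient < StandInClient
  attr_reader :sent, :events

  def initialize
    @sent = []
    @events = []
  end

  def send_data(data)
    @sent << data
    true
  end

  def on_connected(frame, session_id, server_name)
    @events << [:connected, session_id, server_name]
    true
  end

  def on_message(frame, destination, message, content_type, message_id, ack_id)
    @events << [:message, destination, message]
    true
  end

  def on_receipt(frame, receipt_id)
    @events << [:receipt, receipt_id]
    true
  end

  def on_error(frame, error, details, receipt_id)
    @events << [:error, error]
    true
  end
end
```

Each hook returns true to match the TrueClass return type documented for these methods.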
Constant Summary
- SUPPORTED_VERSIONS = ["1.0", "1.1", "1.2"]
- ACK_SETTINGS = { "1.0" => ["auto", "client"], "1.1" => ["auto", "client", "client-individual"], "1.2" => ["auto", "client", "client-individual"] }
- SERVER_COMMANDS = [:connected, :message, :receipt, :error]
- MIN_SEND_HEARTBEAT = 5000
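As an illustration of how ACK_SETTINGS relates to the protocol version, the following sketch checks whether an ack setting is acceptable for a given version. The constants are copied from the summary above; the helper `ack_allowed?` is hypothetical and not part of the class.

```ruby
# Constants copied from the Constant Summary.
SUPPORTED_VERSIONS = ["1.0", "1.1", "1.2"]
ACK_SETTINGS = {
  "1.0" => ["auto", "client"],
  "1.1" => ["auto", "client", "client-individual"],
  "1.2" => ["auto", "client", "client-individual"]
}

# Hypothetical helper: true when `ack` may be used with `version`.
# A nil ack is accepted because #subscribe only validates a non-nil setting.
def ack_allowed?(version, ack)
  return false unless SUPPORTED_VERSIONS.include?(version)
  ack.nil? || ACK_SETTINGS[version].include?(ack)
end
```

For example, "client-individual" is rejected for version "1.0" but accepted for "1.1" and "1.2".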
Instance Attribute Summary
- #heartbeat ⇒ Object (readonly)
  Heartbeat: heartbeat generator and monitor.
- #host ⇒ Object (readonly)
  String: host to which client is connecting.
- #server_name ⇒ Object (readonly)
  String: name assigned to server.
- #session_id ⇒ Object (readonly)
  String: session_id assigned to session.
- #version ⇒ Object (readonly)
  String: version of STOMP chosen for session.
Instance Method Summary
- #abort(id, receipt = nil, headers = nil) ⇒ String, NilClass
  Roll back a transaction.
- #ack(ack_id, receipt = nil, transaction_id = nil, headers = nil) ⇒ String, NilClass
  Acknowledge consumption of a message from a subscription.
- #begin(receipt = nil, headers = nil) ⇒ Array<String>
  Start a transaction.
- #commit(id, receipt = nil, headers = nil) ⇒ String, NilClass
  Commit a transaction.
- #connect(heartbeat = nil, login = nil, passcode = nil, headers = nil) ⇒ TrueClass
  Connect to server.
- #connected? ⇒ Boolean
  Determine whether connected to STOMP server.
- #disconnect(receipt = nil, headers = nil) ⇒ String, NilClass
  Disconnect from the server. The client is expected to close its connection after calling this function. If a receipt is requested, it may not be received before the frame is reset.
- #initialize(options = {}) ⇒ Client (constructor)
  Create STOMP client.
- #message(destination, message, content_type = nil, receipt = nil, transaction_id = nil, headers = nil) ⇒ String, NilClass
  Send message to given destination.
- #nack(ack_id, receipt = nil, transaction_id = nil, headers = nil) ⇒ String, NilClass
  Tell the server that a message was not consumed.
- #on_connected(frame, session_id, server_name) ⇒ TrueClass
  Handle notification that now connected to server.
- #on_error(frame, error, details, receipt_id) ⇒ TrueClass
  Handle notification from server that a request failed and that the connection should be closed.
- #on_message(frame, destination, message, content_type, message_id, ack_id) ⇒ TrueClass
  Handle message received from server.
- #on_receipt(frame, receipt_id) ⇒ TrueClass
  Handle notification that a request was successfully handled by server.
- #receive_data(data) ⇒ TrueClass
  Process data received over connection from server.
- #report_error(error) ⇒ TrueClass
  Report to client that an error was encountered locally. Not intended for use by end users of this class.
- #send_data(data) ⇒ TrueClass
  Send data over connection to server.
- #subscribe(destination, ack = nil, receipt = nil, headers = nil) ⇒ String, NilClass
  Register to listen to a given destination.
- #subscriptions ⇒ Array<String>
  List active subscriptions.
- #transactions ⇒ Array<String>
  List active transactions.
- #unsubscribe(destination, receipt = nil, headers = nil) ⇒ String, NilClass
  Remove an existing subscription.
Constructor Details
#initialize(options = {}) ⇒ Client
Create STOMP client
# File 'lib/stomp_out/client.rb', line 74
def initialize(options = {})
  @options = options
  @host = @options[:host] || "stomp"
  @parser = StompOut::Parser.new
  @ack_id = 0
  @message_ids = {} # ack ID is key
  @subscribe_id = 0
  @subscribes = {} # destination is key
  @transaction_id = 0
  @transaction_ids = []
  @receipt = options[:receipt]
  @receipt_id = 0
  @receipted_frames = {} # receipt-id is key
  @connected = false
end
Instance Attribute Details
#heartbeat ⇒ Object (readonly)
- Heartbeat: heartbeat generator and monitor

# File 'lib/stomp_out/client.rb', line 62
def heartbeat
  @heartbeat
end
#host ⇒ Object (readonly)
- String: host to which client is connecting

# File 'lib/stomp_out/client.rb', line 59
def host
  @host
end
#server_name ⇒ Object (readonly)
- String: name assigned to server

# File 'lib/stomp_out/client.rb', line 56
def server_name
  @server_name
end
#session_id ⇒ Object (readonly)
- String: session_id assigned to session

# File 'lib/stomp_out/client.rb', line 53
def session_id
  @session_id
end
#version ⇒ Object (readonly)
- String: version of STOMP chosen for session

# File 'lib/stomp_out/client.rb', line 50
def version
  @version
end
Instance Method Details
#abort(id, receipt = nil, headers = nil) ⇒ String, NilClass
Roll back a transaction
# File 'lib/stomp_out/client.rb', line 410
def abort(id, receipt = nil, headers = nil)
  raise ProtocolError.new("Not connected") unless @connected
  raise ApplicationError.new("Transaction #{id} not found") unless @transaction_ids.delete(id.to_s)
  headers ||= {}
  headers["transaction"] = id.to_s
  frame = send_frame("ABORT", headers, body = nil, content_type = nil, receipt)
  frame.headers["receipt"]
end
#ack(ack_id, receipt = nil, transaction_id = nil, headers = nil) ⇒ String, NilClass
Acknowledge consumption of a message from a subscription
# File 'lib/stomp_out/client.rb', line 327
def ack(ack_id, receipt = nil, transaction_id = nil, headers = nil)
  raise ProtocolError.new("Not connected") unless @connected
  message_id = @message_ids.delete(ack_id)
  headers ||= {}
  if @version == "1.0"
    raise ApplicationError.new("No message was received with ack #{ack_id}") if message_id.nil?
    headers["message-id"] = message_id
    frame = send_frame("ACK", headers, body = nil, content_type = nil, receipt, transaction_id)
  else
    headers["id"] = ack_id.to_s
    frame = send_frame("ACK", headers, body = nil, content_type = nil, receipt, transaction_id)
  end
  frame.headers["receipt"]
end
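The version-dependent choice of identifying header in #ack can be seen in isolation in the sketch below. The helper `ack_headers` is hypothetical, it raises ArgumentError rather than the library's ApplicationError, and the `message_ids` hash (ack ID mapped to message ID) is an assumption about what the client tracks internally.

```ruby
# Hypothetical helper mirroring how #ack picks its identifying header.
# `message_ids` is assumed to map ack IDs to message IDs.
def ack_headers(version, ack_id, message_ids)
  # The entry is removed either way, so an ack ID can only be used once.
  message_id = message_ids.delete(ack_id)
  if version == "1.0"
    # STOMP 1.0 acknowledges by message-id, so the mapping must exist.
    raise ArgumentError, "No message was received with ack #{ack_id}" if message_id.nil?
    { "message-id" => message_id }
  else
    # Later versions acknowledge by the ack ID itself.
    { "id" => ack_id.to_s }
  end
end
```

Under these assumptions, a 1.0 session needs a recorded message ID for the ack to succeed, while 1.1 and 1.2 sessions send the ack ID as given.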
#begin(receipt = nil, headers = nil) ⇒ Array<String>
Start a transaction
# File 'lib/stomp_out/client.rb', line 371
def begin(receipt = nil, headers = nil)
  raise ProtocolError.new("Not connected") unless @connected
  id = (@transaction_id += 1).to_s
  headers ||= {}
  headers["transaction"] = id.to_s
  frame = send_frame("BEGIN", headers, body = nil, content_type = nil, receipt)
  @transaction_ids << id
  [id, frame.headers["receipt"]]
end
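The transaction bookkeeping shared by #begin, #commit, and #abort can be sketched without a connection. `TransactionLedger` and its `finish` method are hypothetical names for illustration. The sketch keeps only the ID tracking, sends no frames, and raises ArgumentError where the library raises ApplicationError.

```ruby
# Hypothetical stand-in that keeps only the ID bookkeeping of the client.
class TransactionLedger
  def initialize
    @transaction_id = 0
    @transaction_ids = []
  end

  # Allocate the next transaction ID as a string, as #begin does.
  def begin
    id = (@transaction_id += 1).to_s
    @transaction_ids << id
    id
  end

  # Common step of #commit and #abort: the ID must be active, and it is
  # removed from the active list once used.
  def finish(id)
    raise ArgumentError, "Transaction #{id} not found" unless @transaction_ids.delete(id.to_s)
    true
  end

  # List active transactions, as #transactions does.
  def transactions
    @transaction_ids
  end
end
```

Because IDs are compared after `to_s`, both `finish(1)` and `finish("1")` refer to the same transaction, and finishing a transaction twice raises.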
#commit(id, receipt = nil, headers = nil) ⇒ String, NilClass
Commit a transaction
# File 'lib/stomp_out/client.rb', line 391
def commit(id, receipt = nil, headers = nil)
  raise ProtocolError.new("Not connected") unless @connected
  raise ApplicationError.new("Transaction #{id} not found") unless @transaction_ids.delete(id.to_s)
  headers ||= {}
  headers["transaction"] = id.to_s
  frame = send_frame("COMMIT", headers, body = nil, content_type = nil, receipt)
  frame.headers["receipt"]
end
#connect(heartbeat = nil, login = nil, passcode = nil, headers = nil) ⇒ TrueClass
Connect to server
# File 'lib/stomp_out/client.rb', line 229
def connect(heartbeat = nil, login = nil, passcode = nil, headers = nil)
  raise ProtocolError, "Already connected" if @connected
  headers ||= {}
  headers["accept-version"] = SUPPORTED_VERSIONS.join(",")
  headers["host"] = @host
  if heartbeat
    raise ApplicationError.new("Heartbeat not usable without eventmachine") unless Heartbeat.usable?
    headers["heart-beat"] = "#{@options[:min_send_interval] || MIN_SEND_HEARTBEAT},#{heartbeat}"
  end
  if login
    headers["login"] = login
    headers["passcode"] = passcode
  end
  send_frame("CONNECT", headers)
  true
end
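To make the CONNECT headers concrete, the sketch below rebuilds them outside the class. The helper `connect_headers` and its `min_send_interval` parameter are hypothetical; the parameter stands in for `@options[:min_send_interval]`. The `Heartbeat.usable?` check is deliberately left out so the example runs without eventmachine.

```ruby
# Constants copied from the Constant Summary.
SUPPORTED_VERSIONS = ["1.0", "1.1", "1.2"]
MIN_SEND_HEARTBEAT = 5000

# Hypothetical helper mirroring the header logic of #connect.
def connect_headers(host, heartbeat = nil, login = nil, passcode = nil, min_send_interval = nil)
  headers = {}
  headers["accept-version"] = SUPPORTED_VERSIONS.join(",")
  headers["host"] = host
  if heartbeat
    # First value: send interval offered by the client; second: the
    # `heartbeat` argument as passed by the caller.
    headers["heart-beat"] = "#{min_send_interval || MIN_SEND_HEARTBEAT},#{heartbeat}"
  end
  if login
    headers["login"] = login
    headers["passcode"] = passcode
  end
  headers
end
```

With these definitions, `connect_headers("stomp", 10000)` yields a `heart-beat` header of `"5000,10000"`, and the login headers appear only when a login is given.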
#connected? ⇒ Boolean
Determine whether connected to STOMP server
# File 'lib/stomp_out/client.rb', line 107
def connected?
  !!@connected
end
#disconnect(receipt = nil, headers = nil) ⇒ String, NilClass
Disconnect from the server. The client is expected to close its connection after calling this function. If a receipt is requested, it may not be received before the frame is reset.
# File 'lib/stomp_out/client.rb', line 429
def disconnect(receipt = nil, headers = nil)
  raise ProtocolError.new("Not connected") unless @connected
  frame = send_frame("DISCONNECT", headers, body = nil, content_type = nil, receipt)
  @heartbeat.stop if @heartbeat
  @connected = false
  frame.headers["receipt"]
end
#message(destination, message, content_type = nil, receipt = nil, transaction_id = nil, headers = nil) ⇒ String, NilClass
Send message to given destination
# File 'lib/stomp_out/client.rb', line 261
def message(destination, message, content_type = nil, receipt = nil, transaction_id = nil, headers = nil)
  raise ProtocolError.new("Not connected") unless @connected
  headers ||= {}
  headers["destination"] = destination
  frame = send_frame("SEND", headers, message, content_type, receipt, transaction_id)
  frame.headers["receipt"]
end
#nack(ack_id, receipt = nil, transaction_id = nil, headers = nil) ⇒ String, NilClass
Tell the server that a message was not consumed
# File 'lib/stomp_out/client.rb', line 353
def nack(ack_id, receipt = nil, transaction_id = nil, headers = nil)
  raise ProtocolError.new("Command 'nack' not supported") if @version == "1.0"
  raise ProtocolError.new("Not connected") unless @connected
  @message_ids.delete(ack_id)
  headers ||= {}
  headers["id"] = ack_id.to_s
  frame = send_frame("NACK", headers, body = nil, content_type = nil, receipt, transaction_id)
  frame.headers["receipt"]
end
#on_connected(frame, session_id, server_name) ⇒ TrueClass
Handle notification that now connected to server
# File 'lib/stomp_out/client.rb', line 167
def on_connected(frame, session_id, server_name)
  raise "Not implemented"
end
#on_error(frame, error, details, receipt_id) ⇒ TrueClass
Handle notification from server that a request failed and that the connection should be closed
# File 'lib/stomp_out/client.rb', line 208
def on_error(frame, error, details, receipt_id)
  raise "Not implemented"
end
#on_message(frame, destination, message, content_type, message_id, ack_id) ⇒ TrueClass
Handle message received from server
# File 'lib/stomp_out/client.rb', line 183
def on_message(frame, destination, message, content_type, message_id, ack_id)
  raise "Not implemented"
end
#on_receipt(frame, receipt_id) ⇒ TrueClass
Handle notification that a request was successfully handled by server
# File 'lib/stomp_out/client.rb', line 194
def on_receipt(frame, receipt_id)
  raise "Not implemented"
end
#receive_data(data) ⇒ TrueClass
Process data received over connection from server
# File 'lib/stomp_out/client.rb', line 137
def receive_data(data)
  @parser << data
  process_frames
  @heartbeat.received_data if @heartbeat
  true
rescue StandardError => e
  report_error(e)
end
#report_error(error) ⇒ TrueClass
Report to client that an error was encountered locally. Not intended for use by end users of this class.
# File 'lib/stomp_out/client.rb', line 117
def report_error(error)
  details = ""
  if error.is_a?(ProtocolError) || error.is_a?(ApplicationError)
    message = error.message
  elsif error.is_a?(Exception)
    message = "#{error.class}: #{error.message}"
    details = error.backtrace.join("\n") if error.respond_to?(:backtrace)
  else
    message = error.to_s
  end
  frame = Frame.new("ERROR", {"message" => message}, details)
  on_error(frame, message, details, receipt_id = nil)
  true
end
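The way #report_error derives a message and details from different kinds of error values can be tried on its own. In this sketch `ProtocolError` and `ApplicationError` are hypothetical stand-ins for the library's classes, and `describe_error` is an invented helper that returns the pair instead of building a frame. One deliberate difference: it checks that `backtrace` is non-nil, since Ruby returns nil for an exception that was never raised.

```ruby
# Stand-ins for the library's error classes, defined here only so the
# sketch is self-contained.
class ProtocolError < StandardError; end
class ApplicationError < StandardError; end

# Hypothetical helper returning [message, details] for an error value.
def describe_error(error)
  details = ""
  if error.is_a?(ProtocolError) || error.is_a?(ApplicationError)
    # Errors the library raises itself already carry a readable message.
    message = error.message
  elsif error.is_a?(Exception)
    # Other exceptions are prefixed with their class name.
    message = "#{error.class}: #{error.message}"
    # backtrace is nil until the exception has actually been raised.
    details = error.backtrace.join("\n") if error.backtrace
  else
    # Anything else (String, Symbol, ...) is converted with to_s.
    message = error.to_s
  end
  [message, details]
end
```

Under these assumptions, library errors pass through with their message only, other exceptions gain a class prefix and a backtrace when one exists, and plain values are stringified.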
#send_data(data) ⇒ TrueClass
Send data over connection to server
# File 'lib/stomp_out/client.rb', line 155
def send_data(data)
  raise "Not implemented"
end
#subscribe(destination, ack = nil, receipt = nil, headers = nil) ⇒ String, NilClass
Register to listen to a given destination
# File 'lib/stomp_out/client.rb', line 281
def subscribe(destination, ack = nil, receipt = nil, headers = nil)
  raise ProtocolError.new("Not connected") unless @connected
  raise ApplicationError.new("Already subscribed to '#{destination}'") if @subscribes[destination]
  raise ProtocolError.new("Invalid 'ack' setting") if ack && !ACK_SETTINGS[@version].include?(ack)
  @subscribes[destination] = {:id => (@subscribe_id += 1).to_s, :ack => ack}
  headers ||= {}
  headers["destination"] = destination
  headers["id"] = @subscribe_id.to_s
  headers["ack"] = ack if ack
  frame = send_frame("SUBSCRIBE", headers, body = nil, content_type = nil, receipt)
  frame.headers["receipt"]
end
#subscriptions ⇒ Array<String>
List active subscriptions
# File 'lib/stomp_out/client.rb', line 93
def subscriptions
  @subscribes.keys
end
#transactions ⇒ Array<String>
List active transactions
# File 'lib/stomp_out/client.rb', line 100
def transactions
  @transaction_ids
end
#unsubscribe(destination, receipt = nil, headers = nil) ⇒ String, NilClass
Remove an existing subscription
# File 'lib/stomp_out/client.rb', line 304
def unsubscribe(destination, receipt = nil, headers = nil)
  raise ProtocolError.new("Not connected") unless @connected
  subscribe = @subscribes.delete(destination)
  raise ApplicationError.new("Subscription to '#{destination}' not found") if subscribe.nil?
  headers ||= {}
  headers["id"] = subscribe[:id]
  headers["destination"] = destination if @version == "1.0"
  frame = send_frame("UNSUBSCRIBE", headers, body = nil, content_type = nil, receipt)
  frame.headers["receipt"]
end
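The subscription bookkeeping behind #subscribe, #unsubscribe, and #subscriptions can be sketched as a small table keyed by destination. `SubscriptionTable` is a hypothetical stand-in. It returns the headers that would be sent instead of sending a frame, skips the connection and ack validation checks, and raises ArgumentError where the library raises ApplicationError.

```ruby
# Hypothetical stand-in that keeps only the per-destination bookkeeping.
class SubscriptionTable
  def initialize(version)
    @version = version
    @subscribe_id = 0
    @subscribes = {} # destination is key
  end

  # Record a subscription and return the SUBSCRIBE headers for it.
  def subscribe(destination, ack = nil)
    raise ArgumentError, "Already subscribed to '#{destination}'" if @subscribes[destination]
    @subscribes[destination] = { :id => (@subscribe_id += 1).to_s, :ack => ack }
    headers = { "destination" => destination, "id" => @subscribe_id.to_s }
    headers["ack"] = ack if ack
    headers
  end

  # Forget a subscription and return the UNSUBSCRIBE headers for it.
  def unsubscribe(destination)
    subscribe = @subscribes.delete(destination)
    raise ArgumentError, "Subscription to '#{destination}' not found" if subscribe.nil?
    headers = { "id" => subscribe[:id] }
    # Only a 1.0 session also sends the destination when unsubscribing.
    headers["destination"] = destination if @version == "1.0"
    headers
  end

  # List active subscriptions by destination.
  def subscriptions
    @subscribes.keys
  end
end
```

Keying the table by destination means each destination can have only one active subscription at a time, which matches the "Already subscribed" check in #subscribe.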