Class: StompOut::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/stomp_out/client.rb

Overview

Abstract base class for STOMP client for use with an existing server connection, such as a WebSocket. Derived classes are responsible for supplying the following functions:

send_data(data) - send data over connection
on_connected(frame, session_id, server_name) - handle notification that now connected
on_message(frame, destination, message, content_type, message_id, ack_id) - handle message
  received from server
on_receipt(frame, receipt_id) - handle notification that a request was successfully
  handled by server
on_error(frame, message, details, receipt_id) - handle notification from server
  that a request failed and that the connection should be closed

Constant Summary collapse

# STOMP protocol versions offered to the server in the "accept-version"
# header of the CONNECT frame
SUPPORTED_VERSIONS = ["1.0", "1.1", "1.2"].freeze

# Values accepted for the "ack" setting of a subscription, keyed by the
# STOMP version chosen for the session
ACK_SETTINGS = {
  "1.0" => ["auto", "client"].freeze,
  "1.1" => ["auto", "client", "client-individual"].freeze,
  "1.2" => ["auto", "client", "client-individual"].freeze
}.freeze

# NOTE(review): presumably the frame commands this client accepts from the
# server; the code consuming this list is not visible here -- confirm
SERVER_COMMANDS = [:connected, :message, :receipt, :error].freeze

# Send interval in msec advertised in the "heart-beat" header when the
# :min_send_interval option is not supplied
MIN_SEND_HEARTBEAT = 5000

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Client

Create STOMP client

Parameters:

  • options (Hash) (defaults to: {})

    a customizable set of options

Options Hash (options):

  • :host (String)

    to which client wishes to connect; if not using virtual hosts, recommended setting is the host name that the socket in use was connected against, or any name of client’s choosing; defaults to “stomp”

  • :receipt (Boolean)

    enabled for all requests except connect; disabled by default but can still enable on individual requests

  • :auto_json (Boolean)

    encode/decode “application/json” content-type

  • :min_send_interval (Integer)

    in msec that this client can guarantee; defaults to MIN_SEND_HEARTBEAT



74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/stomp_out/client.rb', line 74

# Create STOMP client in the disconnected state
#
# @param options [Hash] client settings; only :host and :receipt are read
#   here, the hash itself is retained for later use
# @option options [String] :host to which client wishes to connect;
#   defaults to "stomp"
# @option options [Boolean] :receipt enabled globally for requests
def initialize(options = {})
  @options = options
  @host = options[:host] || "stomp"
  @parser = StompOut::Parser.new
  @receipt = options[:receipt]
  @connected = false

  # Counters from which the various IDs are generated
  @ack_id = 0
  @subscribe_id = 0
  @transaction_id = 0
  @receipt_id = 0

  # Lookup tables tracking session state
  @message_ids = {}      # keyed by ack ID
  @subscribes = {}       # keyed by destination
  @receipted_frames = {} # keyed by receipt-id
  @transaction_ids = []
end

Instance Attribute Details

#heartbeat ⇒ Object (readonly)

Heartbeat

heartbeat generator and monitor



62
63
64
# File 'lib/stomp_out/client.rb', line 62

# Heartbeat generator and monitor
#
# @return [Object, NilClass] current value of @heartbeat; nil when none
#   has been assigned
def heartbeat
  @heartbeat
end

#host ⇒ Object (readonly)

String

host to which client is connecting



59
60
61
# File 'lib/stomp_out/client.rb', line 59

# Host to which client is connecting
#
# @return [String] value taken from the :host option at construction,
#   or "stomp" when that option was not given
def host
  @host
end

#server_name ⇒ Object (readonly)

String

name assigned to server



56
57
58
# File 'lib/stomp_out/client.rb', line 56

# Name assigned to server
#
# @return [String, NilClass] current value of @server_name
#   (NOTE(review): presumably set when the server confirms the
#   connection -- the assignment is not visible here)
def server_name
  @server_name
end

#session_id ⇒ Object (readonly)

String

session_id assigned to session



53
54
55
# File 'lib/stomp_out/client.rb', line 53

# ID assigned to session
#
# @return [String, NilClass] current value of @session_id
#   (NOTE(review): presumably set when the server confirms the
#   connection -- the assignment is not visible here)
def session_id
  @session_id
end

#version ⇒ Object (readonly)

String

version of STOMP chosen for session



50
51
52
# File 'lib/stomp_out/client.rb', line 50

# Version of STOMP chosen for session
#
# @return [String, NilClass] current value of @version, e.g., "1.0";
#   nil when no version has been assigned
def version
  @version
end

Instance Method Details

#abort(id, receipt = nil, headers = nil) ⇒ String, NilClass

Roll back a transaction

Parameters:

  • id (String)

    uniquely identifying transaction

  • receipt (Boolean, NilClass) (defaults to: nil)

    enabled (or’d with global setting)

  • headers (Hash, NilClass) (defaults to: nil)

    that are application specific

Returns:

  • (String, NilClass)

    receipt ID if enabled, otherwise nil

Raises:

  • (ProtocolError)

    if not connected

  • (ApplicationError)

    if the transaction is not found



410
411
412
413
414
415
416
417
# File 'lib/stomp_out/client.rb', line 410

# Roll back a transaction
#
# @param id [String] uniquely identifying transaction
# @param receipt [Boolean, NilClass] passed through to send_frame
# @param headers [Hash, NilClass] that are application specific; the
#   "transaction" header is added to this hash
#
# @return [String, NilClass] "receipt" header of the frame sent
#
# @raise [ProtocolError] not connected
# @raise [ApplicationError] transaction is not among the active ones
def abort(id, receipt = nil, headers = nil)
  raise ProtocolError, "Not connected" unless @connected
  # Deleting doubles as the existence check: nil means it was never active
  unless @transaction_ids.delete(id.to_s)
    raise ApplicationError, "Transaction #{id} not found"
  end
  headers ||= {}
  headers["transaction"] = id.to_s
  send_frame("ABORT", headers, nil, nil, receipt).headers["receipt"]
end

#ack(ack_id, receipt = nil, transaction_id = nil, headers = nil) ⇒ String, NilClass

Acknowledge consumption of a message from a subscription

Parameters:

  • ack_id (String)

    identifying message being acknowledged

  • receipt (Boolean, NilClass) (defaults to: nil)

    enabled (or’d with global setting)

  • transaction_id (String, NilClass) (defaults to: nil)

    for transaction into which this command is to be included; defaults to no transaction

  • headers (Hash, NilClass) (defaults to: nil)

    that are application specific

Returns:

  • (String, NilClass)

    receipt ID if enabled, otherwise nil

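Raises:

  • (ProtocolError)

    if not connected

  • (ApplicationError)

    if the session uses STOMP 1.0 and no message was received with the given ack ID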



327
328
329
330
331
332
333
334
335
336
337
338
339
340
# File 'lib/stomp_out/client.rb', line 327

# Acknowledge consumption of a message from a subscription
#
# @param ack_id [String] identifying message being acknowledged
# @param receipt [Boolean, NilClass] passed through to send_frame
# @param transaction_id [String, NilClass] passed through to send_frame
# @param headers [Hash, NilClass] that are application specific; the
#   identifying header is added to this hash
#
# @return [String, NilClass] "receipt" header of the frame sent
#
# @raise [ProtocolError] not connected
# @raise [ApplicationError] version is "1.0" and no message ID is
#   tracked for ack_id
def ack(ack_id, receipt = nil, transaction_id = nil, headers = nil)
  raise ProtocolError, "Not connected" unless @connected
  # The tracked entry is dropped regardless of version
  message_id = @message_ids.delete(ack_id)
  headers ||= {}
  if @version == "1.0"
    # 1.0 identifies the message by its message-id, so it must be known
    raise ApplicationError, "No message was received with ack #{ack_id}" if message_id.nil?
    headers["message-id"] = message_id
  else
    headers["id"] = ack_id.to_s
  end
  send_frame("ACK", headers, nil, nil, receipt, transaction_id).headers["receipt"]
end

#begin(receipt = nil, headers = nil) ⇒ Array<String>

Start a transaction

Parameters:

  • receipt (Boolean, NilClass) (defaults to: nil)

    enabled (or’d with global setting)

  • headers (Hash, NilClass) (defaults to: nil)

    that are application specific

Returns:

  • (Array<String>)

    transaction ID and receipt ID if receipt enabled

Raises:

  • (ProtocolError)

    if not connected



371
372
373
374
375
376
377
378
379
# File 'lib/stomp_out/client.rb', line 371

# Start a transaction
#
# @param receipt [Boolean, NilClass] passed through to send_frame
# @param headers [Hash, NilClass] that are application specific; the
#   "transaction" header is added to this hash
#
# @return [Array<String>] transaction ID followed by the "receipt"
#   header of the frame sent (nil when there is none)
#
# @raise [ProtocolError] not connected
def begin(receipt = nil, headers = nil)
  raise ProtocolError, "Not connected" unless @connected
  @transaction_id += 1
  id = @transaction_id.to_s
  headers ||= {}
  headers["transaction"] = id
  frame = send_frame("BEGIN", headers, nil, nil, receipt)
  # Only recorded as active once the frame has been sent
  @transaction_ids << id
  [id, frame.headers["receipt"]]
end

#commit(id, receipt = nil, headers = nil) ⇒ String, NilClass

Commit a transaction

Parameters:

  • id (String)

    uniquely identifying transaction

  • receipt (Boolean, NilClass) (defaults to: nil)

    enabled (or’d with global setting)

  • headers (Hash, NilClass) (defaults to: nil)

    that are application specific

Returns:

  • (String, NilClass)

    receipt ID if enabled, otherwise nil

Raises:

  • (ProtocolError)

    if not connected

  • (ApplicationError)

    if the transaction is not found



391
392
393
394
395
396
397
398
# File 'lib/stomp_out/client.rb', line 391

# Commit a transaction
#
# @param id [String] uniquely identifying transaction
# @param receipt [Boolean, NilClass] passed through to send_frame
# @param headers [Hash, NilClass] that are application specific; the
#   "transaction" header is added to this hash
#
# @return [String, NilClass] "receipt" header of the frame sent
#
# @raise [ProtocolError] not connected
# @raise [ApplicationError] transaction is not among the active ones
def commit(id, receipt = nil, headers = nil)
  raise ProtocolError, "Not connected" unless @connected
  # Deleting doubles as the existence check: nil means it was never active
  unless @transaction_ids.delete(id.to_s)
    raise ApplicationError, "Transaction #{id} not found"
  end
  headers ||= {}
  headers["transaction"] = id.to_s
  send_frame("COMMIT", headers, nil, nil, receipt).headers["receipt"]
end

#connect(heartbeat = nil, login = nil, passcode = nil, headers = nil) ⇒ TrueClass

Connect to server

Parameters:

  • heartbeat (Integer, NilClass) (defaults to: nil)

    rate in milliseconds that is desired; defaults to no heartbeat; not usable unless eventmachine gem available

  • login (String, NilClass) (defaults to: nil)

    name for authentication with server; defaults to no authentication, although this may not be acceptable to server

  • passcode (String, NilClass) (defaults to: nil)

    for authentication

  • headers (Hash, NilClass) (defaults to: nil)

    that are application specific

Returns:

  • (TrueClass)

    always true

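Raises:

  • (ProtocolError)

    if already connected

  • (ApplicationError)

    if a heartbeat is requested but heartbeats are not usable (eventmachine unavailable)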



229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
# File 'lib/stomp_out/client.rb', line 229

# Connect to server by sending a CONNECT frame
#
# @param heartbeat [Integer, NilClass] rate in milliseconds that is desired;
#   defaults to no heartbeat
# @param login [String, NilClass] name for authentication with server;
#   the "login" and "passcode" headers are sent only when this is given
# @param passcode [String, NilClass] for authentication; ignored when
#   login is not given
# @param headers [Hash, NilClass] that are application specific; the
#   protocol headers are added to this hash
#
# @return [TrueClass] always true
#
# @raise [ProtocolError] already connected
# @raise [ApplicationError] heartbeat requested but Heartbeat.usable? is false
def connect(heartbeat = nil, login = nil, passcode = nil, headers = nil)
  raise ProtocolError, "Already connected" if @connected
  headers ||= {}
  headers["accept-version"] = SUPPORTED_VERSIONS.join(",")
  headers["host"] = @host
  if heartbeat
    raise ApplicationError.new("Heartbeat not usable without eventmachine") unless Heartbeat.usable?
    # Header value is "<interval this client can send at>,<interval desired from server>"
    headers["heart-beat"] = "#{@options[:min_send_interval] || MIN_SEND_HEARTBEAT},#{heartbeat}"
  end
  if login
    headers["login"] = login
    headers["passcode"] = passcode
  end
  send_frame("CONNECT", headers)
  true
end

#connected? ⇒ Boolean

Determine whether connected to STOMP server

Returns:

  • (Boolean)

    true if connected, otherwise false



107
108
109
# File 'lib/stomp_out/client.rb', line 107

# Determine whether connected to STOMP server
#
# @return [Boolean] true if @connected is truthy, otherwise false
def connected?
  @connected ? true : false
end

#disconnect(receipt = nil, headers = nil) ⇒ String, NilClass

Disconnect from the server. Client is expected to close its connection after calling this function. If receipt is requested, it may not be received before frame is reset.

Parameters:

  • receipt (Boolean, NilClass) (defaults to: nil)

    enabled (or’d with global setting)

  • headers (Hash, NilClass) (defaults to: nil)

    that are application specific

Returns:

  • (String, NilClass)

    receipt ID if enabled, otherwise nil

Raises:

  • (ProtocolError)

    if not connected



429
430
431
432
433
434
435
# File 'lib/stomp_out/client.rb', line 429

# Disconnect from the server by sending a DISCONNECT frame, then stop
# the heartbeat (if any) and mark this client as no longer connected
#
# @param receipt [Boolean, NilClass] passed through to send_frame
# @param headers [Hash, NilClass] that are application specific
#
# @return [String, NilClass] "receipt" header of the frame sent
#
# @raise [ProtocolError] not connected
def disconnect(receipt = nil, headers = nil)
  raise ProtocolError, "Not connected" unless @connected
  sent = send_frame("DISCONNECT", headers, nil, nil, receipt)
  # Local state is torn down only after the frame has gone out
  @heartbeat.stop if @heartbeat
  @connected = false
  sent.headers["receipt"]
end

#message(destination, message, content_type = nil, receipt = nil, transaction_id = nil, headers = nil) ⇒ String, NilClass

Send message to given destination

Parameters:

  • destination (String)

    for message

  • message (String)

    being sent

  • content_type (String, NilClass) (defaults to: nil)

    of message body in MIME format; optionally JSON-encodes body automatically if “application/json”; defaults to “text/plain”

  • receipt (Boolean, NilClass) (defaults to: nil)

    enabled (or’d with global setting)

  • transaction_id (String, NilClass) (defaults to: nil)

    for transaction into which this command is to be included; defaults to no transaction

  • headers (Hash) (defaults to: nil)

    that are application specific, e.g., “message-id”

Returns:

  • (String, NilClass)

    receipt ID if enabled, otherwise nil

Raises:

  • (ProtocolError)

    if not connected



261
262
263
264
265
266
267
# File 'lib/stomp_out/client.rb', line 261

# Send message to given destination in a SEND frame
#
# @param destination [String] for message
# @param message [String] being sent; becomes the frame body
# @param content_type [String, NilClass] passed through to send_frame
# @param receipt [Boolean, NilClass] passed through to send_frame
# @param transaction_id [String, NilClass] passed through to send_frame
# @param headers [Hash, NilClass] that are application specific; the
#   "destination" header is added to this hash
#
# @return [String, NilClass] "receipt" header of the frame sent
#
# @raise [ProtocolError] not connected
def message(destination, message, content_type = nil, receipt = nil, transaction_id = nil, headers = nil)
  raise ProtocolError, "Not connected" unless @connected
  headers ||= {}
  headers["destination"] = destination
  send_frame("SEND", headers, message, content_type, receipt, transaction_id).headers["receipt"]
end

#nack(ack_id, receipt = nil, transaction_id = nil, headers = nil) ⇒ String, NilClass

Tell the server that a message was not consumed

Parameters:

  • ack_id (String)

    identifying message being negatively acknowledged

  • receipt (Boolean, NilClass) (defaults to: nil)

    enabled (or’d with global setting)

  • transaction_id (String, NilClass) (defaults to: nil)

    for transaction into which this command is to be included; defaults to no transaction

  • headers (Hash, NilClass) (defaults to: nil)

    that are application specific

Returns:

  • (String, NilClass)

    receipt ID if enabled, otherwise nil

Raises:

  • (ProtocolError)

    if not connected, or if the session uses STOMP 1.0, which does not support nack



353
354
355
356
357
358
359
360
361
# File 'lib/stomp_out/client.rb', line 353

# Tell the server that a message was not consumed
#
# @param ack_id [String] identifying message being negatively acknowledged
# @param receipt [Boolean, NilClass] passed through to send_frame
# @param transaction_id [String, NilClass] passed through to send_frame
# @param headers [Hash, NilClass] that are application specific; the
#   "id" header is added to this hash
#
# @return [String, NilClass] "receipt" header of the frame sent
#
# @raise [ProtocolError] version is "1.0" (checked first), or not connected
def nack(ack_id, receipt = nil, transaction_id = nil, headers = nil)
  if @version == "1.0"
    raise ProtocolError, "Command 'nack' not supported"
  end
  raise ProtocolError, "Not connected" unless @connected
  # Stop tracking the message; an unknown ack_id is silently accepted
  @message_ids.delete(ack_id)
  headers ||= {}
  headers["id"] = ack_id.to_s
  send_frame("NACK", headers, nil, nil, receipt, transaction_id).headers["receipt"]
end

#on_connected(frame, session_id, server_name) ⇒ TrueClass

Handle notification that now connected to server

Parameters:

  • frame (Frame)

    received from server

  • session_id (String)

    uniquely identifying the given STOMP session

  • server_name (String, NilClass)

    in form “<name>/<version>” with “/<version>” being optional; nil if not provided by server

Returns:

  • (TrueClass)

    always true



167
168
169
# File 'lib/stomp_out/client.rb', line 167

# Handle notification that now connected to server; this base
# implementation always fails and is meant to be overridden
#
# @param frame [Frame] received from server
# @param session_id [String] uniquely identifying the given STOMP session
# @param server_name [String, NilClass] name of server
#
# @raise [RuntimeError] always, with message "Not implemented"
def on_connected(frame, session_id, server_name)
  raise RuntimeError, "Not implemented"
end

#on_error(frame, error, details, receipt_id) ⇒ TrueClass

Handle notification from server that a request failed and that the connection should be closed

Parameters:

  • frame (Frame)

    received from server

  • error (String)

    message

  • details (String, NilClass)

    about the error, e.g., the frame that failed

  • receipt_id (String, NilClass)

    identifying request that failed (Client functions optionally return a receipt_id)

Returns:

  • (TrueClass)

    always true



208
209
210
# File 'lib/stomp_out/client.rb', line 208

# Handle notification that a request failed; this base implementation
# always fails and is meant to be overridden
#
# @param frame [Frame] describing the error
# @param error [String] message
# @param details [String, NilClass] about the error
# @param receipt_id [String, NilClass] identifying request that failed
#
# @raise [RuntimeError] always, with message "Not implemented"
def on_error(frame, error, details, receipt_id)
  raise RuntimeError, "Not implemented"
end

#on_message(frame, destination, message, content_type, message_id, ack_id) ⇒ TrueClass

Handle message received from server

Parameters:

  • frame (Frame)

    received from server

  • destination (String)

    to which the message was sent

  • message (Object)

    body; if content_type is “application/json” and :auto_json client option specified the message is JSON decoded

  • content_type (String)

    of message in MIME terms, e.g., “text/plain”

  • message_id (String)

    uniquely identifying message

  • ack_id (String, NilClass)

    to be used when acknowledging message to server if acknowledgement enabled

Returns:

  • (TrueClass)

    always true



183
184
185
# File 'lib/stomp_out/client.rb', line 183

# Handle message received from server; this base implementation always
# fails and is meant to be overridden
#
# @param frame [Frame] received from server
# @param destination [String] to which the message was sent
# @param message [Object] body
# @param content_type [String] of message in MIME terms
# @param message_id [String] uniquely identifying message
# @param ack_id [String, NilClass] to be used when acknowledging message
#
# @raise [RuntimeError] always, with message "Not implemented"
def on_message(frame, destination, message, content_type, message_id, ack_id)
  raise RuntimeError, "Not implemented"
end

#on_receipt(frame, receipt_id) ⇒ TrueClass

Handle notification that a request was successfully handled by server

Parameters:

  • frame (Frame)

    received from server

  • receipt_id (String)

    identifying request completed (client request functions optionally return a receipt_id)

Returns:

  • (TrueClass)

    always true



194
195
196
# File 'lib/stomp_out/client.rb', line 194

# Handle notification that a request was successfully handled by server;
# this base implementation always fails and is meant to be overridden
#
# @param frame [Frame] received from server
# @param receipt_id [String] identifying request completed
#
# @raise [RuntimeError] always, with message "Not implemented"
def on_receipt(frame, receipt_id)
  raise RuntimeError, "Not implemented"
end

#receive_data(data) ⇒ TrueClass

Process data received over connection from server

Parameters:

  • data (String)

    to be processed

Returns:

  • (TrueClass)

    always true



137
138
139
140
141
142
143
144
# File 'lib/stomp_out/client.rb', line 137

# Process data received over connection from server: append it to the
# parser, process frames, then notify the heartbeat (if any)
#
# @param data [String] to be processed
#
# @return [TrueClass, Object] true on success; when a StandardError is
#   raised along the way, whatever report_error returns for it
def receive_data(data)
  begin
    @parser << data
    process_frames
    # Reached only when processing raised nothing
    @heartbeat.received_data if @heartbeat
    true
  rescue StandardError => failure
    report_error(failure)
  end
end

#report_error(error) ⇒ TrueClass

Report to client that an error was encountered locally. Not intended for use by end user of this class.

Parameters:

  • error (Exception, String)

    being reported

Returns:

  • (TrueClass)

    always true



117
118
119
120
121
122
123
124
125
126
127
128
129
130
# File 'lib/stomp_out/client.rb', line 117

# Report to client that an error was encountered locally by building an
# ERROR frame and passing it to on_error
#
# @param error [Exception, String] being reported; for ProtocolError and
#   ApplicationError only the message is reported, for any other exception
#   the class name is prefixed and the backtrace (if any) becomes the details
#
# @return [TrueClass] always true
def report_error(error)
  details = ""
  case error
  when ProtocolError, ApplicationError
    message = error.message
  when Exception
    message = "#{error.class}: #{error.message}"
    # An exception that was never raised has a nil backtrace
    backtrace = error.backtrace
    details = backtrace.join("\n") if backtrace
  else
    message = error.to_s
  end
  frame = Frame.new("ERROR", {"message" => message}, details)
  on_error(frame, message, details, nil)
  true
end

#send_data(data) ⇒ TrueClass

Send data over connection to server

Parameters:

  • data (String)

    that is STOMP encoded

Returns:

  • (TrueClass)

    always true



155
156
157
# File 'lib/stomp_out/client.rb', line 155

# Send data over connection to server; this base implementation always
# fails and is meant to be overridden
#
# @param data [String] that is STOMP encoded
#
# @raise [RuntimeError] always, with message "Not implemented"
def send_data(data)
  raise RuntimeError, "Not implemented"
end

#subscribe(destination, ack = nil, receipt = nil, headers = nil) ⇒ String, NilClass

Register to listen to a given destination

Parameters:

  • destination (String)

    of interest

  • ack (String, NilClass) (defaults to: nil)

    setting: “auto”, “client”, or “client-individual”; defaults to “auto”

  • receipt (Boolean, NilClass) (defaults to: nil)

    enabled (or’d with global setting)

  • headers (Hash, NilClass) (defaults to: nil)

    that are application specific

Returns:

  • (String, NilClass)

    receipt ID if enabled, otherwise nil

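Raises:

  • (ProtocolError)

    if not connected or if the ack setting is invalid for the STOMP version in use

  • (ApplicationError)

    if already subscribed to the destination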



281
282
283
284
285
286
287
288
289
290
291
292
# File 'lib/stomp_out/client.rb', line 281

# Register to listen to a given destination
#
# @param destination [String] of interest
# @param ack [String, NilClass] setting; when given it must be one of
#   ACK_SETTINGS for the session's version
# @param receipt [Boolean, NilClass] passed through to send_frame
# @param headers [Hash, NilClass] that are application specific; the
#   "destination", "id", and (when given) "ack" headers are added to this hash
#
# @return [String, NilClass] "receipt" header of the frame sent
#
# @raise [ProtocolError] not connected, or ack setting invalid
# @raise [ApplicationError] destination already subscribed to
def subscribe(destination, ack = nil, receipt = nil, headers = nil)
  raise ProtocolError, "Not connected" unless @connected
  if @subscribes[destination]
    raise ApplicationError, "Already subscribed to '#{destination}'"
  end
  if ack && !ACK_SETTINGS[@version].include?(ack)
    raise ProtocolError, "Invalid 'ack' setting"
  end

  # Subscription is recorded under a freshly generated ID before the frame is sent
  @subscribe_id += 1
  @subscribes[destination] = {:id => @subscribe_id.to_s, :ack => ack}

  headers ||= {}
  headers["destination"] = destination
  headers["id"] = @subscribe_id.to_s
  headers["ack"] = ack if ack
  send_frame("SUBSCRIBE", headers, nil, nil, receipt).headers["receipt"]
end

#subscriptions ⇒ Array<String>

List active subscriptions

Returns:

  • (Array<String>)

    subscription destinations



93
94
95
# File 'lib/stomp_out/client.rb', line 93

# List active subscriptions
#
# @return [Array<String>] subscription destinations, as a new array
def subscriptions
  @subscribes.each_key.to_a
end

#transactions ⇒ Array<String>

List active transactions

Returns:

  • (Array<String>)

    transaction IDs



100
101
102
# File 'lib/stomp_out/client.rb', line 100

# List active transactions
#
# @return [Array<String>] transaction IDs, as a copy so that changes made
#   by the caller cannot alter the client's own record of active transactions
def transactions
  @transaction_ids.dup
end

#unsubscribe(destination, receipt = nil, headers = nil) ⇒ String, NilClass

Remove an existing subscription

Parameters:

  • destination (String)

    no longer of interest

  • receipt (Boolean, NilClass) (defaults to: nil)

    enabled (or’d with global setting)

  • headers (Hash, NilClass) (defaults to: nil)

    that are application specific

Returns:

  • (String, NilClass)

    receipt ID if enabled, otherwise nil

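Raises:

  • (ProtocolError)

    if not connected

  • (ApplicationError)

    if there is no subscription to the destination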



304
305
306
307
308
309
310
311
312
313
# File 'lib/stomp_out/client.rb', line 304

# Remove an existing subscription
#
# @param destination [String] no longer of interest
# @param receipt [Boolean, NilClass] passed through to send_frame
# @param headers [Hash, NilClass] that are application specific; the
#   "id" header, and for version "1.0" the "destination" header, are
#   added to this hash
#
# @return [String, NilClass] "receipt" header of the frame sent
#
# @raise [ProtocolError] not connected
# @raise [ApplicationError] no subscription recorded for destination
def unsubscribe(destination, receipt = nil, headers = nil)
  raise ProtocolError, "Not connected" unless @connected
  # Removal doubles as the lookup; nil means there was nothing to remove
  subscription = @subscribes.delete(destination)
  if subscription.nil?
    raise ApplicationError, "Subscription to '#{destination}' not found"
  end
  headers ||= {}
  headers["id"] = subscription[:id]
  headers["destination"] = destination if @version == "1.0"
  send_frame("UNSUBSCRIBE", headers, nil, nil, receipt).headers["receipt"]
end