Class: StompOut::Server

Inherits:
Object
  • Object
show all
Defined in:
lib/stomp_out/server.rb

Overview

Abstract base class for STOMP server for use with an existing client connection, such as a WebSocket. Derived classes are responsible for supplying the following functions:

on_connect(frame, login, passcode, host, session_id) - handle connect request from
  client including any authentication
on_message(frame, destination, message, content_type) - handle delivery of message
  from client to given destination
on_subscribe(frame, id, destination, ack_setting) - subscribe client to messages
  from given destination
on_unsubscribe(frame, id, destination) - remove existing subscription
on_ack(frame, ack_id) - handle acknowledgement from client that message has
  been successfully processed
on_nack(frame, ack_id) - handle negative acknowledgement from client for message
on_error(frame, error) - handle notification from server that client or server
  request failed and that connection should be closed
on_disconnect(frame, reason) - handle request from client to close connection

The above functions should raise ApplicationError for requests that violate their server constraints.

Constant Summary collapse

SUPPORTED_VERSIONS =
["1.0", "1.1", "1.2"]
ACK_SETTINGS =
{
  "1.0" => ["auto", "client"],
  "1.1" => ["auto", "client", "client-individual"],
  "1.2" => ["auto", "client", "client-individual"]
}
CLIENT_COMMANDS =
[:stomp, :connect, :send, :subscribe, :unsubscribe, :ack, :nack, :begin, :commit, :abort, :disconnect]
TRANSACTIONAL_COMMANDS =
[:send, :ack, :nack, :begin, :commit, :abort]
MIN_SEND_HEARTBEAT =
5000
DESIRED_RECEIVE_HEARTBEAT =
60000
@@last_session_id =
Integer

value of last generated session ID for class

0

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Server

Create STOMP server

Parameters:

  • options (Hash) (defaults to: {})

    a customizable set of options

Options Hash (options):

  • :name (String)

    of server using STOMP that is to be sent to client

  • :version (String)

    of server using STOMP

  • :min_send_interval (Integer)

    in msec that server is willing to guarantee; defaults to MIN_SEND_HEARTBEAT

  • :desired_receive_interval (Integer)

    in msec for client to send heartbeats; defaults to DESIRED_RECEIVE_HEARTBEAT



81
82
83
84
85
86
87
88
89
90
91
92
# File 'lib/stomp_out/server.rb', line 81

def initialize(options = {})
  @options = options
  @ack_id = 0
  @ack_ids = {} # message-id is key
  @message_id = 0
  @subscribe_id = 0
  @subscribes = {} # destination is key
  @server_name = options[:name] + (options[:version] ? "/#{options[:version]}" : "") if options[:name]
  @parser = StompOut::Parser.new
  @transactions = {}
  @connected = false
end

Instance Attribute Details

#heartbeatObject (readonly)

Heartbeat

heartbeat generator and monitor



71
72
73
# File 'lib/stomp_out/server.rb', line 71

def heartbeat
  @heartbeat
end

#server_nameObject (readonly)

String

name assigned to server



68
69
70
# File 'lib/stomp_out/server.rb', line 68

def server_name
  @server_name
end

#session_idObject (readonly)

String

session_id assigned to session



65
66
67
# File 'lib/stomp_out/server.rb', line 65

def session_id
  @session_id
end

#versionObject (readonly)

String

version of STOMP chosen for session



62
63
64
# File 'lib/stomp_out/server.rb', line 62

def version
  @version
end

Instance Method Details

#connected?Boolean

Determine whether connected to STOMP server

Returns:

  • (Boolean)

    true if connected, otherwise false



109
110
111
# File 'lib/stomp_out/server.rb', line 109

def connected?
  !!@connected
end

#disconnectTrueClass

Stop service

Returns:

  • (TrueClass)

    always true



116
117
118
119
120
121
122
# File 'lib/stomp_out/server.rb', line 116

def disconnect
  if @connected
    @heartbeat.stop if @heartbeat
    @connected = false
  end
  true
end

#message(headers, body) ⇒ Array

Send message from a subscribed destination to client using MESSAGE frame

  • must set “destination” header with the destination to which the message was sent; should be identical to “destination” of SEND frame if sent using STOMP

  • must set “message-id” header uniquely identifying message

  • must set “subscription” header matching identifier of subscription receiving the message (only 1.1, 1.2)

  • must set “ack” header identifying ack/nack uniquely for this connection if subscription specified “ack” header with mode “client” or “client-individual” (only 1.2)

  • must set the frame body to the body of the message

  • should set “content-length” and “content-type” headers if there is a body

  • may set other application-specific headers

Parameters:

  • headers (Hash)

    for message per requirements above but with “message-id” defaulting to generated ID and “ack” defaulting to generated ID if not specified

  • body (String)

    of message

Returns:

  • (Array)

    message ID and ack ID; latter is nil if ack is in “auto” mode

Raises:

  • (ProtocolError)

    not connected

  • (ApplicationError)

    subscription not found, subscription does not match destination, non-unique message ID



273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
# File 'lib/stomp_out/server.rb', line 273

def message(headers, body)
  raise ProtocolError.new("Not connected") unless @connected
  frame = Frame.new(nil, (headers && headers.dup) || {})
  destination, subscribe_id = frame.require(@version, "destination" => [], "subscription" => ["1.0"])
  message_id = frame.headers["message-id"] ||= (@message_id += 1).to_s

  ack_id = nil
  if (subscribe = @subscribes[destination])
    if subscribe[:id] != subscribe_id && @version != "1.0"
      raise ApplicationError.new("Subscription does not match destination")
    end
    if subscribe[:ack] != "auto"
      # Create ack ID if there is none so that user of this server can rely
      # on always receiving an ack ID (as opposed to a message ID) on ack/nack
      # independent of STOMP version in use
      if @version < "1.2"
        ack_id = frame.headers.delete("ack") || (@ack_id += 1).to_s
        @ack_ids[message_id] = (@ack_ids[message_id] || []) << ack_id
      else
        ack_id = frame.headers["ack"] ||= (@ack_id += 1).to_s
      end
    end
  else
    raise ApplicationError.new("Subscription not found")
  end

  send_frame("MESSAGE", frame.headers, body)
  [message_id, ack_id]
end

#on_ack(frame, id) ⇒ TrueClass

Handle acknowledgement from client that message has been successfully processed

Parameters:

  • frame (Frame)

    received from client

  • id (String)

    for acknowledgement assigned to previously sent message

Returns:

  • (TrueClass)

    always true



214
215
216
# File 'lib/stomp_out/server.rb', line 214

def on_ack(frame, id)
  raise "Not implemented"
end

#on_connect(frame, login, passcode, host, session_id) ⇒ Boolean, String

Handle connect request from client including any authentication

Parameters:

  • frame (Frame)

    received from client

  • login (String, NilClass)

    name for authentication

  • passcode (String, NilClass)

    for authentication

  • host (String)

    to which client wishes to connect; this could be a virtual host or anything the application requires, or it may be arbitrary

  • session_id (String)

    default for uniquely identifying the given STOMP session; can be overridden with return value

Returns:

  • (Boolean, String)

    true or a server assigned session ID string if connection accepted, otherwise false



164
165
166
# File 'lib/stomp_out/server.rb', line 164

def on_connect(frame, , passcode, host, session_id)
  raise "Not implemented"
end

#on_disconnect(frame, reason) ⇒ TrueClass

Handle request from client to close connection

Parameters:

  • frame (Frame)

    received from client

  • reason (String)

    for disconnect

Returns:

  • (TrueClass)

    always true



245
246
247
# File 'lib/stomp_out/server.rb', line 245

def on_disconnect(frame, reason)
  raise "Not implemented"
end

#on_error(frame, error) ⇒ TrueClass

Handle notification that a client or server request failed and that the connection should be closed

Parameters:

Returns:

  • (TrueClass)

    always true



235
236
237
# File 'lib/stomp_out/server.rb', line 235

def on_error(frame, error)
  raise "Not implemented"
end

#on_message(frame, destination, message, content_type) ⇒ TrueClass

Handle delivery of message from client to given destination

Parameters:

  • frame (Frame)

    received from client

  • destination (String)

    for message with format being application specific

  • message (Object)

    body

  • content_type (String)

    of message in MIME terms, e.g., “text/plain”

Returns:

  • (TrueClass)

    always true

Raises:



178
179
180
# File 'lib/stomp_out/server.rb', line 178

def on_message(frame, destination, message, content_type)
  raise "Not implemented"
end

#on_nack(frame, id) ⇒ TrueClass

Handle negative acknowledgement from client for message

Parameters:

  • frame (Frame)

    received from client

  • id (String)

    for acknowledgement assigned to previously sent message

Returns:

  • (TrueClass)

    always true



224
225
226
# File 'lib/stomp_out/server.rb', line 224

def on_nack(frame, id)
  raise "Not implemented"
end

#on_subscribe(frame, id, destination, ack_setting) ⇒ TrueClass

Subscribe client to messages from given destination

Parameters:

  • frame (Frame)

    received from client

  • id (String)

    uniquely identifying subscription within given session

  • destination (String)

    from which client wishes to receive messages

  • ack_setting (String)

    for how client wishes to handle acknowledgements: “auto”, “client”, or “client-individual”

Returns:

  • (TrueClass)

    always true

Raises:



193
194
195
# File 'lib/stomp_out/server.rb', line 193

def on_subscribe(frame, id, destination, ack_setting)
  raise "Not implemented"
end

#on_unsubscribe(frame, id, destination) ⇒ TrueClass

Remove existing subscription

Parameters:

  • frame (Frame)

    received from client

  • id (String)

    of an existing subscription

  • destination (String)

    for subscription

Returns:

  • (TrueClass)

    always true



204
205
206
# File 'lib/stomp_out/server.rb', line 204

def on_unsubscribe(frame, id, destination)
  raise "Not implemented"
end

#receive_data(data) ⇒ TrueClass

Process data received over connection from client

Parameters:

  • data (String)

    to be processed

Returns:

  • (TrueClass)

    always true



129
130
131
132
133
134
135
136
# File 'lib/stomp_out/server.rb', line 129

def receive_data(data)
  @parser << data
  process_frames
  @heartbeat.received_data if @heartbeat
  true
rescue StandardError => e
  error(e)
end

#report_error(error) ⇒ TrueClass

Report to server that an error was encountered locally Not intended for use by end user of this class

Parameters:

  • error (String)

    being reported

Returns:

  • (TrueClass)

    always true



100
101
102
103
104
# File 'lib/stomp_out/server.rb', line 100

def report_error(error)
  frame = Frame.new("ERROR", {"message" => error})
  on_error(frame, error)
  true
end

#send_data(data) ⇒ TrueClass

Send data over connection to client

Parameters:

  • data (String)

    that is STOMP encoded

Returns:

  • (TrueClass)

    always true



147
148
149
# File 'lib/stomp_out/server.rb', line 147

def send_data(data)
  raise "Not implemented"
end