Class: OpenC3::JsonDRb

Inherits:
Object show all
Defined in:
lib/openc3/io/json_drb.rb

Overview

JsonDRb implements the JSON-RPC 2.0 Specification to provide an interface for both internal and external tools to access the OpenC3 server. It provides methods to install an access control list to control access to the API. It also limits the available methods to a known list of allowable API methods.

Constant Summary collapse

MINIMUM_REQUEST_TIME =

Minimum amount of time in seconds to receive the JSON request, process it, and send the response. Requests for less than this amount will be set to the minimum

0.0001
STOP_SERVICE_TIMEOUT =

seconds to wait when stopping the service

10.0
PUMA_THREAD_TIMEOUT =

seconds to wait for the puma threads to die

10.0
SERVER_START_TIMEOUT =

seconds to wait for the server to start

15.0
@@debug =
false

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeJsonDRb

Returns a new instance of JsonDRb.



77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/openc3/io/json_drb.rb', line 77

def initialize
  @thread = nil
  # @acl = nil
  @object = nil
  @method_whitelist = nil
  @request_count = 0
  @request_times = []
  @request_times_index = 0
  @request_mutex = Mutex.new
  @server = nil
  @server_mutex = Mutex.new
end

Instance Attribute Details

#method_whitelistArray<String>

Returns List of methods that should be allowed.

Returns:

  • (Array<String>)

    List of methods that should be allowed



71
72
73
# File 'lib/openc3/io/json_drb.rb', line 71

def method_whitelist
  @method_whitelist
end

#objectACL

attr_accessor :acl

Returns:

  • (ACL)

    The access control list



75
76
77
# File 'lib/openc3/io/json_drb.rb', line 75

def object
  @object
end

#request_countInteger

Returns The number of JSON-RPC requests processed.

Returns:

  • (Integer)

    The number of JSON-RPC requests processed



69
70
71
# File 'lib/openc3/io/json_drb.rb', line 69

def request_count
  @request_count
end

#threadThread (readonly)

Returns The server thread listening for incoming requests.

Returns:

  • (Thread)

    The server thread listening for incoming requests



210
211
212
# File 'lib/openc3/io/json_drb.rb', line 210

def thread
  @thread
end

Class Method Details

.debug=(value) ⇒ Object

Parameters:

  • value (Boolean)

    Whether to enable debug messages



243
244
245
# File 'lib/openc3/io/json_drb.rb', line 243

def self.debug=(value)
  @@debug = value
end

.debug?Boolean

Returns Whether debug messages are enabled.

Returns:

  • (Boolean)

    Whether debug messages are enabled



238
239
240
# File 'lib/openc3/io/json_drb.rb', line 238

def self.debug?
  @@debug
end

Instance Method Details

#add_request_time(request_time) ⇒ Object

Adds a request time to the list. A request time consists of the amount of time to receive the request, process it, and send the response. These times are used by the #average_request_time method to calculate an average request time.

Parameters:

  • request_time (Float)

    Time in seconds for the data transmission



218
219
220
221
222
223
224
225
# File 'lib/openc3/io/json_drb.rb', line 218

def add_request_time(request_time)
  @request_mutex.synchronize do
    request_time = MINIMUM_REQUEST_TIME if request_time < MINIMUM_REQUEST_TIME
    @request_times[@request_times_index] = request_time
    @request_times_index += 1
    @request_times_index = 0 if @request_times_index >= 100
  end
end

#average_request_timeFloat

Returns The average time in seconds for a JSON DRb request to be processed and the response sent.

Returns:

  • (Float)

    The average time in seconds for a JSON DRb request to be processed and the response sent.



229
230
231
232
233
234
235
# File 'lib/openc3/io/json_drb.rb', line 229

def average_request_time
  avg = 0
  @request_mutex.synchronize do
    avg = @request_times.mean
  end
  avg
end

#graceful_killObject

Gracefully kill the thread



119
120
121
122
123
124
# File 'lib/openc3/io/json_drb.rb', line 119

def graceful_kill
  @server_mutex.synchronize do
    @server.stop if @server and @server.running
  rescue
  end
end

#num_clientsInteger

Returns the number of connected clients

Returns:

  • (Integer)

    The number of connected clients



92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/openc3/io/json_drb.rb', line 92

def num_clients
  clients = 0
  @server_mutex.synchronize do
    if @server
      # @server.stats() returns a string like: { "backlog": 0, "running": 0 }
      # "running" indicates the number of server threads running, and
      # therefore the number of clients connected.
      stats = @server.stats()
      stats =~ /"running": \d*/
      clients = $&.split(":")[1].to_i
    end
  end
  return clients
end

#process_request(request_data:, request_headers:, start_time:) ⇒ Object

Process the JSON request data, execute the method, and create a response.

Parameters:

  • request_data (String)

    The JSON encoded request

  • request_headers (Hash)

    The requests headers sent with the request

  • start_time (Time)

    The time when the initial request was received

Returns:

  • response_data, error_code [String, Integer/nil] The JSON encoded response and error code



254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
# File 'lib/openc3/io/json_drb.rb', line 254

def process_request(request_data:, request_headers:, start_time:)
  @request_count += 1
  begin
    request = JsonRpcRequest.from_json(request_data, request_headers)
    response = nil
    error_code = nil
    response_data = nil

    if (@method_whitelist and @method_whitelist.include?(request.method.downcase())) or
       (!@method_whitelist and !JsonRpcRequest::DANGEROUS_METHODS.include?(request.method.downcase()))
      begin
        if request.keyword_params
          result = @object.public_send(request.method.downcase().intern, *request.params, **request.keyword_params)
        else
          result = @object.public_send(request.method.downcase().intern, *request.params)
        end
        if request.id
          response = JsonRpcSuccessResponse.new(result, request.id)
        end
      rescue Exception => e
        # Filter out the framework stack trace (rails, rack, puma etc)
        lines = e.formatted.split("\n")
        i = lines.find_index { |row| row.include?('actionpack') || row.include?('activesupport') }
        Logger.error lines[0...i].join("\n")

        if request.id
          if NoMethodError === e
            error_code = JsonRpcError::ErrorCode::METHOD_NOT_FOUND
            response = JsonRpcErrorResponse.new(
              JsonRpcError.new(error_code, "Method not found", e), request.id
            )
          elsif ArgumentError === e
            error_code = JsonRpcError::ErrorCode::INVALID_PARAMS
            response = JsonRpcErrorResponse.new(
              JsonRpcError.new(error_code, "Invalid params", e), request.id
            )
          elsif AuthError === e
            error_code = JsonRpcError::ErrorCode::AUTH_ERROR
            response = JsonRpcErrorResponse.new(
              JsonRpcError.new(error_code, e.message, e), request.id
            )
          elsif ForbiddenError === e
            error_code = JsonRpcError::ErrorCode::FORBIDDEN_ERROR
            response = JsonRpcErrorResponse.new(
              JsonRpcError.new(error_code, e.message, e), request.id
            )
          elsif HazardousError === e
            error_code = JsonRpcError::ErrorCode::HAZARDOUS_ERROR
            response = JsonRpcErrorResponse.new(
              JsonRpcError.new(error_code, e.message, e), request.id
            )
          else
            error_code = JsonRpcError::ErrorCode::OTHER_ERROR
            response = JsonRpcErrorResponse.new(
              JsonRpcError.new(error_code, e.message, e), request.id
            )
          end
        end
      end
    else
      if request.id
        error_code = JsonRpcError::ErrorCode::OTHER_ERROR
        response = JsonRpcErrorResponse.new(
          JsonRpcError.new(error_code, "Cannot call unauthorized methods"), request.id
        )
      end
    end
    response_data = process_response(response, start_time) if response
    return response_data, error_code
  rescue => e
    error_code = JsonRpcError::ErrorCode::INVALID_REQUEST
    response = JsonRpcErrorResponse.new(JsonRpcError.new(error_code, "Invalid Request", e), nil)
    response_data = process_response(response, start_time)
    return response_data, error_code
  end
end

#start_service(hostname = nil, port = nil, object = nil, max_threads = 1000) ⇒ Object

Parameters:

  • hostname (String) (defaults to: nil)

    The host to start the service on

  • port (Integer) (defaults to: nil)

    The port number to listen for connections

  • object (Object) (defaults to: nil)

    The object to send the DRb requests to.



129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
# File 'lib/openc3/io/json_drb.rb', line 129

def start_service(hostname = nil, port = nil, object = nil, max_threads = 1000)
  server_started = false
  @server_mutex.synchronize do
    server_started = true if @server
  end
  return if server_started

  if hostname and port and object
    @object = object
    hostname = '127.0.0.1'.freeze if hostname.to_s.upcase == 'LOCALHOST'.freeze

    @thread = Thread.new do
      # Create an http server to accept requests from clients

      server_config = {
        :Host => hostname,
        :Port => port,
        :Silent => true,
        :Verbose => false,
        :Threads => "0:#{max_threads}",
      }

      # The run call will block until the server is stopped.
      Rackup::Handler::Puma.run(JsonDrbRack.new(self), server_config) do |server|
        @server_mutex.synchronize do
          @server = server
        end
      end

      # Wait for all puma threads to stop before trying to close
      # the sockets
      start_time = Time.now
      while true
        puma_threads = false
        Thread.list.each { |thread| puma_threads = true if thread.inspect.match?(/puma/) }
        break if !puma_threads
        break if (Time.now - start_time) > PUMA_THREAD_TIMEOUT

        sleep 0.25
      end

      # Puma doesn't clean up it's own sockets after shutting down,
      # so we'll do that here.
      @server_mutex.synchronize do
        @server.binder.close() if @server
      end

    # The address in use error is pretty typical if an existing
    # server is running so explicitly rescue this
    rescue Errno::EADDRINUSE
      @server = nil
      raise "Error binding to port #{port}.\n" +
            "Either another application is using this port\n" +
            "or the operating system is being slow cleaning up.\n" +
            "Make sure all sockets/streams are closed in all applications,\n" +
            "wait 1 minute and try again."
    # Something else went wrong which is fatal
    rescue => e
      @server = nil
      Logger.error "JsonDRb http server could not be started or unexpectedly died.\n#{e.formatted}"
      OpenC3.handle_fatal_exception(e)
    end

    # Wait for the server to be started in the thread before returning.
    start_time = Time.now
    while ((Time.now - start_time) < SERVER_START_TIMEOUT) and !server_started
      sleep(0.1)
      @server_mutex.synchronize do
        server_started = true if @server and @server.running
      end
    end
    raise "JsonDRb http server could not be started." unless server_started

  elsif hostname or port or object
    raise "0 or 3 parameters must be given"
  else
    # Client - Noop
  end
end

#stop_serviceObject

Stops the DRb service by closing the socket and the processing thread



108
109
110
111
112
113
114
115
116
# File 'lib/openc3/io/json_drb.rb', line 108

def stop_service
  # Kill the server thread
  # parameters are owner, thread, graceful_timeout, timeout_interval, hard_timeout
  OpenC3.kill_thread(self, @thread, STOP_SERVICE_TIMEOUT, 0.1, STOP_SERVICE_TIMEOUT)
  @thread = nil
  @server_mutex.synchronize do
    @server = nil
  end
end