Class: OpenC3::QueueProcessor

Inherits:
Object
  • Object
show all
Includes:
Api
Defined in:
lib/openc3/microservices/queue_microservice.rb

Overview

The queue processor runs in a single thread and processes commands via cmd_api.

Constant Summary

Constants included from Api

Api::DELAY_METRICS, Api::DURATION_METRICS, Api::SUBSCRIPTION_DELIMITER, Api::SUM_METRICS

Constants included from ApiShared

ApiShared::DEFAULT_TLM_POLLING_RATE

Constants included from Extract

Extract::SCANNING_REGULAR_EXPRESSION

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Api

#_cmd_implementation, #_extract_target_command_names, #_extract_target_command_parameter_names, #_extract_target_packet_item_names, #_extract_target_packet_names, #_get_and_set_cmd, #_get_item, #_limits_group, #_set_tlm_process_args, #_tlm_process_args, #_validate_tlm_type, #build_cmd, #cmd, #cmd_no_checks, #cmd_no_hazardous_check, #cmd_no_range_check, #cmd_raw, #cmd_raw_no_checks, #cmd_raw_no_hazardous_check, #cmd_raw_no_range_check, #config_tool_names, #connect_interface, #connect_router, #delete_config, #disable_cmd, #disable_limits, #disable_limits_group, #disconnect_interface, #disconnect_router, #enable_cmd, #enable_limits, #enable_limits_group, #get_all_cmd_names, #get_all_cmds, #get_all_interface_info, #get_all_router_info, #get_all_settings, #get_all_tlm, #get_all_tlm_item_names, #get_all_tlm_names, #get_cmd, #get_cmd_buffer, #get_cmd_cnt, #get_cmd_cnts, #get_cmd_hazardous, #get_cmd_time, #get_cmd_value, #get_interface, #get_interface_names, #get_item, #get_limits, #get_limits_events, #get_limits_groups, #get_limits_set, #get_limits_sets, #get_metrics, #get_out_of_limits, #get_overall_limits_state, #get_overrides, #get_packet_derived_items, #get_packets, #get_param, #get_router, #get_router_names, #get_setting, #get_settings, #get_target, #get_target_interfaces, #get_target_names, #get_tlm, #get_tlm_available, #get_tlm_buffer, #get_tlm_cnt, #get_tlm_cnts, #get_tlm_packet, #get_tlm_values, #inject_tlm, #interface_cmd, #interface_details, #interface_protocol_cmd, #interface_target_disable, #interface_target_enable, #limits_enabled?, #list_configs, #list_settings, #load_config, #map_target_to_interface, #map_target_to_router, #normalize_tlm, #offline_access_needed, #override_tlm, #router_cmd, #router_details, #router_protocol_cmd, #router_target_disable, #router_target_enable, #save_config, #send_raw, #set_limits, #set_limits_set, #set_offline_access, #set_setting, #set_tlm, #start_raw_logging_interface, #start_raw_logging_router, #stash_all, #stash_delete, #stash_get, #stash_keys, #stash_set, #stop_raw_logging_interface, #stop_raw_logging_router, #subscribe_packets, #tlm, #tlm_formatted, #tlm_raw, #tlm_with_units, #unmap_target_from_interface, #unmap_target_from_router, #update_news, #update_plugin_store

Methods included from CmdLog

#_build_cmd_output_string

Constructor Details

#initialize(name:, state:, logger:, scope:) ⇒ QueueProcessor

Returns a new instance of QueueProcessor.



39
40
41
42
43
44
45
# File 'lib/openc3/microservices/queue_microservice.rb', line 39

def initialize(name:, state:, logger:, scope:)
  @name = name
  @logger = logger
  @scope = scope
  @state = state
  @cancel_thread = false
end

Instance Attribute Details

#nameObject (readonly)

Returns the value of attribute name.



37
38
39
# File 'lib/openc3/microservices/queue_microservice.rb', line 37

def name
  @name
end

#scopeObject (readonly)

Returns the value of attribute scope.



37
38
39
# File 'lib/openc3/microservices/queue_microservice.rb', line 37

def scope
  @scope
end

#stateObject

Returns the value of attribute state.



36
37
38
# File 'lib/openc3/microservices/queue_microservice.rb', line 36

def state
  @state
end

Instance Method Details

#process_queued_commandsObject



58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/openc3/microservices/queue_microservice.rb', line 58

def process_queued_commands
  while @state == 'RELEASE'
    begin
      _queue_name, command_data, _timestamp = Store.bzpopmin("#{@scope}:#{@name}", timeout: 0.2)
      if command_data
        command = JSON.parse(command_data)
        # It's important to set queue: false here to avoid infinite recursion when
        # OPENC3_DEFAULT_QUEUE is set because commands would be re-queued to the default queue
        # NOTE: cmd() via script rescues hazardous errors and calls prompt_for_hazardous()
        # but we've overridden it to always return true and go straight to cmd_no_hazardous_check()

        # Support both new format (target_name, cmd_name, cmd_params) and legacy format (command string)
        if command['target_name'] && command['cmd_name']
          # New format: use 3-parameter cmd() method
          if command['cmd_params']
            cmd_params = JSON.parse(command['cmd_params'], allow_nan: true, create_additions: true)
          else
            cmd_params = {}
          end
          validate = command.key?('validate') ? command['validate'] : true
          timeout = command['timeout']
          cmd(command['target_name'], command['cmd_name'], cmd_params, queue: false, validate: validate, timeout: timeout, scope: @scope)
        elsif command['value']
          # Legacy format: use single string parameter for backwards compatibility
          validate = command.key?('validate') ? command['validate'] : true
          timeout = command['timeout']
          cmd(command['value'], queue: false, validate: validate, timeout: timeout, scope: @scope)
        else
          @logger.error "QueueProcessor: Invalid command format, missing required fields"
        end
      end
    rescue StandardError => e
      @logger.error "QueueProcessor failed to process command from queue #{@name}\n#{e.message}"
    end
    break if @cancel_thread
  end
end

#runObject



47
48
49
50
51
52
53
54
55
56
# File 'lib/openc3/microservices/queue_microservice.rb', line 47

def run
  while true
    if @state == 'RELEASE'
      process_queued_commands()
    else
      sleep 0.2
    end
    break if @cancel_thread
  end
end

#shutdownObject



96
97
98
# File 'lib/openc3/microservices/queue_microservice.rb', line 96

def shutdown
  @cancel_thread = true
end