Class: OpenC3::MqttInterface
- Defined in:
- lib/openc3/interfaces/mqtt_interface.rb
Overview
Base class for interfaces that send and receive messages over MQTT
Constant Summary
Constants included from Api
Api::DELAY_METRICS, Api::DURATION_METRICS, Api::SUBSCRIPTION_DELIMITER, Api::SUM_METRICS
Constants included from ApiShared
ApiShared::DEFAULT_TLM_POLLING_RATE
Constants included from Extract
Extract::SCANNING_REGULAR_EXPRESSION
Instance Attribute Summary
Attributes inherited from Interface
#auto_reconnect, #bytes_read, #bytes_written, #cmd_routers, #cmd_target_names, #config_params, #connect_on_startup, #disable_disconnect, #interfaces, #name, #num_clients, #options, #packet_log_writer_pairs, #protocol_info, #read_count, #read_protocols, #read_queue_size, #read_raw_data, #read_raw_data_time, #reconnect_delay, #routers, #scheduler, #secrets, #state, #stored_packet_log_writer_pairs, #stream_log_pair, #target_names, #tlm_target_names, #write_count, #write_protocols, #write_queue_size, #written_raw_data, #written_raw_data_time
Instance Method Summary collapse
-
#connect ⇒ Object
Connects the interface to its target(s).
-
#connected? ⇒ Boolean
Whether the MQTT client has been created and reports that it is connected.
- #connection_string ⇒ Object
-
#disconnect ⇒ Object
Disconnects the interface from its target(s).
-
#initialize(hostname, port = 1883, ssl = false) ⇒ MqttInterface
constructor
A new instance of MqttInterface.
- #read ⇒ Object
-
#read_interface ⇒ Object
Reads the next topic and message from the MQTT client.
-
#set_option(option_name, option_values) ⇒ Object
Supported options: USERNAME - username for the MQTT server; PASSWORD - password for the MQTT server; CERT - public key for client cert auth; KEY - private key for client cert auth; CA_FILE - certificate authority for client cert auth (see Interface#set_option).
- #write(packet) ⇒ Object
-
#write_interface(data, extra = nil) ⇒ Object
Writes to the socket.
Methods inherited from Interface
#_write, #add_protocol, #as_json, #convert_data_to_packet, #convert_packet_to_data, #copy_to, #interface_cmd, #protocol_cmd, #read_allowed?, #read_interface_base, #start_raw_logging, #stop_raw_logging, #write_allowed?, #write_interface_base, #write_raw, #write_raw_allowed?
Methods included from Api
#_build_cmd_output_string, #_cmd_implementation, #_extract_target_command_names, #_extract_target_command_parameter_names, #_extract_target_packet_item_names, #_extract_target_packet_names, #_get_and_set_cmd, #_get_item, #_limits_group, #_set_tlm_process_args, #_tlm_process_args, #_validate_tlm_type, #build_cmd, #cmd, #cmd_no_checks, #cmd_no_hazardous_check, #cmd_no_range_check, #cmd_raw, #cmd_raw_no_checks, #cmd_raw_no_hazardous_check, #cmd_raw_no_range_check, #config_tool_names, #connect_interface, #connect_router, #delete_config, #disable_cmd, #disable_limits, #disable_limits_group, #disconnect_interface, #disconnect_router, #enable_cmd, #enable_limits, #enable_limits_group, #get_all_cmd_names, #get_all_cmds, #get_all_interface_info, #get_all_router_info, #get_all_settings, #get_all_target_info, #get_all_tlm, #get_all_tlm_names, #get_cmd, #get_cmd_buffer, #get_cmd_cnt, #get_cmd_cnts, #get_cmd_hazardous, #get_cmd_time, #get_cmd_value, #get_interface, #get_interface_names, #get_item, #get_limits, #get_limits_events, #get_limits_groups, #get_limits_set, #get_limits_sets, #get_metrics, #get_out_of_limits, #get_overall_limits_state, #get_overrides, #get_packet_derived_items, #get_packets, #get_param, #get_router, #get_router_names, #get_setting, #get_settings, #get_target, #get_target_interfaces, #get_target_names, #get_tlm, #get_tlm_buffer, #get_tlm_cnt, #get_tlm_cnts, #get_tlm_packet, #get_tlm_values, #inject_tlm, #interface_cmd, #interface_protocol_cmd, #limits_enabled?, #list_configs, #list_settings, #load_config, #map_target_to_interface, #normalize_tlm, #offline_access_needed, #override_tlm, #router_cmd, #router_protocol_cmd, #save_config, #send_raw, #set_limits, #set_limits_set, #set_offline_access, #set_setting, #set_tlm, #start_raw_logging_interface, #start_raw_logging_router, #stash_all, #stash_delete, #stash_get, #stash_keys, #stash_set, #stop_raw_logging_interface, #stop_raw_logging_router, #subscribe_packets, #tlm, #tlm_formatted, #tlm_raw, 
#tlm_variable, #tlm_with_units
Constructor Details
#initialize(hostname, port = 1883, ssl = false) ⇒ MqttInterface
Returns a new instance of MqttInterface.
95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 |
# File 'lib/openc3/interfaces/mqtt_interface.rb', line 95

# Creates the interface and indexes every telemetry packet by the MQTT
# topic(s) listed in its metadata.
#
# @param hostname [String] host of the MQTT server, stored as given
# @param port [Integer, String] server port; coerced with Integer()
# @param ssl [Boolean, String] SSL flag, normalized by
#   ConfigParser.handle_true_false
def initialize(hostname, port = 1883, ssl = false)
  super()
  @hostname = hostname
  @port = Integer(port)
  @ssl = ConfigParser.handle_true_false(ssl)
  @username = nil
  @password = nil
  @cert = nil
  @key = nil
  @ca_file = nil
  @write_topics = []
  @read_topics = []

  # Build list of packets by topic
  @read_packets_by_topic = {}
  System.telemetry.all.each do |_target_name, target_packets|
    target_packets.each do |_packet_name, packet|
      # TOPIC takes precedence; TOPICS is only consulted when TOPIC is absent
      topics = packet.meta['TOPIC']
      topics = packet.meta['TOPICS'] unless topics
      next unless topics

      topics.each do |topic|
        @read_packets_by_topic[topic] = packet
      end
    end
  end
end
Instance Method Details
#connect ⇒ Object
Connects the interface to its target(s)
129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 |
# File 'lib/openc3/interfaces/mqtt_interface.rb', line 129

# Connects the interface to its target(s).
#
# Resets the queued read/write topics, builds a new MQTT::Client from the
# stored host, port, SSL flag and any configured credentials, connects it,
# and subscribes to every topic that has a telemetry packet mapped to it.
def connect
  @write_topics = []
  @read_topics = []

  @client = MQTT::Client.new
  @client.host = @hostname
  @client.port = @port
  @client.ssl = @ssl

  # Optional settings are only applied when they were provided via set_option
  optional_settings = { username: @username, password: @password, cert: @cert, key: @key }
  optional_settings.each do |setting, value|
    @client.public_send("#{setting}=", value) if value
  end
  # The client is given the path of the temp file holding the CA data
  @client.ca_file = @ca_file.path if @ca_file

  @client.connect
  @read_packets_by_topic.each_key do |topic|
    Logger.info "#{@name}: Subscribing to #{topic}"
    @client.subscribe(topic)
  end
  super()
end
#connected? ⇒ Boolean
Returns whether the MQTT client has been created and reports that it is connected; returns false when no client exists.
152 153 154 155 156 157 158 |
# File 'lib/openc3/interfaces/mqtt_interface.rb', line 152

# Reports the connection state of the MQTT client.
#
# @return [Boolean] false when no client exists, otherwise whatever the
#   client's own connected? returns
def connected?
  return false unless @client

  @client.connected?
end
#connection_string ⇒ Object
124 125 126 |
# File 'lib/openc3/interfaces/mqtt_interface.rb', line 124

# Describes the configured endpoint for display purposes.
#
# @return [String] "<hostname>:<port> (ssl: <ssl>)"
def connection_string
  format('%s:%s (ssl: %s)', @hostname, @port, @ssl)
end
#disconnect ⇒ Object
Disconnects the interface from its target(s)
161 162 163 164 165 |
# File 'lib/openc3/interfaces/mqtt_interface.rb', line 161

# Disconnects the interface from its target(s).
#
# Safe to call when no client exists (before connect, or a second time in a
# row): the client teardown is skipped and only the base class disconnect
# runs.
def disconnect
  if @client
    @client.disconnect
    @client = nil
  end
  super()
end
#read ⇒ Object
167 168 169 170 171 172 173 174 175 176 177 178 179 |
# File 'lib/openc3/interfaces/mqtt_interface.rb', line 167

# Reads a packet through the base class and, when the topic it arrived on
# is mapped to a telemetry packet, returns a copy of that packet carrying
# the received buffer.
#
# @return the packet with received_time cleared, or nil when the base read
#   produced no packet
def read
  received = super()
  # One queued topic is consumed per read, even if no packet came back
  source_topic = @read_topics.shift
  return nil unless received

  template = @read_packets_by_topic[source_topic]
  if template
    # Copy so the packet stored in the topic map is never modified
    identified = template.dup
    identified.buffer = received.buffer
    received = identified
  end
  received.received_time = nil
  received
end
#read_interface ⇒ Object
Reads the next topic and message from the MQTT client
197 198 199 200 201 202 203 204 205 206 207 208 209 210 |
# File 'lib/openc3/interfaces/mqtt_interface.rb', line 197

# Fetches the next topic/data pair from the MQTT client and queues the
# topic so that read can match the data to a packet.
#
# @return [Array, nil] [data, extra] on success; nil when the client
#   returned nil or empty data, or when an IOError was raised
def read_interface
  topic, data = @client.get

  if data.nil?
    Logger.info "#{@name}: read returned nil"
    return nil
  end
  if data.length <= 0
    Logger.info "#{@name}: read returned 0 bytes"
    return nil
  end

  @read_topics << topic
  extra = nil
  read_interface_base(data, extra)
  [data, extra]
rescue IOError
  # Disconnected
  nil
end
#set_option(option_name, option_values) ⇒ Object
Supported options: USERNAME - username for the MQTT server; PASSWORD - password for the MQTT server; CERT - public key for client cert auth; KEY - private key for client cert auth; CA_FILE - certificate authority for client cert auth (see Interface#set_option)
228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 |
# File 'lib/openc3/interfaces/mqtt_interface.rb', line 228

# Stores MQTT-specific options after passing them to the base class.
#
# Supported options (matched case-insensitively):
#   USERNAME - Username for Mqtt Server
#   PASSWORD - Password for Mqtt Server
#   CERT     - Public Key for Client Cert Auth
#   KEY      - Private Key for Client Cert Auth
#   CA_FILE  - Certificate Authority for Client Cert Auth
#
# @param option_name [String] name of the option
# @param option_values [Array] option values; only the first is used here
def set_option(option_name, option_values)
  super(option_name, option_values)

  case option_name.upcase
  when 'USERNAME' then @username = option_values[0]
  when 'PASSWORD' then @password = option_values[0]
  when 'CERT' then @cert = option_values[0]
  when 'KEY' then @key = option_values[0]
  when 'CA_FILE'
    # CA_FILE must be given as a file
    @ca_file = Tempfile.new('ca_file')
    @ca_file.write(option_values[0])
    @ca_file.close
  end
end
#write(packet) ⇒ Object
181 182 183 184 185 186 187 188 189 190 191 192 193 194 |
# File 'lib/openc3/interfaces/mqtt_interface.rb', line 181

# Writes a command packet once for every topic listed in its metadata.
#
# Topics come from packet.meta['TOPIC'], falling back to
# packet.meta['TOPICS'] when TOPIC is absent. Each topic is queued before
# the base class write runs so that write_interface can pair it with the
# outgoing data.
#
# @param packet command packet providing meta, target_name and packet_name
# @raise [RuntimeError] when the packet defines neither TOPIC nor TOPICS
def write(packet)
  @write_mutex.synchronize do
    topics = packet.meta['TOPIC']
    topics = packet.meta['TOPICS'] unless topics
    unless topics
      raise "Command packet #{packet.target_name} #{packet.packet_name} requires a META TOPIC or TOPICS"
    end

    topics.each do |topic|
      @write_topics << topic
      super(packet)
    end
  end
end
#write_interface(data, extra = nil) ⇒ Object
Writes to the socket
214 215 216 217 218 219 |
# File 'lib/openc3/interfaces/mqtt_interface.rb', line 214

# Publishes data through the MQTT client on the oldest queued write topic.
#
# @param data the data to publish
# @param extra extra data passed through to write_interface_base
# @return [Array] [data, extra]
def write_interface(data, extra = nil)
  write_interface_base(data, extra)
  # Topics are consumed in the order write queued them
  @client.publish(@write_topics.shift, data)
  [data, extra]
end