Class: OpenC3::MqttInterface
- Defined in:
- lib/openc3/interfaces/mqtt_interface.rb
Overview
Base class for interfaces that send and receive messages over MQTT
Constant Summary
Constants included from Api
Api::DELAY_METRICS, Api::DURATION_METRICS, Api::SUBSCRIPTION_DELIMITER, Api::SUM_METRICS
Constants included from ApiShared
ApiShared::DEFAULT_TLM_POLLING_RATE
Constants included from Extract
Extract::SCANNING_REGULAR_EXPRESSION
Instance Attribute Summary
Attributes inherited from Interface
#auto_reconnect, #bytes_read, #bytes_written, #cmd_routers, #cmd_target_names, #config_params, #connect_on_startup, #disable_disconnect, #interfaces, #name, #num_clients, #options, #packet_log_writer_pairs, #protocol_info, #read_count, #read_protocols, #read_queue_size, #read_raw_data, #read_raw_data_time, #reconnect_delay, #routers, #scheduler, #secrets, #state, #stored_packet_log_writer_pairs, #stream_log_pair, #target_names, #tlm_target_names, #write_count, #write_protocols, #write_queue_size, #written_raw_data, #written_raw_data_time
Instance Method Summary collapse
-
#connect ⇒ Object
Connects the interface to its target(s).
-
#connected? ⇒ Boolean
Whether the MQTT client has been created and reports that it is connected.
-
#disconnect ⇒ Object
Disconnects the interface from its target(s).
-
#initialize(hostname, port = 1883, ssl = false) ⇒ MqttInterface
constructor
A new instance of MqttInterface.
- #read ⇒ Object
-
#read_interface ⇒ Object
Reads the next message from the MQTT client and queues its topic for #read.
-
#set_option(option_name, option_values) ⇒ Object
Supported options: USERNAME - username for the MQTT server; PASSWORD - password for the MQTT server; CERT - public key for client cert auth; KEY - private key for client cert auth; CA_FILE - certificate authority for client cert auth (see Interface#set_option).
- #write(packet) ⇒ Object
-
#write_interface(data, extra = nil) ⇒ Object
Publishes the data to the next MQTT topic queued by #write.
Methods inherited from Interface
#_write, #add_protocol, #as_json, #convert_data_to_packet, #convert_packet_to_data, #copy_to, #interface_cmd, #protocol_cmd, #read_allowed?, #read_interface_base, #start_raw_logging, #stop_raw_logging, #write_allowed?, #write_interface_base, #write_raw, #write_raw_allowed?
Methods included from Api
#_build_cmd_output_string, #_cmd_implementation, #_get_item, #_limits_group, #_set_tlm_process_args, #_tlm_process_args, #_validate_tlm_type, #build_command, #cmd, #cmd_no_checks, #cmd_no_hazardous_check, #cmd_no_range_check, #cmd_raw, #cmd_raw_no_checks, #cmd_raw_no_hazardous_check, #cmd_raw_no_range_check, #config_tool_names, #connect_interface, #connect_router, #delete_config, #disable_limits, #disable_limits_group, #disconnect_interface, #disconnect_router, #enable_limits, #enable_limits_group, #get_all_command_names, #get_all_commands, #get_all_interface_info, #get_all_router_info, #get_all_settings, #get_all_target_info, #get_all_telemetry, #get_all_telemetry_names, #get_cmd_buffer, #get_cmd_cnt, #get_cmd_cnts, #get_cmd_hazardous, #get_cmd_time, #get_cmd_value, #get_command, #get_interface, #get_interface_names, #get_item, #get_limits, #get_limits_events, #get_limits_groups, #get_limits_set, #get_limits_sets, #get_metrics, #get_out_of_limits, #get_overall_limits_state, #get_overrides, #get_packet_derived_items, #get_packets, #get_parameter, #get_router, #get_router_names, #get_setting, #get_settings, #get_target, #get_target_interfaces, #get_target_names, #get_telemetry, #get_tlm_buffer, #get_tlm_cnt, #get_tlm_cnts, #get_tlm_packet, #get_tlm_values, #inject_tlm, #interface_cmd, #interface_protocol_cmd, #limits_enabled?, #list_configs, #list_settings, #load_config, #map_target_to_interface, #normalize_tlm, #offline_access_needed, #override_tlm, #router_cmd, #router_protocol_cmd, #save_config, #send_raw, #set_limits, #set_limits_set, #set_offline_access, #set_setting, #set_tlm, #start_raw_logging_interface, #start_raw_logging_router, #stash_all, #stash_delete, #stash_get, #stash_keys, #stash_set, #stop_raw_logging_interface, #stop_raw_logging_router, #subscribe_packets, #tlm, #tlm_formatted, #tlm_raw, #tlm_variable, #tlm_with_units
Constructor Details
#initialize(hostname, port = 1883, ssl = false) ⇒ MqttInterface
Returns a new instance of MqttInterface.
95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 |
# File 'lib/openc3/interfaces/mqtt_interface.rb', line 95

# Creates the interface and indexes every telemetry packet by the MQTT
# topic(s) listed in its TOPIC / TOPICS metadata.
#
# @param hostname [String] Host name or address of the MQTT server
# @param port [Integer] Port of the MQTT server (coerced with Integer())
# @param ssl [Boolean] Whether to use SSL (parsed by ConfigParser.handle_true_false)
def initialize(hostname, port = 1883, ssl = false)
  super()
  @hostname = hostname
  @port = Integer(port)
  @ssl = ConfigParser.handle_true_false(ssl)
  # Credentials and certificates are assigned later by #set_option
  @username = nil
  @password = nil
  @cert = nil
  @key = nil
  @ca_file = nil
  @write_topics = []
  @read_topics = []

  # Build list of packets by topic
  @read_packets_by_topic = {}
  System.telemetry.all.each do |_target_name, target_packets|
    target_packets.each do |_packet_name, packet|
      # TOPIC takes precedence; TOPICS is only consulted when TOPIC is absent
      topics = packet.meta['TOPIC'] || packet.meta['TOPICS']
      next unless topics

      topics.each do |topic|
        @read_packets_by_topic[topic] = packet
      end
    end
  end
end
Instance Method Details
#connect ⇒ Object
Connects the interface to its target(s)
125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 |
# File 'lib/openc3/interfaces/mqtt_interface.rb', line 125

# Connects the interface to its target(s): builds a fresh MQTT client from
# the configured settings, connects it, and subscribes to every topic that
# has a telemetry packet registered for it.
def connect
  # Start each connection with empty topic queues
  @write_topics = []
  @read_topics = []

  client = MQTT::Client.new
  @client = client
  client.host = @hostname
  client.port = @port
  client.ssl = @ssl
  # Optional settings are applied only when they were provided
  client.username = @username if @username
  client.password = @password if @password
  client.cert = @cert if @cert
  client.key = @key if @key
  client.ca_file = @ca_file.path if @ca_file
  client.connect

  @read_packets_by_topic.each_key do |topic|
    Logger.info "#{@name}: Subscribing to #{topic}"
    client.subscribe(topic)
  end
  super()
end
#connected? ⇒ Boolean
Returns whether the MQTT client has been created and reports that it is connected. Returns false when no client exists.
148 149 150 151 152 153 154 |
# File 'lib/openc3/interfaces/mqtt_interface.rb', line 148

# @return [Boolean] false when no client exists, otherwise whatever the
#   client's own connected? reports
def connected?
  return false unless @client

  @client.connected?
end
#disconnect ⇒ Object
Disconnects the interface from its target(s)
157 158 159 160 161 |
# File 'lib/openc3/interfaces/mqtt_interface.rb', line 157

# Disconnects the interface from its target(s).
#
# Safe to call when no client exists (never connected, or already
# disconnected); in that case only the inherited disconnect runs.
def disconnect
  if @client
    @client.disconnect
    @client = nil
  end
  super()
end
#read ⇒ Object
163 164 165 166 167 168 169 170 171 172 173 174 175 |
# File 'lib/openc3/interfaces/mqtt_interface.rb', line 163

# Reads the next packet and, when the topic it arrived on maps to a known
# telemetry packet, returns a copy of that packet carrying the received buffer.
#
# @return [Packet, nil] The packet, or nil when the inherited read returns nil
def read
  packet = super()
  # #read_interface queues the topic of each received message. It is
  # presumably invoked by the inherited read (confirm against Interface#read),
  # so the topic must be dequeued only after super() returns; dequeuing first
  # pairs every packet with the topic of the previous message.
  topic = @read_topics.shift
  return nil unless packet

  identified_packet = @read_packets_by_topic[topic]
  if identified_packet
    # Work on a copy so the stored packet is left untouched
    identified_packet = identified_packet.dup
    identified_packet.buffer = packet.buffer
    packet = identified_packet
  end
  packet.received_time = nil
  packet
end
#read_interface ⇒ Object
Reads the next message from the MQTT client and queues its topic for #read
191 192 193 194 195 196 197 198 199 200 201 202 203 204 |
# File 'lib/openc3/interfaces/mqtt_interface.rb', line 191

# Gets the next message from the MQTT client and queues its topic in
# @read_topics.
#
# @return [Array, nil] data and extra (extra is always nil here), or nil when
#   no data was returned or an IOError was raised
def read_interface
  topic, data = @client.get
  if data.nil?
    Logger.info "#{@name}: read returned nil"
    return nil
  end
  if data.length <= 0
    Logger.info "#{@name}: read returned 0 bytes"
    return nil
  end

  @read_topics << topic
  extra = nil
  read_interface_base(data, extra)
  [data, extra]
rescue IOError # Disconnected
  nil
end
#set_option(option_name, option_values) ⇒ Object
Supported options: USERNAME - username for the MQTT server; PASSWORD - password for the MQTT server; CERT - public key for client cert auth; KEY - private key for client cert auth; CA_FILE - certificate authority for client cert auth (see Interface#set_option)
222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 |
# File 'lib/openc3/interfaces/mqtt_interface.rb', line 222

# Applies an interface option. After delegating to the inherited set_option,
# these MQTT options are stored (names are matched case-insensitively):
#   USERNAME - Username for Mqtt Server
#   PASSWORD - Password for Mqtt Server
#   CERT     - Public Key for Client Cert Auth
#   KEY      - Private Key for Client Cert Auth
#   CA_FILE  - Certificate Authority for Client Cert Auth
#
# @param option_name [String] Name of the option
# @param option_values [Array] Option values; only the first is used here
def set_option(option_name, option_values)
  super(option_name, option_values)
  name = option_name.upcase
  if name == 'USERNAME'
    @username = option_values[0]
  elsif name == 'PASSWORD'
    @password = option_values[0]
  elsif name == 'CERT'
    @cert = option_values[0]
  elsif name == 'KEY'
    @key = option_values[0]
  elsif name == 'CA_FILE'
    # CA_FILE must be given as a file, so the supplied content is written to
    # a temporary file whose path #connect passes to the client
    @ca_file = Tempfile.new('ca_file')
    @ca_file.write(option_values[0])
    @ca_file.close
  end
end
#write(packet) ⇒ Object
177 178 179 180 181 182 183 184 185 186 187 188 |
# File 'lib/openc3/interfaces/mqtt_interface.rb', line 177

# Writes the packet once for every topic listed in its TOPIC / TOPICS
# metadata. Each topic is queued in @write_topics before the inherited write
# runs, so that #write_interface can pick it up.
#
# @param packet [Packet] Command packet to send
# @raise [RuntimeError] if the packet has neither TOPIC nor TOPICS metadata
def write(packet)
  # TOPIC takes precedence; TOPICS is only consulted when TOPIC is absent
  topics = packet.meta['TOPIC'] || packet.meta['TOPICS']
  unless topics
    raise "Command packet #{packet.target_name} #{packet.packet_name} requires a META TOPIC or TOPICS"
  end

  topics.each do |topic|
    @write_topics << topic
    super(packet)
  end
end
#write_interface(data, extra = nil) ⇒ Object
Publishes the data to the next MQTT topic queued by #write
208 209 210 211 212 213 |
# File 'lib/openc3/interfaces/mqtt_interface.rb', line 208

# Publishes the data to the oldest topic waiting in @write_topics.
#
# @param data [String] Data to publish
# @param extra [Object, nil] Passed to write_interface_base and returned unchanged
# @return [Array] data and extra
def write_interface(data, extra = nil)
  write_interface_base(data, extra)
  # Topics are queued by #write, one per pending publish
  @client.publish(@write_topics.shift, data)
  [data, extra]
end