Class: OpenC3::MqttInterface

Inherits:
Interface show all
Defined in:
lib/openc3/interfaces/mqtt_interface.rb

Overview

Base class for interfaces that send and receive messages over MQTT

Constant Summary

Constants included from Api

Api::DELAY_METRICS, Api::DURATION_METRICS, Api::SUBSCRIPTION_DELIMITER, Api::SUM_METRICS

Constants included from ApiShared

ApiShared::DEFAULT_TLM_POLLING_RATE

Constants included from Extract

Extract::SCANNING_REGULAR_EXPRESSION

Instance Attribute Summary

Attributes inherited from Interface

#auto_reconnect, #bytes_read, #bytes_written, #cmd_routers, #cmd_target_names, #config_params, #connect_on_startup, #disable_disconnect, #interfaces, #name, #num_clients, #options, #packet_log_writer_pairs, #protocol_info, #read_count, #read_protocols, #read_queue_size, #read_raw_data, #read_raw_data_time, #reconnect_delay, #routers, #scheduler, #secrets, #state, #stored_packet_log_writer_pairs, #stream_log_pair, #target_names, #tlm_target_names, #write_count, #write_protocols, #write_queue_size, #written_raw_data, #written_raw_data_time

Instance Method Summary collapse

Methods inherited from Interface

#_write, #add_protocol, #as_json, #convert_data_to_packet, #convert_packet_to_data, #copy_to, #interface_cmd, #protocol_cmd, #read_allowed?, #read_interface_base, #start_raw_logging, #stop_raw_logging, #write_allowed?, #write_interface_base, #write_raw, #write_raw_allowed?

Methods included from Api

#_build_cmd_output_string, #_cmd_implementation, #_extract_target_command_names, #_extract_target_command_parameter_names, #_extract_target_packet_item_names, #_extract_target_packet_names, #_get_and_set_cmd, #_get_item, #_limits_group, #_set_tlm_process_args, #_tlm_process_args, #_validate_tlm_type, #build_cmd, #cmd, #cmd_no_checks, #cmd_no_hazardous_check, #cmd_no_range_check, #cmd_raw, #cmd_raw_no_checks, #cmd_raw_no_hazardous_check, #cmd_raw_no_range_check, #config_tool_names, #connect_interface, #connect_router, #delete_config, #disable_cmd, #disable_limits, #disable_limits_group, #disconnect_interface, #disconnect_router, #enable_cmd, #enable_limits, #enable_limits_group, #get_all_cmd_names, #get_all_cmds, #get_all_interface_info, #get_all_router_info, #get_all_settings, #get_all_target_info, #get_all_tlm, #get_all_tlm_names, #get_cmd, #get_cmd_buffer, #get_cmd_cnt, #get_cmd_cnts, #get_cmd_hazardous, #get_cmd_time, #get_cmd_value, #get_interface, #get_interface_names, #get_item, #get_limits, #get_limits_events, #get_limits_groups, #get_limits_set, #get_limits_sets, #get_metrics, #get_out_of_limits, #get_overall_limits_state, #get_overrides, #get_packet_derived_items, #get_packets, #get_param, #get_router, #get_router_names, #get_setting, #get_settings, #get_target, #get_target_interfaces, #get_target_names, #get_tlm, #get_tlm_buffer, #get_tlm_cnt, #get_tlm_cnts, #get_tlm_packet, #get_tlm_values, #inject_tlm, #interface_cmd, #interface_protocol_cmd, #limits_enabled?, #list_configs, #list_settings, #load_config, #map_target_to_interface, #normalize_tlm, #offline_access_needed, #override_tlm, #router_cmd, #router_protocol_cmd, #save_config, #send_raw, #set_limits, #set_limits_set, #set_offline_access, #set_setting, #set_tlm, #start_raw_logging_interface, #start_raw_logging_router, #stash_all, #stash_delete, #stash_get, #stash_keys, #stash_set, #stop_raw_logging_interface, #stop_raw_logging_router, #subscribe_packets, #tlm, #tlm_formatted, #tlm_raw, 
#tlm_variable, #tlm_with_units

Constructor Details

#initialize(hostname, port = 1883, ssl = false) ⇒ MqttInterface

Returns a new instance of MqttInterface.

Parameters:

  • hostname (String)

    MQTT server to connect to

  • port (Integer) (defaults to: 1883)

    MQTT port

  • ssl (Boolean) (defaults to: false)

    Use SSL true/false



95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# File 'lib/openc3/interfaces/mqtt_interface.rb', line 95

# Stores the broker settings and indexes telemetry packets by the topics named
# in their metadata.
#
# @param hostname [String] MQTT server to connect to
# @param port [Integer] MQTT port (converted with Integer())
# @param ssl [Boolean] Use SSL true/false (parsed by ConfigParser.handle_true_false)
def initialize(hostname, port = 1883, ssl = false)
  super()
  @hostname = hostname
  @port = Integer(port)
  @ssl = ConfigParser.handle_true_false(ssl)

  # Credentials and certificates start out unset
  @username = @password = nil
  @cert = @key = @ca_file = nil

  @write_topics = []
  @read_topics = []

  # Map every topic listed under a packet's TOPIC meta entry (or TOPICS when
  # TOPIC is absent) to that packet; a later packet replaces an earlier one
  # registered for the same topic.
  @read_packets_by_topic = {}
  System.telemetry.all.each do |_target, packets|
    packets.each do |_name, pkt|
      topic_list = pkt.meta['TOPIC'] || pkt.meta['TOPICS']
      next unless topic_list

      topic_list.each { |topic| @read_packets_by_topic[topic] = pkt }
    end
  end
end

Instance Method Details

#connect ⇒ Object

Connects the interface to its target(s)



129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
# File 'lib/openc3/interfaces/mqtt_interface.rb', line 129

# Connects the interface to its target(s)
#
# Creates a new MQTT::Client, applies the configured host, port, SSL flag and
# any optional credentials, connects, and subscribes to every topic that has a
# telemetry packet registered for it.
def connect
  # Each connection starts with empty topic queues
  @write_topics = []
  @read_topics = []

  @client = MQTT::Client.new
  @client.host = @hostname
  @client.port = @port
  @client.ssl = @ssl

  # Optional settings are only applied when a value has been supplied
  { username: @username, password: @password, cert: @cert, key: @key }.each do |setting, value|
    @client.public_send("#{setting}=", value) if value
  end
  @client.ca_file = @ca_file.path if @ca_file

  @client.connect
  @read_packets_by_topic.each_key do |topic|
    Logger.info "#{@name}: Subscribing to #{topic}"
    @client.subscribe(topic)
  end
  super()
end

#connected? ⇒ Boolean

Returns whether an MQTT client currently exists and reports itself as connected; false when no client has been created.

Returns:

  • (Boolean)

    Whether an MQTT client currently exists and reports itself as connected; false when no client has been created.



152
153
154
155
156
157
158
# File 'lib/openc3/interfaces/mqtt_interface.rb', line 152

# @return [Boolean] false when no client exists, otherwise whatever the
#   client's own connected? reports
def connected?
  return false unless @client

  @client.connected?
end

#connection_string ⇒ Object



124
125
126
# File 'lib/openc3/interfaces/mqtt_interface.rb', line 124

# @return [String] the configured host, port and SSL flag, formatted as
#   "host:port (ssl: flag)"
def connection_string
  Kernel.format('%s:%s (ssl: %s)', @hostname, @port, @ssl)
end

#disconnect ⇒ Object

Disconnects the interface from its target(s)



161
162
163
164
165
# File 'lib/openc3/interfaces/mqtt_interface.rb', line 161

# Disconnects the interface from its target(s)
#
# Safe to call when no client exists (before connect, or after an earlier
# disconnect): the client teardown is skipped and only the base class
# disconnect runs.
def disconnect
  if @client
    @client.disconnect
    # Drop the reference so connected? reports false from here on
    @client = nil
  end
  super()
end

#read ⇒ Object



167
168
169
170
171
172
173
174
175
176
177
178
179
# File 'lib/openc3/interfaces/mqtt_interface.rb', line 167

# Reads a packet through the base class and, when the topic it arrived on has
# a registered telemetry packet, returns a copy of that packet carrying the
# received buffer.
#
# @return [Object, nil] the packet to hand onward, or nil when the base class
#   read returned nothing
def read
  raw_packet = super()
  # One queued topic is consumed per call, even when no packet came back
  topic = @read_topics.shift
  return nil unless raw_packet

  template = @read_packets_by_topic[topic]
  result = raw_packet
  if template
    # Work on a copy so the registered packet definition is left untouched
    result = template.dup
    result.buffer = raw_packet.buffer
  end
  result.received_time = nil
  result
end

#read_interface ⇒ Object

Reads the next message (topic and data) from the MQTT client and queues its topic for #read



197
198
199
200
201
202
203
204
205
206
207
208
209
210
# File 'lib/openc3/interfaces/mqtt_interface.rb', line 197

# Fetches the next message from the MQTT client and queues its topic so that
# #read can match the data to a packet.
#
# @return [Array, nil] [data, extra] for a non-empty message (extra is always
#   nil here); nil when the message data is nil or empty, or when the client
#   raises IOError
def read_interface
  topic, payload = @client.get
  if payload.nil?
    Logger.info "#{@name}: read returned nil"
    return nil
  end
  if payload.length <= 0
    Logger.info "#{@name}: read returned 0 bytes"
    return nil
  end

  @read_topics << topic
  extra = nil
  read_interface_base(payload, extra)
  [payload, extra]
rescue IOError # Disconnected
  nil
end

#set_option(option_name, option_values) ⇒ Object

Supported options: USERNAME - username for the MQTT server; PASSWORD - password for the MQTT server; CERT - public key for client certificate authentication; KEY - private key for client certificate authentication; CA_FILE - certificate authority for client certificate authentication (see Interface#set_option).



228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
# File 'lib/openc3/interfaces/mqtt_interface.rb', line 228

# Records MQTT-specific options after passing them to the base class.
#
# Recognized names (case-insensitive): USERNAME, PASSWORD, CERT, KEY, CA_FILE.
# Any other name is handled by the base class only.
#
# @param option_name [String] name of the option
# @param option_values [Array] option values; only the first one is used here
def set_option(option_name, option_values)
  super(option_name, option_values)
  key = option_name.upcase
  if key == 'CA_FILE'
    # CA_FILE must be given as a file, so the supplied contents go into a
    # temporary file whose path is handed to the client on connect
    @ca_file = Tempfile.new('ca_file')
    @ca_file.write(option_values[0])
    @ca_file.close
  elsif (ivar = { 'USERNAME' => :@username, 'PASSWORD' => :@password,
                  'CERT' => :@cert, 'KEY' => :@key }[key])
    instance_variable_set(ivar, option_values[0])
  end
end

#write(packet) ⇒ Object



181
182
183
184
185
186
187
188
189
190
191
192
193
194
# File 'lib/openc3/interfaces/mqtt_interface.rb', line 181

# Sends the packet once for every topic listed in its TOPIC meta entry (or
# TOPICS when TOPIC is absent), queueing each topic before the base class
# write so write_interface can pick it up.
#
# @param packet [Object] command packet; must respond to meta, target_name
#   and packet_name
# @raise [RuntimeError] when the packet has neither a TOPIC nor a TOPICS meta entry
def write(packet)
  @write_mutex.synchronize do
    destinations = packet.meta['TOPIC'] || packet.meta['TOPICS']
    unless destinations
      raise "Command packet #{packet.target_name} #{packet.packet_name} requires a META TOPIC or TOPICS"
    end

    destinations.each do |topic|
      @write_topics << topic
      super(packet)
    end
  end
end

#write_interface(data, extra = nil) ⇒ Object

Publishes the data through the MQTT client on the next queued write topic

Parameters:

  • data (String)

    Raw packet data



214
215
216
217
218
219
# File 'lib/openc3/interfaces/mqtt_interface.rb', line 214

# Publishes the data through the MQTT client on the oldest topic in the
# write queue.
#
# @param data [String] Raw packet data
# @param extra [Object, nil] passed to write_interface_base and returned unchanged
# @return [Array] [data, extra]
def write_interface(data, extra = nil)
  write_interface_base(data, extra)
  @client.publish(@write_topics.shift, data)
  [data, extra]
end