Class: OpenC3::MqttStream

Inherits:
Stream show all
Defined in:
lib/openc3/streams/mqtt_stream.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods inherited from Stream

#read_nonblock

Constructor Details

#initialize(hostname, port = 1883, ssl = false, write_topic = nil, read_topic = nil) ⇒ MqttStream

Returns a new instance of MqttStream.



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/openc3/streams/mqtt_stream.rb', line 36

def initialize(hostname, port = 1883, ssl = false, write_topic = nil, read_topic = nil)
  super()

  @hostname = hostname
  @port = Integer(port)
  @ssl = ConfigParser.handle_true_false(ssl)
  @write_topic = ConfigParser.handle_nil(write_topic)
  @read_topic = ConfigParser.handle_nil(read_topic)
  @connected = false

  @username = nil
  @password = nil
  @cert = nil
  @key = nil
  @ca_file = nil

  # Mutex on write is needed to protect from commands coming in from more
  # than one tool
  @write_mutex = Mutex.new
end

Instance Attribute Details

#ca_fileObject

Returns the value of attribute ca_file.



34
35
36
# File 'lib/openc3/streams/mqtt_stream.rb', line 34

def ca_file
  @ca_file
end

#certObject

Returns the value of attribute cert.



32
33
34
# File 'lib/openc3/streams/mqtt_stream.rb', line 32

def cert
  @cert
end

#hostnameObject (readonly)

Returns the value of attribute hostname.



25
26
27
# File 'lib/openc3/streams/mqtt_stream.rb', line 25

def hostname
  @hostname
end

#keyObject

Returns the value of attribute key.



33
34
35
# File 'lib/openc3/streams/mqtt_stream.rb', line 33

def key
  @key
end

#passwordObject

Returns the value of attribute password.



31
32
33
# File 'lib/openc3/streams/mqtt_stream.rb', line 31

def password
  @password
end

#portObject (readonly)

Returns the value of attribute port.



26
27
28
# File 'lib/openc3/streams/mqtt_stream.rb', line 26

def port
  @port
end

#read_topicObject (readonly)

Returns the value of attribute read_topic.



29
30
31
# File 'lib/openc3/streams/mqtt_stream.rb', line 29

def read_topic
  @read_topic
end

#sslObject (readonly)

Returns the value of attribute ssl.



27
28
29
# File 'lib/openc3/streams/mqtt_stream.rb', line 27

def ssl
  @ssl
end

#usernameObject

Returns the value of attribute username.



30
31
32
# File 'lib/openc3/streams/mqtt_stream.rb', line 30

def username
  @username
end

#write_topicObject (readonly)

Returns the value of attribute write_topic.



28
29
30
# File 'lib/openc3/streams/mqtt_stream.rb', line 28

def write_topic
  @write_topic
end

Instance Method Details

#connectObject

Connect the stream



82
83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/openc3/streams/mqtt_stream.rb', line 82

def connect
  @client = MQTT::Client.new
  @client.host = @hostname
  @client.port = @port
  @client.ssl = @ssl
  @client.username = @username if @username
  @client.password = @password if @password
  @client.cert = @cert if @cert
  @client.key = @key if @key
  @client.ca_file = @ca_file.path if @ca_file
  @client.connect
  @client.subscribe(@read_topic) if @read_topic
  @connected = true
end

#connected?Boolean

Returns:

  • (Boolean)


97
98
99
# File 'lib/openc3/streams/mqtt_stream.rb', line 97

def connected?
  @connected
end

#disconnectObject



101
102
103
104
105
106
107
# File 'lib/openc3/streams/mqtt_stream.rb', line 101

def disconnect
  if @connected
    @client.disconnect
    @client = nil
    @connected = false
  end
end

#readString

Returns a binary string of data from the read_topic

Returns:

  • (String)

    Returns a binary string of data from the read_topic



58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/openc3/streams/mqtt_stream.rb', line 58

def read
  raise "Attempt to read from write only stream" unless @read_topic

  # No read mutex is needed because reads happen serially
  _, data = @client.get
  if data.nil? or data.length <= 0
    Logger.info "MqttStream: read returned nil" if data.nil?
    Logger.info "MqttStream: read returned 0 bytes" if not data.nil? and data.length <= 0
    return nil
  end

  return data
end

#write(data) ⇒ Object

Parameters:

  • data (String)

    A binary string of data to write to the write_topic



73
74
75
76
77
78
79
# File 'lib/openc3/streams/mqtt_stream.rb', line 73

def write(data)
  raise "Attempt to write to read only stream" unless @write_topic

  @write_mutex.synchronize do
    @client.publish(@write_topic, data)
  end
end