Class: MQTT::Client

Inherits:
Object show all
Defined in:
lib/openc3/interfaces/mqtt_interface.rb

Instance Method Summary collapse

Instance Method Details

#disconnect(send_msg = true) ⇒ Object



67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# File 'lib/openc3/interfaces/mqtt_interface.rb', line 67

def disconnect(send_msg = true)
  # Stop reading packets from the socket first
  @read_thread.kill if @read_thread && @read_thread.alive?
  @read_thread = nil

  @read_queue << nil # Patch for COSMOS

  # Close the socket if it is open
  if connected?
    if send_msg
      packet = MQTT::Packet::Disconnect.new
      send_packet(packet)
    end
    @socket.close unless @socket.nil?
    @socket = nil
  end
end

#get(topic = nil, options = {}) ⇒ Object



31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/openc3/interfaces/mqtt_interface.rb', line 31

def get(topic = nil, options = {})
  if block_given?
    get_packet(topic) do |packet|
      yield(packet.topic, packet.payload) unless packet.retain && options[:omit_retained]
    end
  else
    loop do
      # Wait for one packet to be available
      packet = get_packet(topic)
      return nil unless packet # Patch for COSMOS
      return packet.topic, packet.payload unless packet.retain && options[:omit_retained]
    end
  end
end

#get_packet(topic = nil) ⇒ Object



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/openc3/interfaces/mqtt_interface.rb', line 46

def get_packet(topic = nil)
  # Subscribe to a topic, if an argument is given
  subscribe(topic) unless topic.nil?

  if block_given?
    # Loop forever!
    loop do
      packet = @read_queue.pop
      return nil unless packet # Patch for COSMOS
      yield(packet)
      puback_packet(packet) if packet.qos > 0
    end
  else
    # Wait for one packet to be available
    packet = @read_queue.pop
    return nil unless packet # Patch for COSMOS
    puback_packet(packet) if packet.qos > 0
    return packet
  end
end