Class: MQTT::Client
Instance Method Summary collapse
- #disconnect(send_msg = true) ⇒ Object
- #get(topic = nil, options = {}) ⇒ Object
- #get_packet(topic = nil) ⇒ Object
Instance Method Details
#disconnect(send_msg = true) ⇒ Object
67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 |
# File 'lib/openc3/interfaces/mqtt_interface.rb', line 67 def disconnect(send_msg = true) # Stop reading packets from the socket first @read_thread.kill if @read_thread && @read_thread.alive? @read_thread = nil @read_queue << nil # Patch for COSMOS # Close the socket if it is open if connected? if send_msg packet = MQTT::Packet::Disconnect.new send_packet(packet) end @socket.close unless @socket.nil? @socket = nil end end |
#get(topic = nil, options = {}) ⇒ Object
31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
# File 'lib/openc3/interfaces/mqtt_interface.rb', line 31 def get(topic = nil, = {}) if block_given? get_packet(topic) do |packet| yield(packet.topic, packet.payload) unless packet.retain && [:omit_retained] end else loop do # Wait for one packet to be available packet = get_packet(topic) return nil unless packet # Patch for COSMOS return packet.topic, packet.payload unless packet.retain && [:omit_retained] end end end |
#get_packet(topic = nil) ⇒ Object
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 |
# File 'lib/openc3/interfaces/mqtt_interface.rb', line 46 def get_packet(topic = nil) # Subscribe to a topic, if an argument is given subscribe(topic) unless topic.nil? if block_given? # Loop forever! loop do packet = @read_queue.pop return nil unless packet # Patch for COSMOS yield(packet) puback_packet(packet) if packet.qos > 0 end else # Wait for one packet to be available packet = @read_queue.pop return nil unless packet # Patch for COSMOS puback_packet(packet) if packet.qos > 0 return packet end end |