Class: EventMachine::MQTTSN::GatewayHandler
- Inherits:
-
Connection
- Object
- Connection
- EventMachine::MQTTSN::GatewayHandler
- Defined in:
- lib/em/mqtt-sn/gateway_handler.rb
Overview
There is only a single instance of GatewayHandler, which processes UDP packets from all MQTT-SN clients.
Instance Attribute Summary collapse
-
#connections ⇒ Object
readonly
Returns the value of attribute connections.
-
#logger ⇒ Object
readonly
Returns the value of attribute logger.
-
#server_address ⇒ Object
readonly
Returns the value of attribute server_address.
-
#server_port ⇒ Object
readonly
Returns the value of attribute server_port.
Instance Method Summary collapse
-
#cleanup ⇒ Object
Periodic task to clean up dead connections.
-
#connect(peername, packet) ⇒ Object
CONNECT received from client - establish connection to server.
-
#disconnect(connection) ⇒ Object
Disconnect client from server.
-
#initialize(attr) ⇒ GatewayHandler
constructor
A new instance of GatewayHandler.
-
#process_packet(peername, packet) ⇒ Object
Incoming packet received from client.
-
#publish(connection, packet) ⇒ Object
PUBLISH received from client - pass it on to the server.
-
#receive_data(data) ⇒ Object
UDP packet received by gateway.
-
#register(connection, packet) ⇒ Object
REGISTER received from client.
-
#relay_from_server(connection, packet) ⇒ Object
Handle a MQTT packet coming back from the server.
-
#subscribe(connection, packet) ⇒ Object
SUBSCRIBE received from client - pass it on to the server.
Constructor Details
#initialize(attr) ⇒ GatewayHandler
Returns a new instance of GatewayHandler.
12 13 14 15 16 17 18 19 20 |
# File 'lib/em/mqtt-sn/gateway_handler.rb', line 12

# Build the gateway handler: start with an empty connection table, copy
# the supplied settings into instance variables and schedule the
# periodic cleanup task.
#
# @param attr [Hash] every key/value pair is stored as the instance
#   variable "@<key>"; the readers of this class expose @logger,
#   @server_address and @server_port
def initialize(attr)
  @connections = {}
  attr.each_pair { |name, value| instance_variable_set("@#{name}", value) }

  # Sweep the connection table every 10 seconds
  EventMachine.add_periodic_timer(10) { cleanup }
end
Instance Attribute Details
#connections ⇒ Object (readonly)
Returns the value of attribute connections.
8 9 10 |
# File 'lib/em/mqtt-sn/gateway_handler.rb', line 8

# Reader for the table of client connections.
#
# @return [Hash] the connection table; #connect stores each server
#   connection in it under the client's peername
def connections
  @connections
end
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
7 8 9 |
# File 'lib/em/mqtt-sn/gateway_handler.rb', line 7

# Reader for the logger used by the handler.
#
# @return [Object] the value of @logger; the handler calls #debug,
#   #info and #warn on it
def logger
  @logger
end
#server_address ⇒ Object (readonly)
Returns the value of attribute server_address.
9 10 11 |
# File 'lib/em/mqtt-sn/gateway_handler.rb', line 9

# Reader for the address of the MQTT server.
#
# @return [Object] the value of @server_address; #connect hands it to
#   EventMachine::connect when a client connects
def server_address
  @server_address
end
#server_port ⇒ Object (readonly)
Returns the value of attribute server_port.
10 11 12 |
# File 'lib/em/mqtt-sn/gateway_handler.rb', line 10

# Reader for the port of the MQTT server.
#
# @return [Object] the value of @server_port; #connect hands it to
#   EventMachine::connect when a client connects
def server_port
  @server_port
end
Instance Method Details
#cleanup ⇒ Object
Periodic task to clean up dead connections
208 209 210 211 212 213 214 215 |
# File 'lib/em/mqtt-sn/gateway_handler.rb', line 208

# Periodic task to clean up dead connections.
#
# Removes every entry of the connection table whose connection reports
# that it is no longer connected, logging each removal at debug level.
# Hash#delete_if lets the iterator itself drop the entries, instead of
# deleting from the hash in the middle of an each_pair loop over it.
#
# @return [Hash] the connection table after the sweep
def cleanup
  @connections.delete_if do |_peername, connection|
    next false if connection.connected?

    logger.debug("Destroying connection: #{connection.client_id}")
    true
  end
end
#connect(peername, packet) ⇒ Object
CONNECT received from client - establish connection to server
62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 |
# File 'lib/em/mqtt-sn/gateway_handler.rb', line 62

# CONNECT received from client - establish connection to server.
#
# @param peername [String] packed socket address of the client; it is
#   unpacked with Socket.unpack_sockaddr_in and used as the key of the
#   connection table
# @param packet [Object] the MQTT-SN CONNECT packet; its client_id,
#   keep_alive and clean_session are copied into the MQTT CONNECT
# @return [Object] the new server connection, as stored in the table
def connect(peername, packet)
  # A repeated CONNECT from the same peer drops the previous connection
  if @connections.key?(peername)
    logger.warn("Received CONNECT while already connected")
    @connections[peername].disconnect
  end

  # Open a TCP connection to the server on behalf of this client
  client_port, client_address = Socket.unpack_sockaddr_in(peername)
  connection = EventMachine.connect(
    server_address,
    server_port,
    EventMachine::MQTTSN::ServerConnection,
    self,
    client_address,
    client_port
  )

  # Remember which client this connection belongs to
  connection.client_id = packet.client_id

  # Forward the CONNECT to the server as a MQTT packet
  connect_packet = MQTT::Packet::Connect.new(
    :client_id => packet.client_id,
    :keep_alive => packet.keep_alive,
    :clean_session => packet.clean_session
  )
  connection.send_packet(connect_packet)

  # Register the connection under the client's peername
  @connections[peername] = connection
end
#disconnect(connection) ⇒ Object
Disconnect client from server
198 199 200 201 202 203 204 205 |
# File 'lib/em/mqtt-sn/gateway_handler.rb', line 198

# Disconnect client from server.
#
# Does nothing when the connection is not connected. Otherwise logs the
# event, sends a MQTT-SN DISCONNECT datagram to the client's address
# and port, and then disconnects the server connection.
#
# @param connection [Object] the server connection of the client
# @return [Object, nil] the result of connection.disconnect, or nil
#   when the connection was not connected
def disconnect(connection)
  return unless connection.connected?

  logger.info("Disconnected: #{connection.client_id}")
  notice = EventMachine::MQTTSN::Packet::Disconnect.new
  send_datagram(notice.to_s, connection.client_address, connection.client_port)
  connection.disconnect
end
#process_packet(peername, packet) ⇒ Object
Incoming packet received from client
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 |
# File 'lib/em/mqtt-sn/gateway_handler.rb', line 31

# Incoming packet received from client.
#
# A CONNECT packet is handed to #connect. Any other packet requires an
# existing, connected entry in the connection table for the peer; it is
# then dispatched on its class. Packets from unknown or disconnected
# peers, and packet types without a handler, are only logged as
# warnings.
#
# @param peername [String] socket address of the sending client, used
#   as the key of the connection table
# @param packet [Object] the parsed MQTT-SN packet
def process_packet(peername, packet)
  logger.debug("Received MQTT-SN: #{packet.class}")
  return connect(peername, packet) if packet.class == EventMachine::MQTTSN::Packet::Connect

  connection = @connections[peername]
  if connection.nil? || !connection.connected?
    return logger.warn("Received MQTT-SN packet of type: #{packet.class} while not connected")
  end

  case packet
  when EventMachine::MQTTSN::Packet::Register then register(connection, packet)
  when EventMachine::MQTTSN::Packet::Publish then publish(connection, packet)
  when EventMachine::MQTTSN::Packet::Subscribe then subscribe(connection, packet)
  when EventMachine::MQTTSN::Packet::Pingreq
    connection.send_packet(MQTT::Packet::Pingreq.new)
  when EventMachine::MQTTSN::Packet::Pingresp
    connection.send_packet(MQTT::Packet::Pingresp.new)
  when EventMachine::MQTTSN::Packet::Disconnect then disconnect(connection)
  else
    logger.warn("Unable to handle MQTT-SN packet of type: #{packet.class}")
  end
end
#publish(connection, packet) ⇒ Object
PUBLISH received from client - pass it on to the server
165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 |
# File 'lib/em/mqtt-sn/gateway_handler.rb', line 165

# PUBLISH received from client - pass it on to the server.
#
# The topic name is taken directly from the packet for :short topic
# ids, and looked up through connection.get_topic_name for :normal
# ones. Any other topic id type, or a failed lookup, leaves no topic
# name; the packet is then dropped with a warning.
#
# @param connection [Object] the server connection of the client
# @param packet [Object] the MQTT-SN PUBLISH packet
def publish(connection, packet)
  topic_name =
    case packet.topic_id_type
    when :short then packet.topic_id
    when :normal then connection.get_topic_name(packet.topic_id)
    end

  unless topic_name
    # FIXME: disconnect?
    return logger.warn("Invalid topic ID: #{packet.topic_id}")
  end

  logger.info("#{connection.client_id} publishing to '#{topic_name}'")
  connection.send_packet(
    MQTT::Packet::Publish.new(
      :topic => topic_name,
      :payload => packet.data,
      :retain => packet.retain,
      :qos => packet.qos
    )
  )
end
#receive_data(data) ⇒ Object
UDP packet received by gateway
23 24 25 26 27 28 |
# File 'lib/em/mqtt-sn/gateway_handler.rb', line 23

# UDP packet received by gateway.
#
# Parses the datagram as a MQTT-SN packet and, when the parser returns
# a packet, passes it to #process_packet together with the peername of
# the sender. A nil parse result is ignored.
#
# @param data [String] the raw datagram payload
def receive_data(data)
  packet = EventMachine::MQTTSN::Packet.parse(data)
  return if packet.nil?

  process_packet(get_peername, packet)
end
#register(connection, packet) ⇒ Object
REGISTER received from client
148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 |
# File 'lib/em/mqtt-sn/gateway_handler.rb', line 148

# REGISTER received from client.
#
# Answers with a REGACK carrying the id of the request. The topic id is
# obtained from connection.get_topic_id; when it is nil the REGACK is
# sent with the "rejected" return code instead.
#
# @param connection [Object] the server connection of the client
# @param packet [Object] the MQTT-SN REGISTER packet
def register(connection, packet)
  regack = EventMachine::MQTTSN::Packet::Regack.new(
    :topic_id_type => :normal,
    :id => packet.id
  )

  # Only the topic id is needed here; the returned type is not used
  _topic_id_type, topic_id = connection.get_topic_id(packet.topic_name)
  if topic_id.nil?
    regack.return_code = 0x02   # Rejected: invalid topic ID
  else
    regack.return_code = 0x00   # Accepted
    regack.topic_id = topic_id
  end

  send_data(regack.to_s)
end
#relay_from_server(connection, packet) ⇒ Object
Handle a MQTT packet coming back from the server
92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 |
# File 'lib/em/mqtt-sn/gateway_handler.rb', line 92

# Handle a MQTT packet coming back from the server.
#
# Builds the MQTT-SN counterpart of the MQTT packet and sends it as a
# datagram to the client address and port recorded on the connection.
# Nothing is sent for a SUBACK that matches no pending request or for
# a packet type without a translation; both cases are logged as
# warnings.
#
# @param connection [Object] the server connection the packet came from
# @param packet [Object] the MQTT packet received from the server
def relay_from_server(connection, packet)
  logger.debug("Received MQTT: #{packet.inspect}")

  reply = nil
  case packet
  when MQTT::Packet::Connack
    # FIXME: re-map the return code
    reply = EventMachine::MQTTSN::Packet::Connack.new(:return_code => packet.return_code)
    if packet.return_code == 0
      logger.info("#{connection.client_id} is now connected")
    else
      logger.info("#{connection.client_id} failed to connect: #{packet.return_msg}")
    end
  when MQTT::Packet::Suback
    # Only relay acknowledgements for subscriptions we asked for
    request = connection.remove_from_pending(packet.id)
    if request
      logger.debug("#{connection.client_id} now subscribed to '#{request.topic_name}'")
      topic_id_type, topic_id = connection.get_topic_id(request.topic_name)
      reply = EventMachine::MQTTSN::Packet::Suback.new(
        :topic_id_type => topic_id_type,
        :topic_id => topic_id,
        :qos => packet.granted_qos.first,
        :id => packet.id,
        :return_code => 0x00
      )
    else
      logger.warn("Received Suback from server for something we didn't request: #{packet.inspect}")
    end
  when MQTT::Packet::Publish
    logger.info("#{connection.client_id} recieved publish to '#{packet.topic}'")
    # FIXME: send register if this is a new topic
    topic_id_type, topic_id = connection.get_topic_id(packet.topic)
    reply = EventMachine::MQTTSN::Packet::Publish.new(
      :duplicate => packet.duplicate,
      :qos => packet.qos,
      :retain => packet.retain,
      :topic_id_type => topic_id_type,
      :topic_id => topic_id,
      :id => packet.id,
      :data => packet.payload
    )
  when MQTT::Packet::Pingreq
    reply = EventMachine::MQTTSN::Packet::Pingreq.new
  when MQTT::Packet::Pingresp
    reply = EventMachine::MQTTSN::Packet::Pingresp.new
  else
    logger.warn("Unable to handle MQTT packet of type: #{packet.class}")
  end

  # No translated packet means there is nothing to relay
  return if reply.nil?

  send_datagram(reply.to_s, connection.client_address, connection.client_port)
end
#subscribe(connection, packet) ⇒ Object
SUBSCRIBE received from client - pass it on to the server
187 188 189 190 191 192 193 194 195 |
# File 'lib/em/mqtt-sn/gateway_handler.rb', line 187

# SUBSCRIBE received from client - pass it on to the server.
#
# The MQTT-SN packet is added to the connection's pending requests
# before the MQTT SUBSCRIBE is sent, so that the SUBACK coming back
# from the server can be matched to it.
#
# @param connection [Object] the server connection of the client
# @param packet [Object] the MQTT-SN SUBSCRIBE packet; its id and
#   topic_name are copied into the MQTT SUBSCRIBE
def subscribe(connection, packet)
  logger.info("#{connection.client_id} subscribing to '#{packet.topic_name}'")

  request = MQTT::Packet::Subscribe.new(:id => packet.id, :topics => packet.topic_name)
  connection.add_to_pending(packet)
  connection.send_packet(request)
end