Class: EventMachine::MQTTSN::GatewayHandler

Inherits:
Connection
  • Object
show all
Defined in:
lib/em/mqtt-sn/gateway_handler.rb

Overview

There is only a single instance of GatewayHandler which processes UDP packets from all MQTT-SN clients.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(attr) ⇒ GatewayHandler

Returns a new instance of GatewayHandler.



12
13
14
15
16
17
18
19
20
# File 'lib/em/mqtt-sn/gateway_handler.rb', line 12

# Build the gateway handler.
#
# Starts with an empty connection table, then copies every key/value pair
# of +attr+ into an instance variable named after the key, and finally
# registers a periodic timer (interval 10) that invokes #cleanup.
#
# @param attr [#each_pair] settings to store as instance variables
def initialize(attr)
  @connections = {}
  attr.each_pair { |name, value| instance_variable_set("@#{name}", value) }

  # Sweep the connection table on a repeating timer
  EventMachine.add_periodic_timer(10) { cleanup }
end

Instance Attribute Details

#connections ⇒ Object (readonly)

Returns the value of attribute connections.



8
9
10
# File 'lib/em/mqtt-sn/gateway_handler.rb', line 8

# Reader for the @connections instance variable: the table that #connect
# fills (keyed by peername) and that #cleanup prunes.
def connections
  @connections
end

#logger ⇒ Object (readonly)

Returns the value of attribute logger.



7
8
9
# File 'lib/em/mqtt-sn/gateway_handler.rb', line 7

# Reader for the @logger instance variable. The handler's other methods call
# debug/info/warn on the returned object.
# NOTE(review): @logger is not assigned explicitly in the visible code;
# presumably it is supplied through the attr hash given to #initialize.
def logger
  @logger
end

#server_address ⇒ Object (readonly)

Returns the value of attribute server_address.



9
10
11
# File 'lib/em/mqtt-sn/gateway_handler.rb', line 9

# Reader for the @server_address instance variable, which #connect passes to
# EventMachine::connect as the server to connect to.
# NOTE(review): presumably supplied through the attr hash given to
# #initialize - confirm against the caller.
def server_address
  @server_address
end

#server_port ⇒ Object (readonly)

Returns the value of attribute server_port.



10
11
12
# File 'lib/em/mqtt-sn/gateway_handler.rb', line 10

# Reader for the @server_port instance variable, which #connect passes to
# EventMachine::connect as the server port.
# NOTE(review): presumably supplied through the attr hash given to
# #initialize - confirm against the caller.
def server_port
  @server_port
end

Instance Method Details

#cleanup ⇒ Object

Periodic task to clean up dead connections



208
209
210
211
212
213
214
215
# File 'lib/em/mqtt-sn/gateway_handler.rb', line 208

# Periodic task that drops connections which are no longer connected.
#
# Entries are removed with Hash#delete_if so that the removal is performed
# by the iterator itself instead of deleting keys from the table while an
# each_pair traversal of that same table is in progress.
#
# @return [Hash] the connection table after pruning
def cleanup
  connections.delete_if do |_peername, connection|
    dead = !connection.connected?
    logger.debug("Destroying connection: #{connection.client_id}") if dead
    dead
  end
end

#connect(peername, packet) ⇒ Object

CONNECT received from client - establish connection to server



62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/em/mqtt-sn/gateway_handler.rb', line 62

# CONNECT received from client - establish connection to server.
#
# If the peer already has an entry in the connection table, that entry is
# asked to disconnect first. A new ServerConnection is then opened towards
# server_address/server_port, tagged with the client's ID, sent a MQTT
# Connect packet built from the MQTT-SN one, and stored in the table under
# +peername+.
#
# @param peername [String] packed socket address of the client
# @param packet [Object] MQTT-SN connect packet (client_id, keep_alive,
#   clean_session are read from it)
# @return [Object] the newly created server connection
def connect(peername, packet)
  # A repeated CONNECT from the same peer tears down the previous session
  if @connections.key?(peername)
    logger.warn("Received CONNECT while already connected")
    @connections[peername].disconnect
  end

  # Open the TCP connection to the server on behalf of this client
  client_port, client_address = Socket.unpack_sockaddr_in(peername)
  server_link = EventMachine.connect(
    server_address,
    server_port,
    EventMachine::MQTTSN::ServerConnection,
    self,
    client_address,
    client_port
  )

  # Remember which client this link belongs to
  server_link.client_id = packet.client_id

  # Forward the connect request as a MQTT packet
  mqtt_connect = MQTT::Packet::Connect.new(
    :client_id => packet.client_id,
    :keep_alive => packet.keep_alive,
    :clean_session => packet.clean_session
  )
  server_link.send_packet(mqtt_connect)

  # Register the link in the connection table
  @connections[peername] = server_link
end

#disconnect(connection) ⇒ Object

Disconnect client from server



198
199
200
201
202
203
204
205
# File 'lib/em/mqtt-sn/gateway_handler.rb', line 198

# Disconnect client from server.
#
# Does nothing when the connection is already down. Otherwise logs the
# event, sends a MQTT-SN Disconnect datagram to the client address/port
# recorded on the connection, and then asks the connection to disconnect.
#
# @param connection [Object] server connection belonging to the client
# @return [Object, nil] result of connection.disconnect, or nil if the
#   connection was not connected
def disconnect(connection)
  return unless connection.connected?

  logger.info("Disconnected: #{connection.client_id}")

  # Tell the client first, then drop the server side
  farewell = EventMachine::MQTTSN::Packet::Disconnect.new
  send_datagram(
    farewell.to_s,
    connection.client_address,
    connection.client_port
  )
  connection.disconnect
end

#process_packet(peername, packet) ⇒ Object

Incoming packet received from client



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/em/mqtt-sn/gateway_handler.rb', line 31

# Incoming packet received from client.
#
# A Connect packet is handed to #connect. Every other packet requires an
# entry for +peername+ in the connection table that reports connected?;
# without one a warning is logged. With one, the packet is dispatched by
# type: Register, Publish, Subscribe and Disconnect go to the matching
# handler method, Pingreq/Pingresp are forwarded to the server as their MQTT
# equivalents, and any other type is logged with a warning.
#
# @param peername [String] address of the sending client (table key)
# @param packet [Object] parsed MQTT-SN packet
def process_packet(peername, packet)
  logger.debug("Received MQTT-SN: #{packet.class}")

  # CONNECT is the only packet accepted without an existing session
  if packet.class == EventMachine::MQTTSN::Packet::Connect
    return connect(peername, packet)
  end

  connection = @connections[peername]
  if connection.nil? || !connection.connected?
    return logger.warn("Received MQTT-SN packet of type: #{packet.class} while not connected")
  end

  case packet
  when EventMachine::MQTTSN::Packet::Register   then register(connection, packet)
  when EventMachine::MQTTSN::Packet::Publish    then publish(connection, packet)
  when EventMachine::MQTTSN::Packet::Subscribe  then subscribe(connection, packet)
  when EventMachine::MQTTSN::Packet::Pingreq    then connection.send_packet(MQTT::Packet::Pingreq.new)
  when EventMachine::MQTTSN::Packet::Pingresp   then connection.send_packet(MQTT::Packet::Pingresp.new)
  when EventMachine::MQTTSN::Packet::Disconnect then disconnect(connection)
  else
    logger.warn("Unable to handle MQTT-SN packet of type: #{packet.class}")
  end
end

#publish(connection, packet) ⇒ Object

PUBLISH received from client - pass it on to the server



165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
# File 'lib/em/mqtt-sn/gateway_handler.rb', line 165

# PUBLISH received from client - pass it on to the server.
#
# The topic name is the packet's topic_id itself for :short topic IDs, or
# is looked up on the connection for :normal ones. Any other topic ID type,
# or a failed lookup, leaves no topic name: a warning is logged and nothing
# is sent.
#
# @param connection [Object] server connection belonging to the client
# @param packet [Object] MQTT-SN publish packet
def publish(connection, packet)
  topic_name =
    case packet.topic_id_type
    when :short  then packet.topic_id
    when :normal then connection.get_topic_name(packet.topic_id)
    end

  unless topic_name
    # FIXME: disconnect?
    return logger.warn("Invalid topic ID: #{packet.topic_id}")
  end

  logger.info("#{connection.client_id} publishing to '#{topic_name}'")
  outgoing = MQTT::Packet::Publish.new(
    :topic => topic_name,
    :payload => packet.data,
    :retain => packet.retain,
    :qos => packet.qos
  )
  connection.send_packet(outgoing)
end

#receive_data(data) ⇒ Object

UDP packet received by gateway



23
24
25
26
27
28
# File 'lib/em/mqtt-sn/gateway_handler.rb', line 23

# UDP packet received by gateway.
#
# Parses the raw data as a MQTT-SN packet and, when the parser yields a
# packet, hands it to #process_packet together with the sender's peername.
#
# @param data [String] raw datagram payload
def receive_data(data)
  packet = EventMachine::MQTTSN::Packet.parse(data)
  return if packet.nil?

  process_packet(get_peername, packet)
end

#register(connection, packet) ⇒ Object

REGISTER received from client



148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
# File 'lib/em/mqtt-sn/gateway_handler.rb', line 148

# REGISTER received from client.
#
# Builds a Regack echoing the request's id, asks the connection for the
# topic ID of the requested topic name, and replies with return code 0x00
# plus the topic ID when one was obtained, or 0x02 when it was not.
#
# @param connection [Object] server connection belonging to the client
# @param packet [Object] MQTT-SN register packet (id, topic_name are read)
def register(connection, packet)
  reply = EventMachine::MQTTSN::Packet::Regack.new(:topic_id_type => :normal, :id => packet.id)

  # Only the ID is needed here; the type half of the pair is ignored
  _id_type, topic_id = connection.get_topic_id(packet.topic_name)
  if topic_id.nil?
    reply.return_code = 0x02  # Rejected: invalid topic ID
  else
    reply.return_code = 0x00  # Accepted
    reply.topic_id = topic_id
  end

  send_data(reply.to_s)
end

#relay_from_server(connection, packet) ⇒ Object

Handle a MQTT packet coming back from the server



92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
# File 'lib/em/mqtt-sn/gateway_handler.rb', line 92

# Handle a MQTT packet coming back from the server.
#
# The packet is translated into its MQTT-SN counterpart and sent as a
# datagram to the client address/port recorded on the connection:
#
# * Connack  - return code is copied over unchanged; the outcome is logged
# * Suback   - only relayed when its id matches a pending request on the
#              connection; the topic ID is looked up from the topic name of
#              that original request
# * Publish  - topic name is mapped to a topic ID via the connection
# * Pingreq / Pingresp - relayed as the equivalent MQTT-SN packet
#
# Any other packet type, or a Suback that matches no pending request, is
# logged with a warning and nothing is sent to the client.
#
# @param connection [Object] server connection the packet arrived on
# @param packet [Object] MQTT packet received from the server
def relay_from_server(connection, packet)
  logger.debug("Received MQTT: #{packet.inspect}")
  case packet
    when MQTT::Packet::Connack
      # FIXME: re-map the return code
      mqttsn_packet = EventMachine::MQTTSN::Packet::Connack.new(
        :return_code => packet.return_code
      )
      if packet.return_code == 0
        logger.info("#{connection.client_id} is now connected")
      else
        logger.info("#{connection.client_id} failed to connect: #{packet.return_msg}")
      end
    when MQTT::Packet::Suback
      # Check that it is a response to a request we made
      request = connection.remove_from_pending(packet.id)
      if request
        logger.debug("#{connection.client_id} now subscribed to '#{request.topic_name}'")
        topic_id_type, topic_id = connection.get_topic_id(request.topic_name)
        mqttsn_packet = EventMachine::MQTTSN::Packet::Suback.new(
          :topic_id_type => topic_id_type,
          :topic_id => topic_id,
          :qos => packet.granted_qos.first,
          :id => packet.id,
          :return_code => 0x00
        )
      else
        logger.warn("Received Suback from server for something we didn't request: #{packet.inspect}")
      end
    when MQTT::Packet::Publish
      logger.info("#{connection.client_id} received publish to '#{packet.topic}'")
      # FIXME: send register if this is a new topic
      topic_id_type, topic_id = connection.get_topic_id(packet.topic)
      mqttsn_packet = EventMachine::MQTTSN::Packet::Publish.new(
        :duplicate => packet.duplicate,
        :qos => packet.qos,
        :retain => packet.retain,
        :topic_id_type => topic_id_type,
        :topic_id => topic_id,
        :id => packet.id,
        :data => packet.payload
      )
    when MQTT::Packet::Pingreq
      mqttsn_packet = EventMachine::MQTTSN::Packet::Pingreq.new
    when MQTT::Packet::Pingresp
      mqttsn_packet = EventMachine::MQTTSN::Packet::Pingresp.new
    else
      logger.warn("Unable to handle MQTT packet of type: #{packet.class}")
  end

  # mqttsn_packet stays nil on the branches that produced nothing to relay
  unless mqttsn_packet.nil?
    send_datagram(mqttsn_packet.to_s, connection.client_address, connection.client_port)
  end
end

#subscribe(connection, packet) ⇒ Object

SUBSCRIBE received from client - pass it on to the server



187
188
189
190
191
192
193
194
195
# File 'lib/em/mqtt-sn/gateway_handler.rb', line 187

# SUBSCRIBE received from client - pass it on to the server.
#
# Builds the MQTT Subscribe from the MQTT-SN packet's id and topic name,
# records the original MQTT-SN packet as pending on the connection (so the
# server's Suback can be matched to it later), then sends the MQTT packet.
#
# @param connection [Object] server connection belonging to the client
# @param packet [Object] MQTT-SN subscribe packet (id, topic_name are read)
def subscribe(connection, packet)
  logger.info("#{connection.client_id} subscribing to '#{packet.topic_name}'")

  request = MQTT::Packet::Subscribe.new(:id => packet.id, :topics => packet.topic_name)

  # Remember the request before it goes out on the wire
  connection.add_to_pending(packet)
  connection.send_packet(request)
end