Class: EventMachine::MQTT::ClientConnection

Inherits:
Connection
  • Object
show all
Includes:
Deferrable
Defined in:
lib/em-mqtt/client_connection.rb

Instance Attribute Summary collapse

Attributes inherited from Connection

#last_received, #last_sent, #state

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from Connection

#connected?, #receive_data, #send_packet

Instance Attribute Details

#ack_timeout ⇒ Object (readonly)

Returns the value of attribute ack_timeout.



9
10
11
# File 'lib/em-mqtt/client_connection.rb', line 9

# Read-only accessor for @ack_timeout, which post_init sets to 5.
# NOTE(review): presumably the number of seconds to wait for a broker
# acknowledgement -- confirm against the code that consumes it.
#
# @return [Object] the current value of @ack_timeout
def ack_timeout
  @ack_timeout
end

#clean_start ⇒ Object (readonly)

Returns the value of attribute clean_start.



7
8
9
# File 'lib/em-mqtt/client_connection.rb', line 7

# Read-only accessor for @clean_start. post_init sets it to true and
# connection_completed passes it as :clean_start in the Connect packet.
#
# @return [Object] the current value of @clean_start
def clean_start
  @clean_start
end

#client_id ⇒ Object (readonly)

Returns the value of attribute client_id.



5
6
7
# File 'lib/em-mqtt/client_connection.rb', line 5

# Read-only accessor for @client_id. post_init fills it from
# MQTT::Client.generate_client_id and connection_completed passes it as
# :client_id in the Connect packet.
#
# @return [Object] the current value of @client_id
def client_id
  @client_id
end

#keep_alive ⇒ Object (readonly)

Returns the value of attribute keep_alive.



6
7
8
# File 'lib/em-mqtt/client_connection.rb', line 6

# Read-only accessor for @keep_alive. post_init sets it to 10 and
# connection_completed passes it as :keep_alive in the Connect packet.
# NOTE(review): presumably an interval in seconds -- confirm.
#
# @return [Object] the current value of @keep_alive
def keep_alive
  @keep_alive
end

#message_id ⇒ Object (readonly)

Returns the value of attribute message_id.



8
9
10
# File 'lib/em-mqtt/client_connection.rb', line 8

# Read-only accessor for @message_id. post_init starts it at 0; publish,
# subscribe and unsubscribe derive the :message_id of outgoing packets
# from it.
#
# @return [Object] the current value of @message_id
def message_id
  @message_id
end

#timer ⇒ Object (readonly)

Returns the value of attribute timer.



10
11
12
# File 'lib/em-mqtt/client_connection.rb', line 10

# Read-only accessor for @timer. post_init sets it to nil, and unbind
# calls #cancel on it when it is set.
# NOTE(review): the code that assigns a real timer is not visible here;
# presumably a keep-alive timer -- confirm.
#
# @return [Object, nil] the current value of @timer
def timer
  @timer
end

Class Method Details

.connect(host = MQTT::DEFAULT_HOST, port = MQTT::DEFAULT_PORT, *args, &blk) ⇒ Object

FIXME: change this to optionally take hash of options



13
14
15
# File 'lib/em-mqtt/client_connection.rb', line 13

# Open a connection handled by this class.
#
# Delegates to EventMachine.connect, passing this class as the connection
# handler followed by any extra arguments and the optional block.
#
# @param host [Object] broker host, defaults to MQTT::DEFAULT_HOST
# @param port [Object] broker port, defaults to MQTT::DEFAULT_PORT
# @param args [Array] extra arguments forwarded after the handler class
# @return [Object] whatever EventMachine.connect returns
def self.connect(host=MQTT::DEFAULT_HOST, port=MQTT::DEFAULT_PORT, *args, &blk)
  # The handler class goes first, then the caller-supplied extras.
  handler_args = [self, *args]
  EventMachine.connect(host, port, *handler_args, &blk)
end

Instance Method Details

#connection_completed ⇒ Object



28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/em-mqtt/client_connection.rb', line 28

# Send an MQTT Connect packet built from this connection's clean_start,
# keep_alive and client_id settings, then record that it has been sent.
#
# @return [Symbol] :connect_sent, the new value of @state
def connection_completed
  send_packet(
    MQTT::Packet::Connect.new(
      :clean_start => @clean_start,
      :keep_alive => @keep_alive,
      :client_id => @client_id
    )
  )

  # Remember that the Connect packet has gone out.
  @state = :connect_sent
end

#disconnect(send_msg = true) ⇒ Object

Disconnect from the MQTT broker. If you don’t want to say goodbye to the broker, set send_msg to false.



43
44
45
46
47
48
49
# File 'lib/em-mqtt/client_connection.rb', line 43

# Begin disconnecting from the MQTT broker.
#
# A Disconnect packet is sent only when the connection is currently
# connected and send_msg is truthy. The state is moved to :disconnecting
# in every case.
#
# @param send_msg [Boolean] pass false to skip sending the Disconnect packet
# @return [Symbol] :disconnecting, the new value of @state
def disconnect(send_msg=true)
  # FIXME: only close if we aren't waiting for any acknowledgements
  send_packet(MQTT::Packet::Disconnect.new) if connected? && send_msg
  @state = :disconnecting
end

#post_init ⇒ Object



17
18
19
20
21
22
23
24
25
26
# File 'lib/em-mqtt/client_connection.rb', line 17

# Set up the initial per-connection state after the parent's post_init
# has run.
#
# @return [nil]
def post_init
  super

  # Connection lifecycle and identity.
  @state       = :connecting
  @client_id   = MQTT::Client.generate_client_id

  # Values later sent in the Connect packet.
  @clean_start = true
  @keep_alive  = 10

  # Bookkeeping defaults.
  @ack_timeout = 5
  @message_id  = 0
  @timer       = nil
end

#publish(topic, payload, retain = false, qos = 0) ⇒ Object

Publish a message on a particular topic to the MQTT broker.



64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/em-mqtt/client_connection.rb', line 64

# Publish a message on a particular topic to the MQTT broker.
#
# The packet is built and sent from a deferred callback, so nothing is
# sent until that callback runs.
#
# @param topic [Object] topic to publish on
# @param payload [Object] message body
# @param retain [Boolean] value for the packet's :retain flag
# @param qos [Integer] value for the packet's :qos field
# @return [Object] whatever #callback returns
def publish(topic, payload, retain=false, qos=0)
  # Defer publishing until we are connected
  callback do
    # Integer#next does not mutate the receiver, so the counter has to be
    # stored back; otherwise every packet reuses the same id. Ids stay in
    # the 16-bit, non-zero range by wrapping from 0xFFFF back to 1.
    @message_id = @message_id >= 0xFFFF ? 1 : @message_id + 1

    send_packet(
      MQTT::Packet::Publish.new(
        :qos => qos,
        :retain => retain,
        :topic => topic,
        :payload => payload,
        :message_id => @message_id
      )
    )
  end
end

#receive_msg(message) ⇒ Object



51
52
53
# File 'lib/em-mqtt/client_connection.rb', line 51

# Hook for incoming messages. This implementation does nothing; it is
# meant to be overridden.
# NOTE(review): the caller is not visible here -- presumably invoked for
# each message received from the broker; confirm.
#
# @param message [Object] the received message (unused here)
# @return [nil]
def receive_msg(message)
  # Subclass this method
end

#subscribe(*topics) ⇒ Object

Send a subscribe message for one or more topics on the MQTT broker.



80
81
82
83
84
85
86
87
88
89
90
# File 'lib/em-mqtt/client_connection.rb', line 80

# Send a subscribe message for one or more topics on the MQTT broker.
#
# The packet is built and sent from a deferred callback, so nothing is
# sent until that callback runs.
#
# @param topics [Array] topics to subscribe to
# @return [Object] whatever #callback returns
def subscribe(*topics)
  # Defer subscribing until we are connected
  callback do
    # Integer#next does not mutate the receiver, so the counter has to be
    # stored back; otherwise every packet reuses the same id. Ids stay in
    # the 16-bit, non-zero range by wrapping from 0xFFFF back to 1.
    @message_id = @message_id >= 0xFFFF ? 1 : @message_id + 1

    send_packet(
      MQTT::Packet::Subscribe.new(
        :topics => topics,
        :message_id => @message_id
      )
    )
  end
end

#unbind ⇒ Object



55
56
57
58
59
60
61
# File 'lib/em-mqtt/client_connection.rb', line 55

# Handle the connection going away.
#
# Cancels the timer when one is set and marks the connection as
# :disconnected. If the state was not :disconnecting when this was
# called, the loss is reported by raising.
#
# @raise [MQTT::NotConnectedException] when the state was not :disconnecting
# @return [Symbol] :disconnected after an expected disconnect
def unbind
  timer.cancel if timer

  # Record the new state before raising so it is not left stale when the
  # connection is lost unexpectedly.
  unexpected = state != :disconnecting
  @state = :disconnected

  raise MQTT::NotConnectedException, "Connection to server lost" if unexpected

  @state
end

#unsubscribe(*topics) ⇒ Object

Send an unsubscribe message for one or more topics on the MQTT broker.



93
94
95
96
97
98
99
100
101
102
103
# File 'lib/em-mqtt/client_connection.rb', line 93

# Send an unsubscribe message for one or more topics on the MQTT broker.
#
# The packet is built and sent from a deferred callback, so nothing is
# sent until that callback runs.
#
# @param topics [Array] topics to unsubscribe from
# @return [Object] whatever #callback returns
def unsubscribe(*topics)
  # Defer unsubscribing until we are connected
  callback do
    # Integer#next does not mutate the receiver, so the counter has to be
    # stored back; otherwise every packet reuses the same id. Ids stay in
    # the 16-bit, non-zero range by wrapping from 0xFFFF back to 1.
    @message_id = @message_id >= 0xFFFF ? 1 : @message_id + 1

    send_packet(
      MQTT::Packet::Unsubscribe.new(
        :topics => topics,
        :message_id => @message_id
      )
    )
  end
end