Class: EventMachine::MQTT::ClientConnection

Inherits:
Connection
  • Object
show all
Includes:
Deferrable
Defined in:
lib/em/mqtt/client_connection.rb

Instance Attribute Summary collapse

Attributes inherited from Connection

#last_received, #last_sent, #state

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from Connection

#connected?, #receive_data, #send_packet

Constructor Details

#initialize(args = {}) ⇒ ClientConnection

Initialize connection

Parameters:

  • args (Hash) (defaults to: {})

    Arguments for connection

Options Hash (args):

  • :client_id (String)

    A unique identifier for this client

  • :keep_alive (Integer)

    How often to send keep-alive pings (in seconds)

  • :clean_session (Boolean)

    Start a clean session with server or resume old one (default true)

  • :username (String)

    Username to authenticate with the server

  • :password (String)

    Password to authenticate with the server



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/em/mqtt/client_connection.rb', line 50

# Set up the connection's defaults, then apply any caller-supplied overrides.
#
# Every key in +args+ is written to the instance variable of the same name,
# so keys other than the documented options are stored as well.
#
# @param args [Hash] connection options; a non-Hash value is ignored
# @option args [String] :client_id unique identifier for this client
# @option args [Integer] :keep_alive keep-alive ping interval in seconds
# @option args [Boolean] :clean_session start a clean session or resume an old one
# @option args [String] :username username to authenticate with the server
# @option args [String] :password password to authenticate with the server
def initialize(args={})
  defaults = {
    :client_id => MQTT::Client.generate_client_id,
    :keep_alive => 10,
    :clean_session => true,
    :packet_id => 0,
    :ack_timeout => 5,
    :username => nil,
    :password => nil,
    :timer => nil
  }
  defaults.each { |name, value| instance_variable_set("@#{name}", value) }

  # Anything that is not a Hash leaves the defaults untouched.
  return unless args.is_a?(Hash)

  args.each { |name, value| instance_variable_set("@#{name}", value) }
end

Instance Attribute Details

#ack_timeout ⇒ Object (readonly)

Returns the value of attribute ack_timeout.



11
12
13
# File 'lib/em/mqtt/client_connection.rb', line 11

# Reader for the @ack_timeout setting.
#
# The constructor sets it to 5 unless an :ack_timeout entry in the options
# hash replaces it.
# NOTE(review): the unit is not shown here — presumably seconds; confirm.
#
# @return [Object] the current value of @ack_timeout
def ack_timeout
  @ack_timeout
end

#clean_session ⇒ Object (readonly)

Returns the value of attribute clean_session.



7
8
9
# File 'lib/em/mqtt/client_connection.rb', line 7

# Reader for the clean-session flag that is sent to the server in the
# Connect packet.
#
# The constructor sets it to true unless a :clean_session option replaces it.
#
# @return [Boolean] true to start a clean session, false to resume an old one
def clean_session
  @clean_session
end

#client_id ⇒ Object (readonly)

Returns the value of attribute client_id.



5
6
7
# File 'lib/em/mqtt/client_connection.rb', line 5

# Reader for the client identifier that is sent to the server in the
# Connect packet.
#
# The constructor fills it from MQTT::Client.generate_client_id unless a
# :client_id option replaces it.
#
# @return [String] the identifier for this client
def client_id
  @client_id
end

#keep_alive ⇒ Object (readonly)

Returns the value of attribute keep_alive.



6
7
8
# File 'lib/em/mqtt/client_connection.rb', line 6

# Reader for the keep-alive interval that is sent to the server in the
# Connect packet.
#
# The constructor sets it to 10 unless a :keep_alive option replaces it.
#
# @return [Integer] how often to send keep-alive pings, in seconds
def keep_alive
  @keep_alive
end

#packet_id ⇒ Object (readonly)

Returns the value of attribute packet_id.



10
11
12
# File 'lib/em/mqtt/client_connection.rb', line 10

# Reader for the @packet_id counter, which the constructor starts at 0.
#
# NOTE(review): presumably this is the most recently issued packet id and is
# advanced by next_packet_id (not shown here) — confirm.
#
# @return [Object] the current value of @packet_id
def packet_id
  @packet_id
end

#password ⇒ Object (readonly)

Returns the value of attribute password.



9
10
11
# File 'lib/em/mqtt/client_connection.rb', line 9

# Reader for the password that is sent to the server in the Connect packet.
#
# The value is stored and returned as given; it is nil unless a :password
# option was passed to the constructor.
#
# @return [String, nil] the password used to authenticate with the server
def password
  @password
end

#timer ⇒ Object (readonly)

Returns the value of attribute timer.



12
13
14
# File 'lib/em/mqtt/client_connection.rb', line 12

# Reader for the @timer handle, which the constructor initialises to nil.
#
# #unbind calls +cancel+ on this object when it is set.
# NOTE(review): presumably the periodic keep-alive timer, created elsewhere —
# confirm.
#
# @return [Object, nil] the current timer, or nil when none is set
def timer
  @timer
end

#username ⇒ Object (readonly)

Returns the value of attribute username.



8
9
10
# File 'lib/em/mqtt/client_connection.rb', line 8

# Reader for the username that is sent to the server in the Connect packet.
#
# It is nil unless a :username option was passed to the constructor.
#
# @return [String, nil] the username used to authenticate with the server
def username
  @username
end

Class Method Details

.connect(*args, &blk) ⇒ Object

Connect to an MQTT server

Examples:

ClientConnection.connect('localhost', 1883)
ClientConnection.connect(:host => 'localhost', :username => 'user', :password => 'pass')


20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/em/mqtt/client_connection.rb', line 20

# Open a connection to an MQTT server through EventMachine.
#
# Arguments are processed left to right. A Hash is merged into the options;
# the first non-Hash argument becomes the host and the second the port.
# Further non-Hash arguments are ignored. Whatever remains after :host and
# :port are removed is handed on as the connection's constructor options.
#
# @example
#   ClientConnection.connect('localhost', 1883)
#   ClientConnection.connect(:host => 'localhost', :username => 'user', :password => 'pass')
#
# @param args [Array] optional host, optional port, and/or option hashes
# @param blk [Proc] block forwarded to EventMachine.connect
# @return [Object] whatever EventMachine.connect returns
def self.connect(*args, &blk)
  options = { :host => 'localhost', :port => MQTT::DEFAULT_PORT }

  # Positional slots still waiting for a value, in the order they are filled.
  open_slots = [:host, :port]

  args.each do |arg|
    if arg.is_a?(Hash)
      options.merge!(arg)
    else
      slot = open_slots.shift
      options[slot] = arg if slot
    end
  end

  host = options.delete(:host)
  port = options.delete(:port)
  ::EventMachine.connect(host, port, self, options, &blk)
end

Instance Method Details

#connection_completed ⇒ Object



72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/em/mqtt/client_connection.rb', line 72

# Send the MQTT Connect packet and record that it has been sent.
#
# The packet carries this connection's client id, clean-session flag,
# keep-alive interval, username and password. Afterwards the state becomes
# :connect_sent.
# NOTE(review): the original comment ties this to the TCP socket being
# established; presumably EventMachine invokes it — confirm.
#
# @return [Symbol] :connect_sent
def connection_completed
  send_packet(
    MQTT::Packet::Connect.new(
      :client_id => @client_id,
      :clean_session => @clean_session,
      :keep_alive => @keep_alive,
      :username => @username,
      :password => @password
    )
  )
  @state = :connect_sent
end

#disconnect(send_msg = true) ⇒ Object

Disconnect from the MQTT broker. If you don’t want to say goodbye to the broker, set send_msg to false.



89
90
91
92
93
94
95
# File 'lib/em/mqtt/client_connection.rb', line 89

# Begin disconnecting from the MQTT broker.
#
# A Disconnect packet is sent only when the connection is currently
# connected and +send_msg+ is truthy. The state is set to :disconnecting in
# every case, which lets #unbind treat the following close as expected.
#
# @param send_msg [Boolean] pass false to skip the Disconnect packet
# @return [Symbol] :disconnecting
def disconnect(send_msg=true)
  # FIXME: only close if we aren't waiting for any acknowledgements
  send_packet(MQTT::Packet::Disconnect.new) if connected? && send_msg
  @state = :disconnecting
end

#post_init ⇒ Object



67
68
69
70
# File 'lib/em/mqtt/client_connection.rb', line 67

# Run the parent class's post_init, then mark this connection as :connecting.
#
# NOTE(review): presumably EventMachine invokes this hook right after the
# connection object is created — confirm.
#
# @return [Symbol] :connecting
def post_init
  super
  @state = :connecting
end

#publish(topic, payload, retain = false, qos = 0) ⇒ Object

Publish a message on a particular topic to the MQTT broker.



116
117
118
119
120
121
122
123
124
125
126
127
128
129
# File 'lib/em/mqtt/client_connection.rb', line 116

# Publish a message on a particular topic to the MQTT broker.
#
# The packet is built and sent inside a deferred +callback+ block (per the
# original comment, so that publishing waits until we are connected). The
# packet id is therefore taken when that block runs, not when #publish is
# called.
#
# @param topic [String] topic to publish on
# @param payload [String] message body
# @param retain [Boolean] value for the packet's retain flag
# @param qos [Integer] quality-of-service level for the packet
# @return [Object] whatever +callback+ returns
def publish(topic, payload, retain=false, qos=0)
  callback do
    message = MQTT::Packet::Publish.new(
      :id => next_packet_id,
      :qos => qos,
      :retain => retain,
      :topic => topic,
      :payload => payload
    )
    send_packet(message)
  end
end

#receive_callback(&block) ⇒ Object



97
98
99
# File 'lib/em/mqtt/client_connection.rb', line 97

# Register the block to be invoked for received messages.
#
# The block is stored in @receive_callback, replacing any previously stored
# one; #receive_msg later calls it with the packet as its only argument.
#
# @yieldparam packet [Object] the packet passed to #receive_msg
# @return [Proc, nil] the stored block (nil when no block is given)
def receive_callback(&block)
  @receive_callback = block
end

#receive_msg(packet) ⇒ Object



101
102
103
104
# File 'lib/em/mqtt/client_connection.rb', line 101

# Hand a received packet to the block registered with #receive_callback.
#
# Does nothing when no block has been registered. Subclasses may override
# this method instead of registering a block.
#
# @param packet [Object] the received packet
# @return [Object, nil] the block's result, or nil when no block is registered
def receive_msg(packet)
  return if @receive_callback.nil?

  @receive_callback.call(packet)
end

#subscribe(*topics) ⇒ Object

Send a subscribe message for one or more topics on the MQTT broker.



132
133
134
135
136
137
138
139
140
141
142
# File 'lib/em/mqtt/client_connection.rb', line 132

# Send a subscribe message for one or more topics on the MQTT broker.
#
# The packet is built and sent inside a deferred +callback+ block (per the
# original comment, so that subscribing waits until we are connected).
#
# @param topics [Array] topics to subscribe to, passed through to the packet as given
# @return [Object] whatever +callback+ returns
def subscribe(*topics)
  callback do
    request = MQTT::Packet::Subscribe.new(:id => next_packet_id, :topics => topics)
    send_packet(request)
  end
end

#unbind ⇒ Object



106
107
108
109
110
111
112
113
# File 'lib/em/mqtt/client_connection.rb', line 106

# Clean up after the connection has gone away.
#
# Any active timer is cancelled and the state is set to :disconnected. If the
# connection was not already :disconnecting (i.e. #disconnect had not been
# called), the loss is unexpected and an exception is raised.
#
# The state is recorded before raising, so that a caller who rescues the
# exception does not see a stale state on a dead connection.
#
# @raise [Exception] the exception currently in flight ($!), if there is one
# @raise [MQTT::NotConnectedException] when the connection was lost
#   unexpectedly and no other exception is in flight
# @return [Symbol] :disconnected
def unbind
  timer.cancel if timer

  # Capture this before overwriting the state below.
  expected_close = (state == :disconnecting)
  @state = :disconnected

  unless expected_close
    # Re-throw any exceptions (if present) to avoid swallowed errors.
    raise $! || MQTT::NotConnectedException.new("Connection to server lost")
  end

  @state
end

#unsubscribe(*topics) ⇒ Object

Send an unsubscribe message for one or more topics on the MQTT broker



145
146
147
148
149
150
151
152
153
154
155
# File 'lib/em/mqtt/client_connection.rb', line 145

# Send an unsubscribe message for one or more topics on the MQTT broker.
#
# The packet is built and sent inside a deferred +callback+ block (per the
# original comment, so that unsubscribing waits until we are connected).
#
# @param topics [Array] topics to unsubscribe from, passed through to the packet as given
# @return [Object] whatever +callback+ returns
def unsubscribe(*topics)
  callback do
    request = MQTT::Packet::Unsubscribe.new(:id => next_packet_id, :topics => topics)
    send_packet(request)
  end
end