Class: MQTT::Client
- Inherits: Object (Object → MQTT::Client)
- Defined in: lib/mqtt/client.rb
Constant Summary
- SELECT_TIMEOUT = 0.5
Timeout between select polls (in seconds)
- ATTR_DEFAULTS =
Default attribute values
{
  :host => nil,
  :port => nil,
  :version => '3.1.1',
  :keep_alive => 15,
  :clean_session => true,
  :client_id => nil,
  :ack_timeout => 5,
  :username => nil,
  :password => nil,
  :will_topic => nil,
  :will_payload => nil,
  :will_qos => 0,
  :will_retain => false,
  :ssl => false
}
Instance Attribute Summary
-
#ack_timeout ⇒ Object
Number of seconds to wait for acknowledgement packets (default is 5 seconds).
-
#clean_session ⇒ Object
Set the ‘Clean Session’ flag when connecting? (default is true).
-
#client_id ⇒ Object
Client Identifier.
-
#host ⇒ Object
Hostname of the remote server.
-
#keep_alive ⇒ Object
Time (in seconds) between pings to remote server (default is 15 seconds).
-
#last_ping_response ⇒ Object
readonly
Last ping response time.
-
#password ⇒ Object
Password to authenticate to the server with.
-
#port ⇒ Object
Port number of the remote server.
-
#ssl ⇒ Object
Set to true to enable SSL/TLS encrypted communication.
-
#username ⇒ Object
Username to authenticate to the server with.
-
#version ⇒ Object
The version number of the MQTT protocol to use (default 3.1.1).
-
#will_payload ⇒ Object
Contents of the message that is sent by the server when the client disconnects.
-
#will_qos ⇒ Object
The QoS level of the will message sent by the server.
-
#will_retain ⇒ Object
Whether the Will message should be retained by the server after it is sent.
-
#will_topic ⇒ Object
The topic that the Will message is published to.
Class Method Summary
-
.connect(*args, &block) ⇒ Object
Create and connect a new MQTT Client.
-
.generate_client_id(prefix = 'ruby', length = 16) ⇒ Object
Generate a random client identifier (using the characters 0-9 and a-z).
Instance Method Summary
-
#ca_file=(path) ⇒ Object
Set a path to a file containing a PEM-format CA certificate and enable peer verification.
-
#cert=(cert) ⇒ Object
PEM-format client certificate.
-
#cert_file=(path) ⇒ Object
Set a path to a file containing a PEM-format client certificate.
-
#clear_queue ⇒ Object
Clear the incoming message queue.
-
#connect(clientid = nil) ⇒ Object
Connect to the MQTT server. If a block is given, then yield to that block and then disconnect again.
-
#connected? ⇒ Boolean
Checks whether the client is connected to the server.
-
#disconnect(send_msg = true) ⇒ Object
Disconnect from the MQTT server.
-
#get(topic = nil, options = {}) ⇒ Object
Return the next message received from the MQTT server.
-
#get_packet(topic = nil) ⇒ Object
Return the next packet object received from the MQTT server.
-
#initialize(*args) ⇒ Client
constructor
Create a new MQTT Client instance.
-
#key=(*args) ⇒ Object
Set to a PEM-format client private key.
-
#key_file=(*args) ⇒ Object
Set a path to a file containing a PEM-format client private key.
-
#publish(topic, payload = '', retain = false, qos = 0) ⇒ Object
Publish a message on a particular topic to the MQTT server.
-
#queue_empty? ⇒ Boolean
Returns true if the incoming message queue is empty.
-
#queue_length ⇒ Object
Returns the length of the incoming message queue.
-
#remote_host ⇒ Object
deprecated
Deprecated.
Please use #host instead
-
#remote_host=(args) ⇒ Object
deprecated
Deprecated.
Please use #host= instead
-
#remote_port ⇒ Object
deprecated
Deprecated.
Please use #port instead
-
#remote_port=(args) ⇒ Object
deprecated
Deprecated.
Please use #port= instead
-
#set_will(topic, payload, retain = false, qos = 0) ⇒ Object
Set the Will for the client.
-
#ssl_context ⇒ Object
Get the OpenSSL context, that is used if SSL/TLS is enabled.
-
#subscribe(*topics) ⇒ Object
Send a subscribe message for one or more topics on the MQTT server.
-
#unsubscribe(*topics) ⇒ Object
Send an unsubscribe message for one or more topics on the MQTT server.
Constructor Details
#initialize(*args) ⇒ Client
Create a new MQTT Client instance
Accepts one of the following:
-
a URI that uses the MQTT scheme
-
a hostname and port
-
a Hash containing attributes to be set on the new instance
If no arguments are given then the method will look for a URI in the MQTT_SERVER environment variable.
Examples:
client = MQTT::Client.new
client = MQTT::Client.new('mqtt://myserver.example.com')
client = MQTT::Client.new('mqtt://user:[email protected]')
client = MQTT::Client.new('myserver.example.com')
client = MQTT::Client.new('myserver.example.com', 18830)
client = MQTT::Client.new(:host => 'myserver.example.com')
client = MQTT::Client.new(:host => 'myserver.example.com', :keep_alive => 30)
# File 'lib/mqtt/client.rb', line 129
def initialize(*args)
  attributes = args.last.is_a?(Hash) ? args.pop : {}

  # Set server URI from environment if present
  attributes.merge!(parse_uri(ENV['MQTT_SERVER'])) if args.length.zero? && ENV['MQTT_SERVER']

  if args.length >= 1
    case args[0]
    when URI
      attributes.merge!(parse_uri(args[0]))
    when %r{^mqtts?://}
      attributes.merge!(parse_uri(args[0]))
    else
      attributes[:host] = args[0]
    end
  end

  if args.length >= 2
    attributes[:port] = args[1] unless args[1].nil?
  end

  raise ArgumentError, 'Unsupported number of arguments' if args.length >= 3

  # Merge arguments with default values for attributes
  ATTR_DEFAULTS.merge(attributes).each_pair do |k, v|
    send("#{k}=", v)
  end

  # Set a default port number
  if @port.nil?
    @port = @ssl ? MQTT::DEFAULT_SSL_PORT : MQTT::DEFAULT_PORT
  end

  if @ssl
    require 'openssl'
    require 'mqtt/openssl_fix'
  end

  # Initialise private instance variables
  @last_ping_request = current_time
  @last_ping_response = current_time
  @socket = nil
  @read_queue = Queue.new
  @pubacks = {}
  @read_thread = nil
  @write_semaphore = Mutex.new
  @pubacks_semaphore = Mutex.new
end
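The argument handling in the constructor can be followed in isolation with a small standalone sketch. It uses only Ruby's standard library and does not load the mqtt gem. The names sketch_parse_uri and sketch_attributes are hypothetical; in particular, the body of the client's private parse_uri helper is not shown on this page, so the version here is an assumption about what it might return.

```ruby
require 'uri'

# Hypothetical stand-in for the private parse_uri helper (assumed behaviour).
def sketch_parse_uri(uri)
  uri = URI.parse(uri) unless uri.is_a?(URI)
  {
    :host => uri.host,
    :port => uri.port, # nil when the URI carries no explicit port
    :username => uri.user,
    :password => uri.password,
    :ssl => uri.scheme == 'mqtts'
  }
end

# Follows the dispatch in #initialize: an optional trailing Hash,
# then a URI or hostname, then an optional port.
def sketch_attributes(*args)
  attributes = args.last.is_a?(Hash) ? args.pop : {}
  raise ArgumentError, 'Unsupported number of arguments' if args.length >= 3

  case args[0]
  when nil then nil # no positional arguments were given
  when URI, %r{^mqtts?://} then attributes.merge!(sketch_parse_uri(args[0]))
  else attributes[:host] = args[0]
  end
  attributes[:port] = args[1] unless args[1].nil?
  attributes
end

p sketch_attributes('myserver.example.com', 18830, :keep_alive => 30)
p sketch_attributes('mqtts://myserver.example.com')
```

The real constructor then merges the resulting Hash over ATTR_DEFAULTS and calls the matching setter for every key, so any attribute not supplied keeps its default.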
Instance Attribute Details
#ack_timeout ⇒ Object
Number of seconds to wait for acknowledgement packets (default is 5 seconds)
# File 'lib/mqtt/client.rb', line 37
def ack_timeout
  @ack_timeout
end
#clean_session ⇒ Object
Set the ‘Clean Session’ flag when connecting? (default is true)
# File 'lib/mqtt/client.rb', line 31
def clean_session
  @clean_session
end
#client_id ⇒ Object
Client Identifier
# File 'lib/mqtt/client.rb', line 34
def client_id
  @client_id
end
#host ⇒ Object
Hostname of the remote server
# File 'lib/mqtt/client.rb', line 9
def host
  @host
end
#keep_alive ⇒ Object
Time (in seconds) between pings to remote server (default is 15 seconds)
# File 'lib/mqtt/client.rb', line 28
def keep_alive
  @keep_alive
end
#last_ping_response ⇒ Object (readonly)
Last ping response time
# File 'lib/mqtt/client.rb', line 58
def last_ping_response
  @last_ping_response
end
#password ⇒ Object
Password to authenticate to the server with
# File 'lib/mqtt/client.rb', line 43
def password
  @password
end
#port ⇒ Object
Port number of the remote server
# File 'lib/mqtt/client.rb', line 12
def port
  @port
end
#ssl ⇒ Object
Set to true to enable SSL/TLS encrypted communication
Set to a symbol to use a specific variant of SSL/TLS; #connect assigns the symbol to ssl_context.ssl_version.
# File 'lib/mqtt/client.rb', line 25
def ssl
  @ssl
end
#username ⇒ Object
Username to authenticate to the server with
# File 'lib/mqtt/client.rb', line 40
def username
  @username
end
#version ⇒ Object
The version number of the MQTT protocol to use (default 3.1.1)
# File 'lib/mqtt/client.rb', line 15
def version
  @version
end
#will_payload ⇒ Object
Contents of the message that is sent by the server when the client disconnects
# File 'lib/mqtt/client.rb', line 49
def will_payload
  @will_payload
end
#will_qos ⇒ Object
The QoS level of the will message sent by the server
# File 'lib/mqtt/client.rb', line 52
def will_qos
  @will_qos
end
#will_retain ⇒ Object
Whether the Will message should be retained by the server after it is sent
# File 'lib/mqtt/client.rb', line 55
def will_retain
  @will_retain
end
#will_topic ⇒ Object
The topic that the Will message is published to
# File 'lib/mqtt/client.rb', line 46
def will_topic
  @will_topic
end
Class Method Details
.connect(*args, &block) ⇒ Object
# File 'lib/mqtt/client.rb', line 91
def self.connect(*args, &block)
  client = MQTT::Client.new(*args)
  client.connect(&block)
  client
end
.generate_client_id(prefix = 'ruby', length = 16) ⇒ Object
Generate a random client identifier (using the characters 0-9 and a-z)
# File 'lib/mqtt/client.rb', line 99
def self.generate_client_id(prefix = 'ruby', length = 16)
  str = prefix.dup
  length.times do
    num = rand(36)
    # Adjust based on number or letter.
    num += num < 10 ? 48 : 87
    str += num.chr
  end
  str
end
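The offset arithmetic in generate_client_id is easier to see when applied to single values. The following standalone sketch (helper names sketch_id_char and sketch_client_id are hypothetical, and the mqtt gem is not loaded) reproduces the same mapping: values 0 to 9 land on ASCII 48 to 57, and values 10 to 35 land on ASCII 97 to 122.

```ruby
# Same arithmetic as generate_client_id, isolated for one value.
def sketch_id_char(num)
  # 0..9 map to '0'..'9' (offset 48); 10..35 map to 'a'..'z' (offset 87).
  (num + (num < 10 ? 48 : 87)).chr
end

# Builds an identifier from a prefix plus `length` random characters.
def sketch_client_id(prefix = 'ruby', length = 16)
  prefix + Array.new(length) { sketch_id_char(rand(36)) }.join
end

puts sketch_id_char(0)
puts sketch_id_char(10)
puts sketch_client_id('demo', 8)
```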
Instance Method Details
#ca_file=(path) ⇒ Object
Set a path to a file containing a PEM-format CA certificate and enable peer verification
# File 'lib/mqtt/client.rb', line 206
def ca_file=(path)
  ssl_context.ca_file = path
  ssl_context.verify_mode = OpenSSL::SSL::VERIFY_PEER unless path.nil?
end
#cert=(cert) ⇒ Object
PEM-format client certificate
# File 'lib/mqtt/client.rb', line 189
def cert=(cert)
  ssl_context.cert = OpenSSL::X509::Certificate.new(cert)
end
#cert_file=(path) ⇒ Object
Set a path to a file containing a PEM-format client certificate
# File 'lib/mqtt/client.rb', line 184
def cert_file=(path)
  self.cert = File.read(path)
end
#clear_queue ⇒ Object
Clear the incoming message queue.
# File 'lib/mqtt/client.rb', line 446
def clear_queue
  @read_queue.clear
end
#connect(clientid = nil) ⇒ Object
Connect to the MQTT server. If a block is given, then yield to that block and then disconnect again.
# File 'lib/mqtt/client.rb', line 224
def connect(clientid = nil)
  @client_id = clientid unless clientid.nil?

  if @client_id.nil? || @client_id.empty?
    raise 'Must provide a client_id if clean_session is set to false' unless @clean_session

    # Empty client id is not allowed for version 3.1.0
    @client_id = MQTT::Client.generate_client_id if @version == '3.1.0'
  end

  raise 'No MQTT server host set when attempting to connect' if @host.nil?

  unless connected?
    # Create network socket
    tcp_socket = TCPSocket.new(@host, @port)

    if @ssl
      # Set the protocol version
      ssl_context.ssl_version = @ssl if @ssl.is_a?(Symbol)

      @socket = OpenSSL::SSL::SSLSocket.new(tcp_socket, ssl_context)
      @socket.sync_close = true

      # Set hostname on secure socket for Server Name Indication (SNI)
      @socket.hostname = @host if @socket.respond_to?(:hostname=)

      @socket.connect
    else
      @socket = tcp_socket
    end

    # Construct a connect packet
    packet = MQTT::Packet::Connect.new(
      :version => @version,
      :clean_session => @clean_session,
      :keep_alive => @keep_alive,
      :client_id => @client_id,
      :username => @username,
      :password => @password,
      :will_topic => @will_topic,
      :will_payload => @will_payload,
      :will_qos => @will_qos,
      :will_retain => @will_retain
    )

    # Send packet
    send_packet(packet)

    # Receive response
    receive_connack

    # Start packet reading thread
    @read_thread = Thread.new(Thread.current) do |parent|
      Thread.current[:parent] = parent
      receive_packet while connected?
    end
  end

  return unless block_given?

  # If a block is given, then yield and disconnect
  begin
    yield(self)
  ensure
    disconnect
  end
end
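The client identifier rules at the top of #connect can be summarised as a pure function. This is a standalone sketch, not part of the library: sketch_resolve_client_id is a hypothetical name, and the generator argument stands in for MQTT::Client.generate_client_id so the example runs without the gem.

```ruby
# Returns the identifier the connect logic would end up with, or raises
# in the one case it rejects (no identifier and clean_session false).
def sketch_resolve_client_id(client_id, clean_session, version,
                             generator = -> { 'ruby' + rand(36**8).to_s(36) })
  # A non-empty identifier is always used as given.
  return client_id unless client_id.nil? || client_id.empty?

  raise 'Must provide a client_id if clean_session is set to false' unless clean_session

  # Only protocol version 3.1.0 gets a generated identifier;
  # otherwise the empty or nil identifier is passed through unchanged.
  version == '3.1.0' ? generator.call : client_id
end

p sketch_resolve_client_id('sensor-1', false, '3.1.1')
p sketch_resolve_client_id(nil, true, '3.1.1')
p sketch_resolve_client_id('', true, '3.1.0')
```

One consequence worth noting: a persistent session (clean_session set to false) always needs an explicit client_id, whichever protocol version is selected.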
#connected? ⇒ Boolean
Checks whether the client is connected to the server.
# File 'lib/mqtt/client.rb', line 312
def connected?
  !@socket.nil? && !@socket.closed?
end
#disconnect(send_msg = true) ⇒ Object
Disconnect from the MQTT server. If you don’t want to say goodbye to the server, set send_msg to false.
# File 'lib/mqtt/client.rb', line 294
def disconnect(send_msg = true)
  # Stop reading packets from the socket first
  @read_thread.kill if @read_thread && @read_thread.alive?
  @read_thread = nil

  return unless connected?

  # Close the socket if it is open
  if send_msg
    packet = MQTT::Packet::Disconnect.new
    send_packet(packet)
  end
  @socket.close unless @socket.nil?
  handle_close
  @socket = nil
end
#get(topic = nil, options = {}) ⇒ Object
Return the next message received from the MQTT server. An optional topic can be given to subscribe to.
The method either returns the topic and message as an array:
topic, message = client.get
Or can be used with a block to keep processing messages:
client.get('test') do |topic,payload|
# Do stuff here
end
# File 'lib/mqtt/client.rb', line 389
def get(topic = nil, options = {})
  if block_given?
    get_packet(topic) do |packet|
      yield(packet.topic, packet.payload) unless packet.retain && options[:omit_retained]
    end
  else
    loop do
      # Wait for one packet to be available
      packet = get_packet(topic)
      return packet.topic, packet.payload unless packet.retain && options[:omit_retained]
    end
  end
end
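The :omit_retained option is only visible in the source of #get, so a small standalone sketch may help. It models the read queue with Ruby's built-in Queue and a Struct; SketchPacket and sketch_get are hypothetical names, and no broker or mqtt gem is involved.

```ruby
# Minimal stand-in for a received publish packet (assumed fields only).
SketchPacket = Struct.new(:topic, :payload, :retain)

# Pops packets until one passes the omit_retained filter, as the
# non-block form of #get does.
def sketch_get(queue, options = {})
  loop do
    packet = queue.pop # blocks until a packet is available
    return packet.topic, packet.payload unless packet.retain && options[:omit_retained]
  end
end

queue = Queue.new
producer = Thread.new do
  queue << SketchPacket.new('test', 'stale', true)  # retained, gets skipped
  queue << SketchPacket.new('test', 'fresh', false) # live message
end

topic, payload = sketch_get(queue, :omit_retained => true)
producer.join
puts "#{topic}: #{payload}"
```

Without the option, the first packet is returned even if it is a retained message.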
#get_packet(topic = nil) ⇒ Object
Return the next packet object received from the MQTT server. An optional topic can be given to subscribe to.
The method either returns a single packet:
packet = client.get_packet
puts packet.topic
Or can be used with a block to keep processing messages:
client.get_packet('test') do |packet|
# Do stuff here
puts packet.topic
end
# File 'lib/mqtt/client.rb', line 416
def get_packet(topic = nil)
  # Subscribe to a topic, if an argument is given
  subscribe(topic) unless topic.nil?

  if block_given?
    # Loop forever!
    loop do
      packet = @read_queue.pop
      yield(packet)
      puback_packet(packet) if packet.qos > 0
    end
  else
    # Wait for one packet to be available
    packet = @read_queue.pop
    puback_packet(packet) if packet.qos > 0
    return packet
  end
end
#key=(*args) ⇒ Object
Set to a PEM-format client private key
# File 'lib/mqtt/client.rb', line 200
def key=(*args)
  cert, passphrase = args.flatten
  ssl_context.key = OpenSSL::PKey::RSA.new(cert, passphrase)
end
#key_file=(*args) ⇒ Object
Set a path to a file containing a PEM-format client private key
# File 'lib/mqtt/client.rb', line 194
def key_file=(*args)
  path, passphrase = args.flatten
  ssl_context.key = OpenSSL::PKey::RSA.new(File.open(path), passphrase)
end
#publish(topic, payload = '', retain = false, qos = 0) ⇒ Object
Publish a message on a particular topic to the MQTT server.
# File 'lib/mqtt/client.rb', line 317
def publish(topic, payload = '', retain = false, qos = 0)
  raise ArgumentError, 'Topic name cannot be nil' if topic.nil?
  raise ArgumentError, 'Topic name cannot be empty' if topic.empty?

  packet = MQTT::Packet::Publish.new(
    :id => next_packet_id,
    :qos => qos,
    :retain => retain,
    :topic => topic,
    :payload => payload
  )

  # Send the packet
  res = send_packet(packet)

  return if qos.zero?

  queue = Queue.new

  wait_for_puback packet.id, queue

  deadline = current_time + @ack_timeout

  loop do
    response = queue.pop
    case response
    when :read_timeout
      return -1 if current_time > deadline
    when :close
      return -1
    else
      @pubacks_semaphore.synchronize do
        @pubacks.delete packet.id
      end
      break
    end
  end

  res
end
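For QoS levels above 0, #publish blocks on a queue until an acknowledgement arrives, the connection closes, or the ack_timeout deadline passes. The standalone sketch below isolates that wait loop. The name sketch_wait_for_ack and the injectable clock are assumptions made so the example runs on its own; the library uses its private current_time helper and returns the result of send_packet where this sketch returns :acked.

```ruby
# Waits on a queue the way #publish does for QoS > 0. Any value other
# than :read_timeout or :close is treated as the acknowledgement.
def sketch_wait_for_ack(queue, ack_timeout,
                        clock = -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) })
  deadline = clock.call + ack_timeout
  loop do
    case queue.pop
    when :read_timeout
      # A timeout marker only ends the wait once the deadline has passed.
      return -1 if clock.call > deadline
    when :close
      return -1
    else
      return :acked
    end
  end
end

queue = Queue.new
queue << :read_timeout # before the deadline, so the loop keeps waiting
queue << :puback       # hypothetical acknowledgement marker
p sketch_wait_for_ack(queue, 5)
```

Callers of #publish with QoS 1 or higher can therefore treat a return value of -1 as "no acknowledgement received".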
#queue_empty? ⇒ Boolean
Returns true if the incoming message queue is empty.
# File 'lib/mqtt/client.rb', line 436
def queue_empty?
  @read_queue.empty?
end
#queue_length ⇒ Object
Returns the length of the incoming message queue.
# File 'lib/mqtt/client.rb', line 441
def queue_length
  @read_queue.length
end
#remote_host ⇒ Object
Please use #host instead
# File 'lib/mqtt/client.rb', line 604
def remote_host
  host
end
#remote_host=(args) ⇒ Object
Please use #host= instead
# File 'lib/mqtt/client.rb', line 609
def remote_host=(args)
  self.host = args
end
#remote_port ⇒ Object
Please use #port instead
# File 'lib/mqtt/client.rb', line 614
def remote_port
  port
end
#remote_port=(args) ⇒ Object
Please use #port= instead
# File 'lib/mqtt/client.rb', line 619
def remote_port=(args)
  self.port = args
end
#set_will(topic, payload, retain = false, qos = 0) ⇒ Object
Set the Will for the client
The Will is a message that the server delivers when the client dies. The Will must be set before establishing a connection to the server.
# File 'lib/mqtt/client.rb', line 215
def set_will(topic, payload, retain = false, qos = 0)
  self.will_topic = topic
  self.will_payload = payload
  self.will_retain = retain
  self.will_qos = qos
end
#ssl_context ⇒ Object
Get the OpenSSL context, that is used if SSL/TLS is enabled
# File 'lib/mqtt/client.rb', line 179
def ssl_context
  @ssl_context ||= OpenSSL::SSL::SSLContext.new
end
#subscribe(*topics) ⇒ Object
Send a subscribe message for one or more topics on the MQTT server. The topics parameter should be one of the following:
-
String: subscribe to one topic with QoS 0
-
Array: subscribe to multiple topics with QoS 0
-
Hash: subscribe to multiple topics where the key is the topic and the value is the QoS level
For example:
client.subscribe( 'a/b' )
client.subscribe( 'a/b', 'c/d' )
client.subscribe( ['a/b',0], ['c/d',1] )
client.subscribe( 'a/b' => 0, 'c/d' => 1 )
# File 'lib/mqtt/client.rb', line 370
def subscribe(*topics)
  packet = MQTT::Packet::Subscribe.new(
    :id => next_packet_id,
    :topics => topics
  )
  send_packet(packet)
end
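The method passes its arguments straight to MQTT::Packet::Subscribe, whose handling of them is not shown on this page. As an illustration only, the accepted shapes listed above could be reduced to [topic, qos] pairs along the following lines. sketch_normalize_topics is a hypothetical helper and should not be read as the library's own code.

```ruby
# Hypothetical normalizer: turns the accepted argument shapes into
# [topic, qos] pairs, defaulting to QoS 0 when none is given.
def sketch_normalize_topics(*topics)
  input = topics.flatten # nested arrays are flattened; hashes are left intact
  pairs = []
  until input.empty?
    item = input.shift
    case item
    when Hash
      item.each_pair { |topic, qos| pairs << [topic, qos] }
    when String
      # An Integer directly after a topic is read as its QoS level.
      qos = input.first.is_a?(Integer) ? input.shift : 0
      pairs << [item, qos]
    else
      raise ArgumentError, "Invalid topic: #{item.inspect}"
    end
  end
  pairs
end

p sketch_normalize_topics('a/b', 'c/d')
p sketch_normalize_topics(['a/b', 0], ['c/d', 1])
p sketch_normalize_topics({ 'a/b' => 0, 'c/d' => 1 })
```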
#unsubscribe(*topics) ⇒ Object
Send an unsubscribe message for one or more topics on the MQTT server
# File 'lib/mqtt/client.rb', line 451
def unsubscribe(*topics)
  topics = topics.first if topics.is_a?(Enumerable) && topics.count == 1

  packet = MQTT::Packet::Unsubscribe.new(
    :topics => topics,
    :id => next_packet_id
  )
  send_packet(packet)
end