Class: MQTT::Client
- Inherits:
-
Object
- Object
- MQTT::Client
- Defined in:
- lib/mqtt/client.rb
Constant Summary collapse
- ATTR_DEFAULTS =
Default attribute values
{ host: nil, port: nil, version: "3.1.1", keep_alive: 15, clean_session: true, client_id: nil, ack_timeout: 5, connect_timeout: 30, resend_limit: 5, reconnect_limit: 5, reconnect_backoff: 2, reconnect_backoff_max: 30, username: nil, password: nil, will_topic: nil, will_payload: nil, will_qos: 0, will_retain: false, ssl: false, verify_host: true }.freeze
Instance Attribute Summary collapse
-
#ack_timeout ⇒ Object
Number of seconds to wait for acknowledgement packets (default is 5 seconds).
-
#clean_session ⇒ Object
Set the ‘Clean Session’ flag when connecting? (default is true).
-
#client_id ⇒ Object
Client Identifier.
-
#connect_timeout ⇒ Object
Number of seconds to connect to the server (default is 90 seconds).
-
#host ⇒ Object
Hostname of the remote server.
-
#keep_alive ⇒ Object
Time (in seconds) between pings to remote server (default is 15 seconds).
-
#password ⇒ Object
Password to authenticate to the server with.
-
#port ⇒ Object
Port number of the remote server.
-
#reconnect_backoff ⇒ Object
How long to wait between re-connection attempts (exponential - i.e. immediately after first drop, then 5s, then 25s, then 125s, etc. when this value defaults to 5).
-
#reconnect_backoff_max ⇒ Object
the longest amount of time to wait before attempting a reconnect.
-
#reconnect_limit ⇒ Object
How many attempts to re-establish a connection after it drops before giving up (default 5); nil for unlimited retries.
-
#resend_limit ⇒ Object
How many times to attempt re-sending packets that weren’t acknowledged (default is 5) before giving up.
-
#ssl ⇒ Object
Set to true to enable SSL/TLS encrypted communication.
-
#username ⇒ Object
Username to authenticate to the server with.
-
#verify_host ⇒ Object
Set to false to skip tls hostname verification.
-
#version ⇒ Object
The version number of the MQTT protocol to use (default 3.1.1).
-
#will_payload ⇒ Object
Contents of message that is sent by server when client disconnect.
-
#will_qos ⇒ Object
The QoS level of the will message sent by the server.
-
#will_retain ⇒ Object
If the Will message should be retain by the server after it is sent.
-
#will_topic ⇒ Object
The topic that the Will message is published to.
Class Method Summary collapse
-
.connect(*args) ⇒ Object
Create and connect a new MQTT Client.
-
.generate_client_id(prefix = "ruby", length = 16) ⇒ Object
Generate a random client identifier (using the characters 0-9 and a-z).
Instance Method Summary collapse
-
#batch_publish ⇒ Object
yields a block, and after the block returns all messages are published at once, waiting for any necessary PubAcks for QoS 1 packets as a batch at the end.
-
#ca_file=(path) ⇒ Object
Set a path to a file containing a PEM-format CA certificate and enable peer verification.
-
#cert=(cert) ⇒ Object
PEM-format client certificate.
-
#cert_file=(path) ⇒ Object
Set a path to a file containing a PEM-format client certificate.
-
#clear_queue ⇒ Object
Clear the incoming message queue.
-
#connect ⇒ Object
Connect to the MQTT server.
-
#connected? ⇒ Boolean
Checks whether the client is connected to the server.
-
#disconnect(send_msg: true) ⇒ Object
Disconnect from the MQTT server.
-
#flush ⇒ Object
wait until all messages have been sent.
-
#get ⇒ Object
Return the next message received from the MQTT server.
-
#initialize(host = nil, port = nil, **attributes) ⇒ Client
constructor
Create a new MQTT Client instance.
-
#key=(*args) ⇒ Object
Set to a PEM-format client private key.
-
#key_file=(*args) ⇒ Object
Set a path to a file containing a PEM-format client private key.
-
#on_reconnect(&block) ⇒ Object
registers a callback to be called when a connection is re-established.
-
#publish(topics, payload = nil, retain: false, qos: 0) ⇒ Object
Publish a message on a particular topic to the MQTT server.
-
#queue_empty? ⇒ Boolean
Returns true if the incoming message queue is empty.
-
#queue_length ⇒ Object
Returns the length of the incoming message queue.
-
#set_will(topic, payload, retain: false, qos: 0) ⇒ Object
Set the Will for the client.
-
#ssl_context ⇒ Object
Get the OpenSSL context, that is used if SSL/TLS is enabled.
-
#subscribe(*topics, wait_for_ack: false) ⇒ Object
Send a subscribe message for one or more topics on the MQTT server.
-
#unsubscribe(*topics, wait_for_ack: false) ⇒ Object
Send a unsubscribe message for one or more topics on the MQTT server.
Constructor Details
#initialize(host = nil, port = nil, **attributes) ⇒ Client
Create a new MQTT Client instance
Accepts one of the following:
-
a URI that uses the MQTT scheme
-
a hostname and port
-
a Hash containing attributes to be set on the new instance
If no arguments are given then the method will look for a URI in the MQTT_SERVER environment variable.
Examples:
client = MQTT::Client.new
client = MQTT::Client.new('mqtt://myserver.example.com')
client = MQTT::Client.new('mqtt://user:[email protected]')
client = MQTT::Client.new('myserver.example.com')
client = MQTT::Client.new('myserver.example.com', 18830)
client = MQTT::Client.new(host: 'myserver.example.com')
client = MQTT::Client.new(host: 'myserver.example.com', keep_alive: 30)
client = MQTT::Client.new(uri: 'mqtt://myserver.example.com', keep_alive: 30)
147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 |
# File 'lib/mqtt/client.rb', line 147 def initialize(host = nil, port = nil, **attributes) host = attributes.delete(:uri) if attributes.key?(:uri) # Set server URI from environment if present if host.nil? && port.nil? && attributes.empty? && ENV["MQTT_SERVER"] attributes.merge!(parse_uri(ENV["MQTT_SERVER"])) end if host case host when URI, %r{^mqtts?://} attributes.merge!(parse_uri(host)) else attributes[:host] = host end end attributes[:port] = port unless port.nil? # Merge arguments with default values for attributes ATTR_DEFAULTS.merge(attributes).each_pair do |k, v| send("#{k}=", v) end # Set a default port number if @port.nil? @port = @ssl ? MQTT::DEFAULT_SSL_PORT : MQTT::DEFAULT_PORT end # Initialise private instance variables @socket = nil @read_queue = Queue.new @write_queue = Queue.new @read_thread = nil @write_thread = nil @acks = {} @connection_mutex = Mutex.new @acks_mutex = Mutex.new @wake_up_pipe = IO.pipe @connected = false @keep_alive_sent = false @last_packet_id = 0 @batch_publish = false end |
Instance Attribute Details
#ack_timeout ⇒ Object
Number of seconds to wait for acknowledgement packets (default is 5 seconds)
42 43 44 |
# File 'lib/mqtt/client.rb', line 42 def ack_timeout @ack_timeout end |
#clean_session ⇒ Object
Set the ‘Clean Session’ flag when connecting? (default is true)
36 37 38 |
# File 'lib/mqtt/client.rb', line 36 def clean_session @clean_session end |
#client_id ⇒ Object
Client Identifier
39 40 41 |
# File 'lib/mqtt/client.rb', line 39 def client_id @client_id end |
#connect_timeout ⇒ Object
Number of seconds to connect to the server (default is 90 seconds)
45 46 47 |
# File 'lib/mqtt/client.rb', line 45 def connect_timeout @connect_timeout end |
#host ⇒ Object
Hostname of the remote server
11 12 13 |
# File 'lib/mqtt/client.rb', line 11 def host @host end |
#keep_alive ⇒ Object
Time (in seconds) between pings to remote server (default is 15 seconds)
33 34 35 |
# File 'lib/mqtt/client.rb', line 33 def keep_alive @keep_alive end |
#password ⇒ Object
Password to authenticate to the server with
67 68 69 |
# File 'lib/mqtt/client.rb', line 67 def password @password end |
#port ⇒ Object
Port number of the remote server
14 15 16 |
# File 'lib/mqtt/client.rb', line 14 def port @port end |
#reconnect_backoff ⇒ Object
How long to wait between re-connection attempts (exponential - i.e. immediately after first drop, then 5s, then 25s, then 125s, etc. when this value defaults to 5)
58 59 60 |
# File 'lib/mqtt/client.rb', line 58 def reconnect_backoff @reconnect_backoff end |
#reconnect_backoff_max ⇒ Object
the longest amount of time to wait before attempting a reconnect
61 62 63 |
# File 'lib/mqtt/client.rb', line 61 def reconnect_backoff_max @reconnect_backoff_max end |
#reconnect_limit ⇒ Object
How many attempts to re-establish a connection after it drops before giving up (default 5); nil for unlimited retries
53 54 55 |
# File 'lib/mqtt/client.rb', line 53 def reconnect_limit @reconnect_limit end |
#resend_limit ⇒ Object
How many times to attempt re-sending packets that weren’t acknowledged (default is 5) before giving up
49 50 51 |
# File 'lib/mqtt/client.rb', line 49 def resend_limit @resend_limit end |
#ssl ⇒ Object
Set to true to enable SSL/TLS encrypted communication
Set to a symbol to use a specific variant of SSL/TLS. Allowed values include:
27 28 29 |
# File 'lib/mqtt/client.rb', line 27 def ssl @ssl end |
#username ⇒ Object
Username to authenticate to the server with
64 65 66 |
# File 'lib/mqtt/client.rb', line 64 def username @username end |
#verify_host ⇒ Object
Set to false to skip tls hostname verification
30 31 32 |
# File 'lib/mqtt/client.rb', line 30 def verify_host @verify_host end |
#version ⇒ Object
The version number of the MQTT protocol to use (default 3.1.1)
17 18 19 |
# File 'lib/mqtt/client.rb', line 17 def version @version end |
#will_payload ⇒ Object
Contents of message that is sent by server when client disconnect
73 74 75 |
# File 'lib/mqtt/client.rb', line 73 def will_payload @will_payload end |
#will_qos ⇒ Object
The QoS level of the will message sent by the server
76 77 78 |
# File 'lib/mqtt/client.rb', line 76 def will_qos @will_qos end |
#will_retain ⇒ Object
If the Will message should be retain by the server after it is sent
79 80 81 |
# File 'lib/mqtt/client.rb', line 79 def will_retain @will_retain end |
#will_topic ⇒ Object
The topic that the Will message is published to
70 71 72 |
# File 'lib/mqtt/client.rb', line 70 def will_topic @will_topic end |
Class Method Details
.connect(*args) ⇒ Object
Create and connect a new MQTT Client
Accepts the same arguments as creating a new client. If a block is given, then it will be executed before disconnecting again.
Example:
MQTT::Client.connect('myserver.example.com') do |client|
# do stuff here
end
115 116 117 118 119 |
# File 'lib/mqtt/client.rb', line 115 def self.connect(*args, &) client = MQTT::Client.new(*args) client.connect(&) client end |
.generate_client_id(prefix = "ruby", length = 16) ⇒ Object
Generate a random client identifier (using the characters 0-9 and a-z)
123 124 125 |
# File 'lib/mqtt/client.rb', line 123 def self.generate_client_id(prefix = "ruby", length = 16) "#{prefix}#{SecureRandom.alphanumeric(length).downcase}" end |
Instance Method Details
#batch_publish ⇒ Object
yields a block, and after the block returns all messages are published at once, waiting for any necessary PubAcks for QoS 1 packets as a batch at the end
For example:
client.batch_publish do
client.publish("topic1", "value1", qos: 1)
client.publish("topic2", "value2", qos: 1)
end
344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 |
# File 'lib/mqtt/client.rb', line 344 def batch_publish return yield if @batch_publish @batch_publish = {} begin yield batch = @batch_publish @batch_publish = nil batch.each do |(kwargs, values)| publish(values, **kwargs) end ensure @batch_publish = nil end end |
#ca_file=(path) ⇒ Object
Set a path to a file containing a PEM-format CA certificate and enable peer verification
224 225 226 227 |
# File 'lib/mqtt/client.rb', line 224 def ca_file=(path) ssl_context.ca_file = path ssl_context.verify_mode = OpenSSL::SSL::VERIFY_PEER unless path.nil? end |
#cert=(cert) ⇒ Object
PEM-format client certificate
207 208 209 |
# File 'lib/mqtt/client.rb', line 207 def cert=(cert) ssl_context.cert = OpenSSL::X509::Certificate.new(cert) end |
#cert_file=(path) ⇒ Object
Set a path to a file containing a PEM-format client certificate
202 203 204 |
# File 'lib/mqtt/client.rb', line 202 def cert_file=(path) self.cert = File.read(path) end |
#clear_queue ⇒ Object
Clear the incoming message queue.
492 493 494 |
# File 'lib/mqtt/client.rb', line 492 def clear_queue @read_queue.clear end |
#connect ⇒ Object
Connect to the MQTT server
If a block is given, then yield to that block and then disconnect again.
243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 |
# File 'lib/mqtt/client.rb', line 243 def connect if connected? yield(self) if block_given? return end if @client_id.nil? || @client_id.empty? raise "Must provide a client_id if clean_session is set to false" unless @clean_session # Empty client id is not allowed for version 3.1.0 @client_id = MQTT::Client.generate_client_id if @version == "3.1.0" end raise ArgumentError, "No MQTT server host set when attempting to connect" if @host.nil? connect_internal return unless block_given? # If a block is given, then yield and disconnect begin yield(self) ensure disconnect end end |
#connected? ⇒ Boolean
Checks whether the client is connected to the server.
Note that this returns true even if the connection is down and we’re trying to reconnect
323 324 325 |
# File 'lib/mqtt/client.rb', line 323 def connected? @connected end |
#disconnect(send_msg: true) ⇒ Object
Disconnect from the MQTT server.
If you don’t want to say goodbye to the server, set send_msg to false.
283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 |
# File 'lib/mqtt/client.rb', line 283 def disconnect(send_msg: true) return unless connected? @read_queue << [ConnectionClosedException.new, current_time] # Stop reading packets from the socket first @connection_mutex.synchronize do if @write_thread&.alive? @write_thread.kill @write_thread.join end @read_thread.kill if @read_thread&.alive? @read_thread = @write_thread = nil @connected = false end @acks_mutex.synchronize do @acks.each_value do |pending_ack| pending_ack.queue << :close end @acks.clear end return unless @socket if send_msg packet = MQTT::Packet::Disconnect.new begin @socket.write(packet.to_s) rescue nil end end @socket.close @socket = nil end |
#flush ⇒ Object
wait until all messages have been sent
271 272 273 274 275 276 277 278 |
# File 'lib/mqtt/client.rb', line 271 def flush raise NotConnectedException unless connected? queue = Queue.new @write_queue << queue queue.pop nil end |
#get ⇒ Object
Return the next message received from the MQTT server.
The method either returns the Publish packet:
packet = client.get
Or can be used with a block to keep processing messages:
client.get do |packet|
# Do stuff here
end
458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 |
# File 'lib/mqtt/client.rb', line 458 def get raise NotConnectedException unless connected? loop_start = current_time loop do packet = @read_queue.pop if packet.is_a?(Array) && packet.last >= loop_start e = packet.first e.set_backtrace((e.backtrace || []) + ["<from MQTT worker thread>"] + caller) raise e end next unless packet.is_a?(Packet) unless block_given? puback_packet(packet) if packet.qos.positive? return packet end yield packet puback_packet(packet) if packet.qos.positive? end end |
#key=(*args) ⇒ Object
Set to a PEM-format client private key
218 219 220 221 |
# File 'lib/mqtt/client.rb', line 218 def key=(*args) cert, passphrase = args.flatten ssl_context.key = OpenSSL::PKey.read(cert, passphrase) end |
#key_file=(*args) ⇒ Object
Set a path to a file containing a PEM-format client private key
212 213 214 215 |
# File 'lib/mqtt/client.rb', line 212 def key_file=(*args) path, passphrase = args.flatten ssl_context.key = OpenSSL::PKey.read(File.binread(path), passphrase) end |
#on_reconnect(&block) ⇒ Object
registers a callback to be called when a connection is re-established
can be used to re-subscribe (if you’re not using persistent sessions) to topics, and/or re-publish aliveness (if you set a Will)
331 332 333 |
# File 'lib/mqtt/client.rb', line 331 def on_reconnect(&block) @on_reconnect = block end |
#publish(topics, payload = nil, retain: false, qos: 0) ⇒ Object
Publish a message on a particular topic to the MQTT server.
363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 |
# File 'lib/mqtt/client.rb', line 363 def publish(topics, payload = nil, retain: false, qos: 0) if topics.is_a?(Hash) && !payload.nil? raise ArgumentError, "Payload cannot be passed if passing a hash for topics and payloads" end raise NotConnectedException unless connected? if @batch_publish && qos != 0 values = @batch_publish[{ retain:, qos: }] ||= {} if topics.is_a?(Hash) values.merge!(topics) else values[topics] = payload end return end pending_acks = [] topics = { topics => payload } unless topics.is_a?(Hash) topics.each do |(topic, topic_payload)| raise ArgumentError, "Topic name cannot be nil" if topic.nil? raise ArgumentError, "Topic name cannot be empty" if topic.empty? packet = MQTT::Packet::Publish.new( id: next_packet_id, qos:, retain:, topic:, payload: topic_payload ) pending_acks << register_for_ack(packet) unless qos.zero? # Send the packet send_packet(packet) end return if qos.zero? pending_acks.each do |ack| wait_for_ack(ack) end nil end |
#queue_empty? ⇒ Boolean
Returns true if the incoming message queue is empty.
482 483 484 |
# File 'lib/mqtt/client.rb', line 482 def queue_empty? @read_queue.empty? end |
#queue_length ⇒ Object
Returns the length of the incoming message queue.
487 488 489 |
# File 'lib/mqtt/client.rb', line 487 def queue_length @read_queue.length end |
#set_will(topic, payload, retain: false, qos: 0) ⇒ Object
Set the Will for the client
The will is a message that will be delivered by the server when the client dies. The Will must be set before establishing a connection to the server
233 234 235 236 237 238 |
# File 'lib/mqtt/client.rb', line 233 def set_will(topic, payload, retain: false, qos: 0) self.will_topic = topic self.will_payload = payload self.will_retain = retain self.will_qos = qos end |
#ssl_context ⇒ Object
Get the OpenSSL context, that is used if SSL/TLS is enabled
197 198 199 |
# File 'lib/mqtt/client.rb', line 197 def ssl_context @ssl_context ||= OpenSSL::SSL::SSLContext.new end |
#subscribe(*topics, wait_for_ack: false) ⇒ Object
Send a subscribe message for one or more topics on the MQTT server. The topics parameter should be one of the following:
-
String: subscribe to one topic with QoS 0
-
Array: subscribe to multiple topics with QoS 0
-
Hash: subscribe to multiple topics where the key is the topic and the value is the QoS level
For example:
client.subscribe( 'a/b' )
client.subscribe( 'a/b', 'c/d' )
client.subscribe( ['a/b',0], ['c/d',1] )
client.subscribe( { 'a/b' => 0, 'c/d' => 1 } )
421 422 423 424 425 426 427 428 429 430 431 |
# File 'lib/mqtt/client.rb', line 421 def subscribe(*topics, wait_for_ack: false) raise NotConnectedException unless connected? packet = MQTT::Packet::Subscribe.new( id: next_packet_id, topics: ) token = register_for_ack(packet) if wait_for_ack send_packet(packet) wait_for_ack(token) if wait_for_ack end |
#unsubscribe(*topics, wait_for_ack: false) ⇒ Object
Send a unsubscribe message for one or more topics on the MQTT server
434 435 436 437 438 439 440 441 442 443 444 445 446 |
# File 'lib/mqtt/client.rb', line 434 def unsubscribe(*topics, wait_for_ack: false) raise NotConnectedException unless connected? topics = topics.first if topics.is_a?(Enumerable) && topics.one? packet = MQTT::Packet::Unsubscribe.new( topics:, id: next_packet_id ) token = register_for_ack(packet) if wait_for_ack send_packet(packet) wait_for_ack(token) if wait_for_ack end |