Class: MQTT::Client
- Inherits:
-
Object
- Object
- MQTT::Client
- Defined in:
- lib/mqtt/client.rb
Defined Under Namespace
Classes: PendingAck
Constant Summary
- ATTR_DEFAULTS =
Default attribute values
{
  host: nil,
  port: nil,
  version: '3.1.1',
  keep_alive: 15,
  clean_session: true,
  client_id: nil,
  ack_timeout: 5,
  resend_limit: 5,
  reconnect_limit: 5,
  reconnect_backoff: 2,
  reconnect_backoff_max: 30,
  username: nil,
  password: nil,
  will_topic: nil,
  will_payload: nil,
  will_qos: 0,
  will_retain: false,
  ssl: false
}.freeze
Instance Attribute Summary
-
#ack_timeout ⇒ Object
Number of seconds to wait for acknowledgement packets (default is 5 seconds).
-
#clean_session ⇒ Object
Set the ‘Clean Session’ flag when connecting? (default is true).
-
#client_id ⇒ Object
Client Identifier.
-
#host ⇒ Object
Hostname of the remote server.
-
#keep_alive ⇒ Object
Time (in seconds) between pings to remote server (default is 15 seconds).
-
#password ⇒ Object
Password to authenticate to the server with.
-
#port ⇒ Object
Port number of the remote server.
-
#reconnect_backoff ⇒ Object
How long to wait between re-connection attempts. The wait grows exponentially: with a value of 5, the client retries immediately after the first drop, then after 5s, then 25s, then 125s, etc. ATTR_DEFAULTS sets this value to 2.
-
#reconnect_backoff_max ⇒ Object
The longest amount of time to wait before attempting a reconnect (ATTR_DEFAULTS sets this to 30).
-
#reconnect_limit ⇒ Object
How many attempts to re-establish a connection after it drops before giving up (default 5); nil for unlimited retries.
-
#resend_limit ⇒ Object
How many times to attempt re-sending packets that weren’t acknowledged (default is 5) before giving up.
-
#ssl ⇒ Object
Set to true to enable SSL/TLS encrypted communication.
-
#username ⇒ Object
Username to authenticate to the server with.
-
#version ⇒ Object
The version number of the MQTT protocol to use (default 3.1.1).
-
#will_payload ⇒ Object
Contents of the message that the server sends when the client disconnects unexpectedly.
-
#will_qos ⇒ Object
The QoS level of the will message sent by the server.
-
#will_retain ⇒ Object
Whether the Will message should be retained by the server after it is sent.
-
#will_topic ⇒ Object
The topic that the Will message is published to.
Class Method Summary
-
.connect(*args, &block) ⇒ Object
Create and connect a new MQTT Client.
-
.generate_client_id(prefix = 'ruby', length = 16) ⇒ Object
Generate a random client identifier (using the characters 0-9 and a-z).
Instance Method Summary
-
#batch_publish ⇒ Object
Yields to a block; after the block returns, all messages are published at once, and any PubAcks needed for QoS 1 packets are awaited as a batch at the end.
-
#ca_file=(path) ⇒ Object
Set a path to a file containing a PEM-format CA certificate and enable peer verification.
-
#cert=(cert) ⇒ Object
PEM-format client certificate.
-
#cert_file=(path) ⇒ Object
Set a path to a file containing a PEM-format client certificate.
-
#clear_queue ⇒ Object
Clear the incoming message queue.
-
#connect ⇒ Object
Connect to the MQTT server.
-
#connected? ⇒ Boolean
Checks whether the client is connected to the server.
-
#disconnect(send_msg: true) ⇒ Object
Disconnect from the MQTT server.
-
#flush ⇒ Object
Wait until all messages have been sent.
-
#get ⇒ Object
Return the next message received from the MQTT server.
-
#initialize(host = nil, port = nil, **attributes) ⇒ Client
constructor
Create a new MQTT Client instance.
-
#key=(*args) ⇒ Object
Set to a PEM-format client private key.
-
#key_file=(*args) ⇒ Object
Set a path to a file containing a PEM-format client private key.
-
#on_reconnect(&block) ⇒ Object
Registers a callback to be called when a connection is re-established.
-
#publish(topics, payload = nil, retain: false, qos: 0) ⇒ Object
Publish a message on a particular topic to the MQTT server.
-
#queue_empty? ⇒ Boolean
Returns true if the incoming message queue is empty.
-
#queue_length ⇒ Object
Returns the length of the incoming message queue.
-
#set_will(topic, payload, retain: false, qos: 0) ⇒ Object
Set the Will for the client.
-
#ssl_context ⇒ Object
Get the OpenSSL context, that is used if SSL/TLS is enabled.
-
#subscribe(*topics, wait_for_ack: false) ⇒ Object
Send a subscribe message for one or more topics on the MQTT server.
-
#unsubscribe(*topics, wait_for_ack: false) ⇒ Object
Send an unsubscribe message for one or more topics on the MQTT server.
Constructor Details
#initialize(host = nil, port = nil, **attributes) ⇒ Client
Create a new MQTT Client instance
Accepts one of the following:
-
a URI that uses the MQTT scheme
-
a hostname and port
-
a Hash containing attributes to be set on the new instance
If no arguments are given then the method will look for a URI in the MQTT_SERVER environment variable.
Examples:
client = MQTT::Client.new
client = MQTT::Client.new('mqtt://myserver.example.com')
client = MQTT::Client.new('mqtt://user:pass@myserver.example.com')
client = MQTT::Client.new('myserver.example.com')
client = MQTT::Client.new('myserver.example.com', 18830)
client = MQTT::Client.new(host: 'myserver.example.com')
client = MQTT::Client.new(host: 'myserver.example.com', keep_alive: 30)
client = MQTT::Client.new(uri: 'mqtt://myserver.example.com', keep_alive: 30)
# File 'lib/mqtt/client.rb', line 139
def initialize(host = nil, port = nil, **attributes)
  host = attributes.delete(:uri) if attributes.key?(:uri)
  # Set server URI from environment if present
  if host.nil? && port.nil? && attributes.empty? && ENV['MQTT_SERVER']
    attributes.merge!(parse_uri(ENV['MQTT_SERVER']))
  end

  if host
    case host
    when URI, %r{^mqtts?://}
      attributes.merge!(parse_uri(host))
    else
      attributes[:host] = host
    end
  end

  attributes[:port] = port unless port.nil?

  # Merge arguments with default values for attributes
  ATTR_DEFAULTS.merge(attributes).each_pair do |k, v|
    send("#{k}=", v)
  end

  # Set a default port number
  if @port.nil?
    @port = @ssl ? MQTT::DEFAULT_SSL_PORT : MQTT::DEFAULT_PORT
  end

  # Initialise private instance variables
  @socket = nil
  @read_queue = Queue.new
  @write_queue = Queue.new
  @read_thread = nil
  @write_thread = nil

  @acks = {}

  @connection_mutex = Mutex.new
  @acks_mutex = Mutex.new
  @wake_up_pipe = IO.pipe

  @connected = false
  @keep_alive_sent = false
  @last_packet_id = 0
  @batch_publish = false
end
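The precedence between URI, host, port and defaults in the constructor can be sketched outside the library. This is a minimal sketch, not the library's code: `resolve_attributes` and the trimmed-down `DEFAULTS` hash are hypothetical, the private `parse_uri` is not shown on this page so the URI mapping here is a guess at its shape, and the conventional MQTT ports 1883 and 8883 stand in for `MQTT::DEFAULT_PORT` and `MQTT::DEFAULT_SSL_PORT`, whose values are not shown either.

```ruby
require 'uri'

# Hypothetical, trimmed-down stand-in for ATTR_DEFAULTS.
DEFAULTS = { host: nil, port: nil, keep_alive: 15, username: nil, password: nil, ssl: false }.freeze

# Hypothetical helper: mirrors the precedence in #initialize, not the library's code.
def resolve_attributes(host = nil, port = nil, **attributes)
  # A :uri attribute takes the place of the host argument.
  host = attributes.delete(:uri) if attributes.key?(:uri)

  if host.is_a?(URI) || host.to_s.match?(%r{\Amqtts?://})
    uri = host.is_a?(URI) ? host : URI.parse(host)
    # Assumed mapping from URI parts to attributes.
    attributes[:host] = uri.host
    attributes[:port] = uri.port if uri.port
    attributes[:username] = uri.user if uri.user
    attributes[:password] = uri.password if uri.password
    attributes[:ssl] = true if uri.scheme == 'mqtts'
  elsif host
    attributes[:host] = host
  end

  # An explicit port argument wins over one taken from the URI.
  attributes[:port] = port unless port.nil?

  resolved = DEFAULTS.merge(attributes)
  # Assumed default ports: 1883 for plain TCP, 8883 for TLS.
  resolved[:port] ||= resolved[:ssl] ? 8883 : 1883
  resolved
end

plain = resolve_attributes('myserver.example.com')
custom_port = resolve_attributes('myserver.example.com', 18830)
secure = resolve_attributes(uri: 'mqtts://user:pass@myserver.example.com', keep_alive: 30)
```

Merging the caller's attributes over the defaults last means any attribute left unspecified keeps its default, which is the same idea as `ATTR_DEFAULTS.merge(attributes)` in the listing above.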
Instance Attribute Details
#ack_timeout ⇒ Object
Number of seconds to wait for acknowledgement packets (default is 5 seconds)
# File 'lib/mqtt/client.rb', line 39
def ack_timeout
  @ack_timeout
end
#clean_session ⇒ Object
Set the ‘Clean Session’ flag when connecting? (default is true)
# File 'lib/mqtt/client.rb', line 33
def clean_session
  @clean_session
end
#client_id ⇒ Object
Client Identifier
# File 'lib/mqtt/client.rb', line 36
def client_id
  @client_id
end
#host ⇒ Object
Hostname of the remote server
# File 'lib/mqtt/client.rb', line 11
def host
  @host
end
#keep_alive ⇒ Object
Time (in seconds) between pings to remote server (default is 15 seconds)
# File 'lib/mqtt/client.rb', line 30
def keep_alive
  @keep_alive
end
#password ⇒ Object
Password to authenticate to the server with
# File 'lib/mqtt/client.rb', line 61
def password
  @password
end
#port ⇒ Object
Port number of the remote server
# File 'lib/mqtt/client.rb', line 14
def port
  @port
end
#reconnect_backoff ⇒ Object
How long to wait between re-connection attempts. The wait grows exponentially: with a value of 5, the client retries immediately after the first drop, then after 5s, then 25s, then 125s, etc. ATTR_DEFAULTS sets this value to 2.
# File 'lib/mqtt/client.rb', line 52
def reconnect_backoff
  @reconnect_backoff
end
#reconnect_backoff_max ⇒ Object
The longest amount of time to wait before attempting a reconnect (ATTR_DEFAULTS sets this to 30).
# File 'lib/mqtt/client.rb', line 55
def reconnect_backoff_max
  @reconnect_backoff_max
end
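The interplay of reconnect_backoff and reconnect_backoff_max can be illustrated with a few lines of arithmetic. This is one reading of the attribute descriptions, not the library's reconnect code (which is not shown on this page); `reconnect_delay` and its keyword arguments are hypothetical names.

```ruby
# Hypothetical helper: one reading of the descriptions above, not the library's code.
# Attempt 0 is the first retry, made right after the connection drops.
def reconnect_delay(attempt, backoff:, backoff_max:)
  return 0 if attempt.zero?

  # Exponential growth, capped at the configured maximum.
  [backoff**attempt, backoff_max].min
end

uncapped = (0..3).map { |n| reconnect_delay(n, backoff: 5, backoff_max: Float::INFINITY) }
capped = (0..3).map { |n| reconnect_delay(n, backoff: 5, backoff_max: 30) }
```

With a backoff of 5 and no cap the delays are 0, 5, 25 and 125 seconds, matching the sequence in the description; with a cap of 30 the last delay is held at 30.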
#reconnect_limit ⇒ Object
How many attempts to re-establish a connection after it drops before giving up (default 5); nil for unlimited retries
# File 'lib/mqtt/client.rb', line 47
def reconnect_limit
  @reconnect_limit
end
#resend_limit ⇒ Object
How many times to attempt re-sending packets that weren’t acknowledged (default is 5) before giving up
# File 'lib/mqtt/client.rb', line 43
def resend_limit
  @resend_limit
end
#ssl ⇒ Object
Set to true to enable SSL/TLS encrypted communication
Set to a symbol to use a specific variant of SSL/TLS. Allowed values include:
# File 'lib/mqtt/client.rb', line 27
def ssl
  @ssl
end
#username ⇒ Object
Username to authenticate to the server with
# File 'lib/mqtt/client.rb', line 58
def username
  @username
end
#version ⇒ Object
The version number of the MQTT protocol to use (default 3.1.1)
# File 'lib/mqtt/client.rb', line 17
def version
  @version
end
#will_payload ⇒ Object
Contents of the message that the server sends when the client disconnects unexpectedly
# File 'lib/mqtt/client.rb', line 67
def will_payload
  @will_payload
end
#will_qos ⇒ Object
The QoS level of the will message sent by the server
# File 'lib/mqtt/client.rb', line 70
def will_qos
  @will_qos
end
#will_retain ⇒ Object
Whether the Will message should be retained by the server after it is sent
# File 'lib/mqtt/client.rb', line 73
def will_retain
  @will_retain
end
#will_topic ⇒ Object
The topic that the Will message is published to
# File 'lib/mqtt/client.rb', line 64
def will_topic
  @will_topic
end
Class Method Details
.connect(*args, &block) ⇒ Object
# File 'lib/mqtt/client.rb', line 107
def self.connect(*args, &block)
  client = MQTT::Client.new(*args)
  client.connect(&block)
  client
end
.generate_client_id(prefix = 'ruby', length = 16) ⇒ Object
Generate a random client identifier (using the characters 0-9 and a-z)
# File 'lib/mqtt/client.rb', line 115
def self.generate_client_id(prefix = 'ruby', length = 16)
  "#{prefix}#{SecureRandom.alphanumeric(length).downcase}"
end
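The identifier format is easy to check in isolation. The sketch below lifts the same expression into a standalone top-level method so it runs without the library; the `'sensor-'` prefix is only an illustrative value.

```ruby
require 'securerandom'

# Same expression as the listing above, lifted into a standalone method
# so that it runs without the MQTT library being loaded.
def generate_client_id(prefix = 'ruby', length = 16)
  "#{prefix}#{SecureRandom.alphanumeric(length).downcase}"
end

default_id = generate_client_id
custom_id = generate_client_id('sensor-', 8)
```

Because `SecureRandom.alphanumeric` draws from A-Z, a-z and 0-9 before the result is downcased, each letter is twice as likely to appear as each digit; the identifier is still random, just not uniform over the 36 output characters.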
Instance Method Details
#batch_publish ⇒ Object
Yields to a block; after the block returns, all messages are published at once, and any PubAcks needed for QoS 1 packets are awaited as a batch at the end
For example:
client.batch_publish do
  client.publish("topic1", "value1", qos: 1)
  client.publish("topic2", "value2", qos: 1)
end
# File 'lib/mqtt/client.rb', line 336
def batch_publish
  return yield if @batch_publish

  @batch_publish = {}

  begin
    yield

    batch = @batch_publish
    @batch_publish = nil
    batch.each do |(kwargs, values)|
      publish(values, **kwargs)
    end
  ensure
    @batch_publish = nil
  end
end
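The deferral logic can be tried without a broker. A minimal sketch, assuming a hypothetical `BatchSketch` class that records what would be sent instead of writing packets; it follows the shape of #batch_publish and the batching branch of #publish shown on this page, but it is not the library's implementation.

```ruby
# Hypothetical stand-in for the client: records what would go on the wire.
class BatchSketch
  attr_reader :sent

  def initialize
    @sent = []
    @batch = nil
  end

  def batch_publish
    return yield if @batch # nested call: the outer batch collects everything

    @batch = {}
    begin
      yield
      batch = @batch
      @batch = nil # from here on, publish sends immediately again
      batch.each { |kwargs, values| publish(values, **kwargs) }
    ensure
      @batch = nil
    end
  end

  def publish(topics, payload = nil, retain: false, qos: 0)
    topics = { topics => payload } unless topics.is_a?(Hash)
    if @batch && qos != 0
      # Deferred messages are grouped by their retain/qos options.
      (@batch[{ retain: retain, qos: qos }] ||= {}).merge!(topics)
      return
    end
    @sent << [topics, qos]
  end
end

client = BatchSketch.new
client.batch_publish do
  client.publish('topic1', 'value1', qos: 1)
  client.publish('topic0', 'now') # QoS 0 is not deferred
  client.publish('topic2', 'value2', qos: 1)
end
```

Note that only messages with a non-zero QoS are held back; the QoS 0 message in the middle of the block goes out first, and the two QoS 1 messages follow as a single grouped publish.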
#ca_file=(path) ⇒ Object
Set a path to a file containing a PEM-format CA certificate and enable peer verification
# File 'lib/mqtt/client.rb', line 216
def ca_file=(path)
  ssl_context.ca_file = path
  ssl_context.verify_mode = OpenSSL::SSL::VERIFY_PEER unless path.nil?
end
#cert=(cert) ⇒ Object
PEM-format client certificate
# File 'lib/mqtt/client.rb', line 199
def cert=(cert)
  ssl_context.cert = OpenSSL::X509::Certificate.new(cert)
end
#cert_file=(path) ⇒ Object
Set a path to a file containing a PEM-format client certificate
# File 'lib/mqtt/client.rb', line 194
def cert_file=(path)
  self.cert = File.read(path)
end
#clear_queue ⇒ Object
Clear the incoming message queue.
# File 'lib/mqtt/client.rb', line 484
def clear_queue
  @read_queue.clear
end
#connect ⇒ Object
Connect to the MQTT server
If a block is given, then yield to that block and then disconnect again.
# File 'lib/mqtt/client.rb', line 235
def connect
  if connected?
    yield(self) if block_given?
    return
  end

  if @client_id.nil? || @client_id.empty?
    raise 'Must provide a client_id if clean_session is set to false' unless @clean_session

    # Empty client id is not allowed for version 3.1.0
    @client_id = MQTT::Client.generate_client_id if @version == '3.1.0'
  end

  raise ArgumentError, 'No MQTT server host set when attempting to connect' if @host.nil?

  connect_internal

  return unless block_given?

  # If a block is given, then yield and disconnect
  begin
    yield(self)
  ensure
    disconnect
  end
end
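The block form guarantees a disconnect even when the block raises. A minimal sketch of just that control flow, assuming a hypothetical `ConnectSketch` class with no networking; it models the `begin ... ensure` at the end of #connect and nothing else.

```ruby
# Hypothetical stand-in: only the control flow of #connect is modelled.
class ConnectSketch
  attr_reader :log

  def initialize
    @log = []
    @connected = false
  end

  def connect
    @connected = true
    @log << :connected
    return unless block_given?

    begin
      yield(self)
    ensure
      disconnect # runs even if the block raises
    end
  end

  def disconnect
    @connected = false
    @log << :disconnected
  end
end

client = ConnectSketch.new
begin
  client.connect { |_c| raise 'boom' }
rescue RuntimeError
  client.log << :rescued
end
```

The log ends up as connected, disconnected, rescued: the exception still reaches the caller, but only after the connection has been closed.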
#connected? ⇒ Boolean
Checks whether the client is connected to the server.
Note that this returns true even if the connection is down and we’re trying to reconnect
# File 'lib/mqtt/client.rb', line 315
def connected?
  @connected
end
#disconnect(send_msg: true) ⇒ Object
Disconnect from the MQTT server.
If you don’t want to say goodbye to the server, set send_msg to false.
# File 'lib/mqtt/client.rb', line 275
def disconnect(send_msg: true)
  return unless connected?

  @read_queue << [ConnectionClosedException.new, current_time]
  # Stop reading packets from the socket first
  @connection_mutex.synchronize do
    if @write_thread&.alive?
      @write_thread.kill
      @write_thread.join
    end
    @read_thread.kill if @read_thread&.alive?
    @read_thread = @write_thread = nil

    @connected = false
  end
  @acks_mutex.synchronize do
    @acks.each_value do |pending_ack|
      pending_ack.queue << :close
    end
    @acks.clear
  end

  return unless @socket

  if send_msg
    packet = MQTT::Packet::Disconnect.new
    begin
      @socket.write(packet.to_s)
    rescue
      nil
    end
  end
  @socket.close
  @socket = nil
end
#flush ⇒ Object
Wait until all messages have been sent
# File 'lib/mqtt/client.rb', line 263
def flush
  raise NotConnectedException unless connected?

  queue = Queue.new
  @write_queue << queue
  queue.pop
  nil
end
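#flush works by pushing a fresh Queue onto the write queue and blocking on it. A minimal sketch of that marker technique, assuming a hypothetical writer loop that replies on any Queue it dequeues; the library's actual write thread is not shown on this page, so the writer side here is an assumption.

```ruby
# Hypothetical writer loop; the library's real write thread is not shown on this page.
write_queue = Queue.new
written = []

writer = Thread.new do
  loop do
    item = write_queue.pop
    break if item == :stop

    if item.is_a?(Queue)
      item << :flushed # everything queued before the marker has been handled
    else
      written << item
    end
  end
end

flush = lambda do
  marker = Queue.new
  write_queue << marker
  marker.pop # blocks until the writer reaches the marker
  nil
end

3.times { |i| write_queue << "packet #{i}" }
flush.call
flushed_count = written.size

write_queue << :stop
writer.join
```

Because the queue is first-in, first-out, the marker can only be reached after every item queued ahead of it, so no separate counter or condition variable is needed.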
#get ⇒ Object
Return the next message received from the MQTT server.
The method either returns the Publish packet:
packet = client.get
Or can be used with a block to keep processing messages:
client.get do |packet|
  # Do stuff here
end
# File 'lib/mqtt/client.rb', line 450
def get
  raise NotConnectedException unless connected?

  loop_start = current_time
  loop do
    packet = @read_queue.pop
    if packet.is_a?(Array) && packet.last >= loop_start
      e = packet.first
      e.set_backtrace((e.backtrace || []) + ['<from MQTT worker thread>'] + caller)
      raise e
    end
    next unless packet.is_a?(Packet)

    unless block_given?
      puback_packet(packet) if packet.qos > 0
      return packet
    end

    yield packet
    puback_packet(packet) if packet.qos > 0
  end
end
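The listing shows that errors from the worker threads travel through the read queue as `[exception, timestamp]` pairs, and that pairs older than the current call are skipped. A minimal sketch of that filtering, with hypothetical names throughout: `Packet` is a plain Struct rather than `MQTT::Packet`, and `now` assumes a monotonic clock because the library's `current_time` is not shown on this page.

```ruby
# Hypothetical names throughout; Packet here is a plain Struct, not MQTT::Packet.
Packet = Struct.new(:topic, :payload)

# Assumed clock source; the library's current_time is not shown on this page.
def now
  Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

def next_packet(read_queue)
  loop_start = now
  loop do
    item = read_queue.pop
    # Only errors reported after this call started are raised to the caller.
    raise item.first if item.is_a?(Array) && item.last >= loop_start
    next unless item.is_a?(Packet)

    return item
  end
end

queue = Queue.new
queue << [IOError.new('stale failure'), now - 60] # older than the call: skipped
queue << Packet.new('a/b', 'hello')
packet = next_packet(queue)

queue << [IOError.new('connection lost'), now + 1]
error = begin
  next_packet(queue)
rescue IOError => e
  e
end
```

The timestamp check keeps a failure left over from an earlier connection from surfacing in a later, unrelated call.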
#key=(*args) ⇒ Object
Set to a PEM-format client private key
# File 'lib/mqtt/client.rb', line 210
def key=(*args)
  cert, passphrase = args.flatten
  ssl_context.key = OpenSSL::PKey::RSA.new(cert, passphrase)
end
#key_file=(*args) ⇒ Object
Set a path to a file containing a PEM-format client private key
# File 'lib/mqtt/client.rb', line 204
def key_file=(*args)
  path, passphrase = args.flatten
  ssl_context.key = OpenSSL::PKey::RSA.new(File.open(path), passphrase)
end
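Both key setters take `*args` and flatten them, which lets a passphrase ride along in an assignment. A minimal sketch of that unpacking, assuming a hypothetical `KeySketch` class that stores the values instead of building an `OpenSSL::PKey::RSA`; the strings are placeholders, not real key material.

```ruby
# Hypothetical class: only the argument unpacking of key=/key_file= is modelled.
class KeySketch
  attr_reader :pem, :passphrase

  def key=(*args)
    # `obj.key = a, b` arrives as one Array argument, so flatten before unpacking.
    @pem, @passphrase = args.flatten
  end
end

plain_key = KeySketch.new
plain_key.key = 'PEM DATA'

protected_key = KeySketch.new
protected_key.key = 'PEM DATA', 'secret'
```

When no passphrase is given the second value is simply nil, which is what the listings above pass through to `OpenSSL::PKey::RSA.new`.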
#on_reconnect(&block) ⇒ Object
Registers a callback to be called when a connection is re-established.
The callback can be used to re-subscribe to topics (if you’re not using persistent sessions) and/or to re-publish aliveness (if you set a Will).
# File 'lib/mqtt/client.rb', line 323
def on_reconnect(&block)
  @on_reconnect = block
end
#publish(topics, payload = nil, retain: false, qos: 0) ⇒ Object
Publish a message on a particular topic to the MQTT server.
topics may also be a Hash mapping topic names to payloads, in which case payload must not be given.
# File 'lib/mqtt/client.rb', line 355
def publish(topics, payload = nil, retain: false, qos: 0)
  if topics.is_a?(Hash) && !payload.nil?
    raise ArgumentError, 'Payload cannot be passed if passing a hash for topics and payloads'
  end
  raise NotConnectedException unless connected?

  if @batch_publish && qos != 0
    values = @batch_publish[{ retain: retain, qos: qos }] ||= {}
    if topics.is_a?(Hash)
      values.merge!(topics)
    else
      values[topics] = payload
    end
    return
  end

  pending_acks = []

  topics = { topics => payload } unless topics.is_a?(Hash)

  topics.each do |(topic, topic_payload)|
    raise ArgumentError, 'Topic name cannot be nil' if topic.nil?
    raise ArgumentError, 'Topic name cannot be empty' if topic.empty?

    packet = MQTT::Packet::Publish.new(
      id: next_packet_id,
      qos: qos,
      retain: retain,
      topic: topic,
      payload: topic_payload
    )

    pending_acks << register_for_ack(packet) unless qos.zero?

    # Send the packet
    send_packet(packet)
  end

  return if qos.zero?

  pending_acks.each do |ack|
    wait_for_ack(ack)
  end

  nil
end
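The argument checks at the top of #publish can be exercised on their own. A minimal sketch, assuming a hypothetical `normalize_publish_args` helper that reuses the same checks and error messages as the listing above but sends nothing.

```ruby
# Hypothetical helper mirroring the argument checks in #publish; nothing is sent.
def normalize_publish_args(topics, payload = nil)
  if topics.is_a?(Hash) && !payload.nil?
    raise ArgumentError, 'Payload cannot be passed if passing a hash for topics and payloads'
  end

  # A single topic and payload become a one-entry Hash.
  topics = { topics => payload } unless topics.is_a?(Hash)
  topics.each_key do |topic|
    raise ArgumentError, 'Topic name cannot be nil' if topic.nil?
    raise ArgumentError, 'Topic name cannot be empty' if topic.empty?
  end
  topics
end

single = normalize_publish_args('a/b', 'hello')
several = normalize_publish_args({ 'a/b' => 'hello', 'c/d' => 'world' })

rejected = begin
  normalize_publish_args({ 'a/b' => 'hello' }, 'extra')
rescue ArgumentError => e
  e.message
end
```

Normalizing to a Hash up front lets the rest of the method treat one message and many messages the same way.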
#queue_empty? ⇒ Boolean
Returns true if the incoming message queue is empty.
# File 'lib/mqtt/client.rb', line 474
def queue_empty?
  @read_queue.empty?
end
#queue_length ⇒ Object
Returns the length of the incoming message queue.
# File 'lib/mqtt/client.rb', line 479
def queue_length
  @read_queue.length
end
#set_will(topic, payload, retain: false, qos: 0) ⇒ Object
Set the Will for the client
The will is a message that will be delivered by the server when the client dies. The Will must be set before establishing a connection to the server
# File 'lib/mqtt/client.rb', line 225
def set_will(topic, payload, retain: false, qos: 0)
  self.will_topic = topic
  self.will_payload = payload
  self.will_retain = retain
  self.will_qos = qos
end
#ssl_context ⇒ Object
Get the OpenSSL context, that is used if SSL/TLS is enabled
# File 'lib/mqtt/client.rb', line 189
def ssl_context
  @ssl_context ||= OpenSSL::SSL::SSLContext.new
end
#subscribe(*topics, wait_for_ack: false) ⇒ Object
Send a subscribe message for one or more topics on the MQTT server. The topics parameter should be one of the following:
-
String: subscribe to one topic with QoS 0
-
Array: subscribe to multiple topics with QoS 0
-
Hash: subscribe to multiple topics where the key is the topic and the value is the QoS level
For example:
client.subscribe( 'a/b' )
client.subscribe( 'a/b', 'c/d' )
client.subscribe( ['a/b',0], ['c/d',1] )
client.subscribe( { 'a/b' => 0, 'c/d' => 1 } )
# File 'lib/mqtt/client.rb', line 413
def subscribe(*topics, wait_for_ack: false)
  raise NotConnectedException unless connected?

  packet = MQTT::Packet::Subscribe.new(
    id: next_packet_id,
    topics: topics
  )
  token = register_for_ack(packet) if wait_for_ack
  send_packet(packet)
  wait_for_ack(token) if wait_for_ack
end
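The accepted argument forms all reduce to a list of topic and QoS pairs. The sketch below shows one way such a reduction could work; `normalize_subscriptions` is a hypothetical helper, and the real handling lives inside `MQTT::Packet::Subscribe`, which is not shown on this page and may differ.

```ruby
# Hypothetical normaliser: the real work happens inside MQTT::Packet::Subscribe,
# which is not shown on this page.
def normalize_subscriptions(*topics)
  input = topics.flatten # Arrays are flattened; Hashes are left intact
  pairs = []
  until input.empty?
    item = input.shift
    case item
    when Hash
      item.each { |topic, qos| pairs << [topic, qos] }
    when String
      # A String may be followed by its QoS level; default to QoS 0.
      qos = input.first.is_a?(Integer) ? input.shift : 0
      pairs << [item, qos]
    else
      raise ArgumentError, "Invalid topic: #{item.inspect}"
    end
  end
  pairs
end

one = normalize_subscriptions('a/b')
many = normalize_subscriptions('a/b', 'c/d')
with_qos = normalize_subscriptions(['a/b', 0], ['c/d', 1])
from_hash = normalize_subscriptions({ 'a/b' => 0, 'c/d' => 1 })
```

The four calls correspond to the four examples above; the last two produce the same pairs, so the Array and Hash forms are interchangeable under this reading.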
#unsubscribe(*topics, wait_for_ack: false) ⇒ Object
Send an unsubscribe message for one or more topics on the MQTT server
# File 'lib/mqtt/client.rb', line 426
def unsubscribe(*topics, wait_for_ack: false)
  raise NotConnectedException unless connected?

  topics = topics.first if topics.is_a?(Enumerable) && topics.count == 1

  packet = MQTT::Packet::Unsubscribe.new(
    topics: topics,
    id: next_packet_id
  )
  token = register_for_ack(packet) if wait_for_ack
  send_packet(packet)
  wait_for_ack(token) if wait_for_ack
end