Class: MQTT::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/mqtt/client.rb

Constant Summary collapse

ATTR_DEFAULTS =

Default attribute values

{
  # Server to connect to. A nil port is replaced in #initialize by the
  # default SSL or plain port, depending on the ssl attribute.
  host: nil,
  port: nil,
  # MQTT protocol version
  version: "3.1.1",
  # Seconds between pings to the remote server
  keep_alive: 15,
  # 'Clean Session' flag and client identifier used when connecting
  clean_session: true,
  client_id: nil,
  # Seconds to wait for acknowledgement packets
  ack_timeout: 5,
  # Seconds allowed for connecting to the server
  connect_timeout: 30,
  # Re-send attempts for unacknowledged packets before giving up
  resend_limit: 5,
  # Re-connection attempts before giving up (nil for unlimited retries)
  reconnect_limit: 5,
  # Wait between re-connection attempts, and the longest such wait
  reconnect_backoff: 2,
  reconnect_backoff_max: 30,
  # Credentials used to authenticate to the server
  username: nil,
  password: nil,
  # Will message settings
  will_topic: nil,
  will_payload: nil,
  will_qos: 0,
  will_retain: false,
  # SSL/TLS is off by default; hostname verification is on
  ssl: false,
  verify_host: true
}.freeze

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(host = nil, port = nil, **attributes) ⇒ Client

Create a new MQTT Client instance

Accepts one of the following:

  • a URI that uses the MQTT scheme

  • a hostname and port

  • a Hash containing attributes to be set on the new instance

If no arguments are given then the method will look for a URI in the MQTT_SERVER environment variable.

Examples:

client = MQTT::Client.new
client = MQTT::Client.new('mqtt://myserver.example.com')
client = MQTT::Client.new('mqtt://user:pass@myserver.example.com')
client = MQTT::Client.new('myserver.example.com')
client = MQTT::Client.new('myserver.example.com', 18830)
client = MQTT::Client.new(host: 'myserver.example.com')
client = MQTT::Client.new(host: 'myserver.example.com', keep_alive: 30)
client = MQTT::Client.new(uri: 'mqtt://myserver.example.com', keep_alive: 30)


147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
# File 'lib/mqtt/client.rb', line 147

# Build a new, unconnected client.
#
# host may be a hostname, a URI object or an mqtt:// / mqtts:// string; a
# :uri entry in attributes takes the place of host. With no arguments at
# all, the URI in the MQTT_SERVER environment variable is used when set.
# Remaining attributes are applied through their writers on top of
# ATTR_DEFAULTS.
def initialize(host = nil, port = nil, **attributes)
  host = attributes.delete(:uri) if attributes.key?(:uri)

  # Set server URI from environment if present
  no_arguments = host.nil? && port.nil? && attributes.empty?
  attributes.merge!(parse_uri(ENV["MQTT_SERVER"])) if no_arguments && ENV["MQTT_SERVER"]

  # URIs are expanded via parse_uri (its result is merged into the
  # attributes); any other value is used as the hostname as-is
  if host
    case host
    when URI, %r{^mqtts?://} then attributes.merge!(parse_uri(host))
    else attributes[:host] = host
    end
  end

  attributes[:port] = port unless port.nil?

  # Apply every attribute through its writer; supplied values override
  # the defaults
  ATTR_DEFAULTS.merge(attributes).each do |name, value|
    send("#{name}=", value)
  end

  # No port given anywhere: pick the default for SSL or plain connections
  @port = (@ssl ? MQTT::DEFAULT_SSL_PORT : MQTT::DEFAULT_PORT) if @port.nil?

  # Socket and worker threads start out unset
  @socket = nil
  @read_thread = @write_thread = nil

  # Queues for incoming and outgoing traffic
  @read_queue = Queue.new
  @write_queue = Queue.new

  # Pending acknowledgements
  @acks = {}
  @acks_mutex = Mutex.new

  @connection_mutex = Mutex.new
  @wake_up_pipe = IO.pipe

  # Connection state flags and counters
  @connected = false
  @keep_alive_sent = false
  @last_packet_id = 0
  @batch_publish = false
end

Instance Attribute Details

#ack_timeoutObject

Number of seconds to wait for acknowledgement packets (default is 5 seconds)



42
43
44
# File 'lib/mqtt/client.rb', line 42

# Seconds to wait for acknowledgement packets (5 by default).
def ack_timeout = @ack_timeout

#clean_sessionObject

Set the ‘Clean Session’ flag when connecting? (default is true)



36
37
38
# File 'lib/mqtt/client.rb', line 36

# Whether the 'Clean Session' flag is set when connecting (true by default).
def clean_session = @clean_session

#client_idObject

Client Identifier



39
40
41
# File 'lib/mqtt/client.rb', line 39

# The client identifier.
def client_id = @client_id

#connect_timeoutObject

Number of seconds to wait for the connection to the server to be established (default is 30 seconds)



45
46
47
# File 'lib/mqtt/client.rb', line 45

# Seconds allowed for connecting to the server (ATTR_DEFAULTS sets 30).
def connect_timeout = @connect_timeout

#hostObject

Hostname of the remote server



11
12
13
# File 'lib/mqtt/client.rb', line 11

# Hostname of the remote server.
def host = @host

#keep_aliveObject

Time (in seconds) between pings to remote server (default is 15 seconds)



33
34
35
# File 'lib/mqtt/client.rb', line 33

# Seconds between pings to the remote server (15 by default).
def keep_alive = @keep_alive

#passwordObject

Password to authenticate to the server with



67
68
69
# File 'lib/mqtt/client.rb', line 67

# Password used to authenticate to the server.
def password = @password

#portObject

Port number of the remote server



14
15
16
# File 'lib/mqtt/client.rb', line 14

# Port number of the remote server.
def port = @port

#reconnect_backoffObject

How long to wait between re-connection attempts. The wait grows exponentially - e.g. immediately after the first drop, then 5s, then 25s, then 125s, etc. when this value is set to 5. The default value is 2.



58
59
60
# File 'lib/mqtt/client.rb', line 58

# Wait between re-connection attempts (ATTR_DEFAULTS sets 2).
def reconnect_backoff = @reconnect_backoff

#reconnect_backoff_maxObject

the longest amount of time to wait before attempting a reconnect



61
62
63
# File 'lib/mqtt/client.rb', line 61

# Longest amount of time to wait before attempting a reconnect.
def reconnect_backoff_max = @reconnect_backoff_max

#reconnect_limitObject

How many attempts to re-establish a connection after it drops before giving up (default 5); nil for unlimited retries



53
54
55
# File 'lib/mqtt/client.rb', line 53

# Number of attempts to re-establish a dropped connection before giving up
# (5 by default); nil means unlimited retries.
def reconnect_limit = @reconnect_limit

#resend_limitObject

How many times to attempt re-sending packets that weren’t acknowledged (default is 5) before giving up



49
50
51
# File 'lib/mqtt/client.rb', line 49

# Number of attempts to re-send unacknowledged packets before giving up
# (5 by default).
def resend_limit = @resend_limit

#sslObject

Set to true to enable SSL/TLS encrypted communication

Set to a symbol to use a specific variant of SSL/TLS. Allowed values include:

Examples:

Using TLS 1.0

client = Client.new('mqtt.example.com', ssl: :TLSv1)

See Also:

  • OpenSSL::SSL::SSLContext::METHODS


27
28
29
# File 'lib/mqtt/client.rb', line 27

# SSL/TLS setting: true enables encrypted communication, a symbol selects a
# specific SSL/TLS variant (see OpenSSL::SSL::SSLContext::METHODS).
def ssl = @ssl

#usernameObject

Username to authenticate to the server with



64
65
66
# File 'lib/mqtt/client.rb', line 64

# Username used to authenticate to the server.
def username = @username

#verify_hostObject

Set to false to skip tls hostname verification



30
31
32
# File 'lib/mqtt/client.rb', line 30

# Whether TLS hostname verification is performed; false skips it.
def verify_host = @verify_host

#versionObject

The version number of the MQTT protocol to use (default 3.1.1)



17
18
19
# File 'lib/mqtt/client.rb', line 17

# Version number of the MQTT protocol to use (3.1.1 by default).
def version = @version

#will_payloadObject

Contents of the Will message that is sent by the server when the client disconnects



73
74
75
# File 'lib/mqtt/client.rb', line 73

# Contents of the Will message.
def will_payload = @will_payload

#will_qosObject

The QoS level of the will message sent by the server



76
77
78
# File 'lib/mqtt/client.rb', line 76

# QoS level of the Will message sent by the server.
def will_qos = @will_qos

#will_retainObject

Whether the Will message should be retained by the server after it is sent



79
80
81
# File 'lib/mqtt/client.rb', line 79

# Whether the server should retain the Will message after sending it.
def will_retain = @will_retain

#will_topicObject

The topic that the Will message is published to



70
71
72
# File 'lib/mqtt/client.rb', line 70

# Topic that the Will message is published to.
def will_topic = @will_topic

Class Method Details

.connect(*args) ⇒ Object

Create and connect a new MQTT Client

Accepts the same arguments as creating a new client. If a block is given, then it will be executed before disconnecting again.

Example:

MQTT::Client.connect('myserver.example.com') do |client|
  # do stuff here
end


115
116
117
118
119
# File 'lib/mqtt/client.rb', line 115

# Create and connect a new MQTT client.
#
# Takes the same positional and keyword arguments as MQTT::Client.new.
# Keyword arguments are captured and forwarded explicitly: a bare *args
# would hand them to the constructor as one positional Hash, which
# #initialize would then treat as the host.
#
# If a block is given it is passed on to #connect, which yields the client
# and disconnects afterwards.
#
# Returns the client.
def self.connect(*args, **attributes, &block)
  client = MQTT::Client.new(*args, **attributes)
  client.connect(&block)
  client
end

.generate_client_id(prefix = "ruby", length = 16) ⇒ Object

Generate a random client identifier (using the characters 0-9 and a-z)



123
124
125
# File 'lib/mqtt/client.rb', line 123

# Build a random client identifier: the prefix followed by `length` random
# characters drawn from 0-9 and a-z.
def self.generate_client_id(prefix = "ruby", length = 16)
  # alphanumeric also yields upper-case letters, hence the downcase
  random_part = SecureRandom.alphanumeric(length).downcase
  "#{prefix}#{random_part}"
end

Instance Method Details

#batch_publishObject

Yields to a block; after the block returns, all messages queued inside it are published at once, waiting for any necessary PubAcks for QoS 1 packets as a batch at the end. QoS 0 messages are not batched and are sent immediately.

For example:
  client.batch_publish do
    client.publish("topic1", "value1", qos: 1)
    client.publish("topic2", "value2", qos: 1)
  end


344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
# File 'lib/mqtt/client.rb', line 344

# Collect the publishes made inside the block and send them together once
# the block has returned.
#
# While a batch is open, #publish stores non-zero-QoS messages in
# @batch_publish, keyed by their retain/qos options. A nested call simply
# runs its block inside the batch that is already open.
def batch_publish
  return yield if @batch_publish

  @batch_publish = {}

  begin
    yield

    # Close the batch before flushing so the publish calls below go out
    # for real instead of being queued again
    pending = @batch_publish
    @batch_publish = nil
    pending.each_pair { |options, messages| publish(messages, **options) }
  ensure
    # Also closes the batch when the block raised
    @batch_publish = nil
  end
end

#ca_file=(path) ⇒ Object

Set a path to a file containing a PEM-format CA certificate and enable peer verification



224
225
226
227
# File 'lib/mqtt/client.rb', line 224

# Point the SSL context at a PEM-format CA certificate file. A non-nil path
# also switches peer verification on; a nil path leaves the verify mode
# untouched.
def ca_file=(path)
  context = ssl_context
  context.ca_file = path
  return if path.nil?

  context.verify_mode = OpenSSL::SSL::VERIFY_PEER
end

#cert=(cert) ⇒ Object

PEM-format client certificate



207
208
209
# File 'lib/mqtt/client.rb', line 207

# Install a PEM-format client certificate on the SSL context. The text is
# parsed with OpenSSL::X509::Certificate.new.
def cert=(cert)
  context = ssl_context
  context.cert = OpenSSL::X509::Certificate.new(cert)
end

#cert_file=(path) ⇒ Object

Set a path to a file containing a PEM-format client certificate



202
203
204
# File 'lib/mqtt/client.rb', line 202

# Load a PEM-format client certificate from the file at path and hand its
# contents to #cert=.
def cert_file=(path)
  pem = File.read(path)
  self.cert = pem
end

#clear_queueObject

Clear the incoming message queue.



492
493
494
# File 'lib/mqtt/client.rb', line 492

# Discard everything waiting in the incoming message queue.
def clear_queue = @read_queue.clear

#connectObject

Connect to the MQTT server

If a block is given, then yield to that block and then disconnect again.

Raises:

  • (ArgumentError)


243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
# File 'lib/mqtt/client.rb', line 243

# Connect to the MQTT server.
#
# With a block, the client is yielded to it and disconnected again once the
# block finishes (even if it raises). If the client is already connected,
# the block is still run but the existing connection is left open.
#
# Raises a RuntimeError when no client_id is set and clean_session is off,
# and an ArgumentError when no host is set.
def connect
  if connected?
    yield(self) if block_given?
    return
  end

  without_client_id = @client_id.nil? || @client_id.empty?
  raise "Must provide a client_id if clean_session is set to false" if without_client_id && !@clean_session

  # Empty client id is not allowed for version 3.1.0
  @client_id = MQTT::Client.generate_client_id if without_client_id && @version == "3.1.0"

  raise ArgumentError, "No MQTT server host set when attempting to connect" if @host.nil?

  connect_internal

  # If a block is given, then yield and disconnect
  if block_given?
    begin
      yield(self)
    ensure
      disconnect
    end
  end
end

#connected?Boolean

Checks whether the client is connected to the server.

Note that this returns true even if the connection is down and we’re trying to reconnect

Returns:

  • (Boolean)


323
324
325
# File 'lib/mqtt/client.rb', line 323

# Reports the client's connected flag. The flag stays set while the
# connection is down and a reconnect is being attempted.
def connected? = @connected

#disconnect(send_msg: true) ⇒ Object

Disconnect from the MQTT server.

If you don’t want to say goodbye to the server, set send_msg to false.



283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
# File 'lib/mqtt/client.rb', line 283

# Disconnect from the MQTT server. Does nothing unless #connected? is true.
#
# send_msg - when true (the default), a Disconnect packet is written to the
#            socket before it is closed; pass false to close without
#            saying goodbye to the server.
def disconnect(send_msg: true)
  return unless connected?

  # Queue a timestamped ConnectionClosedException for readers; #get raises
  # such entries in the calling thread
  @read_queue << [ConnectionClosedException.new, current_time]
  # Stop the worker threads and mark the client as disconnected while
  # holding the connection mutex
  @connection_mutex.synchronize do
    if @write_thread&.alive?
      # The write thread is killed and then joined
      @write_thread.kill
      @write_thread.join
    end
    # The read thread is only killed, not joined
    @read_thread.kill if @read_thread&.alive?
    @read_thread = @write_thread = nil

    @connected = false
  end
  # Push :close onto the queue of every pending acknowledgement, then
  # forget them all
  @acks_mutex.synchronize do
    @acks.each_value do |pending_ack|
      pending_ack.queue << :close
    end
    @acks.clear
  end

  return unless @socket

  if send_msg
    packet = MQTT::Packet::Disconnect.new
    begin
      @socket.write(packet.to_s)
    rescue
      # Best effort only: a failed goodbye must not prevent closing
      nil
    end
  end
  @socket.close
  @socket = nil
end

#flushObject

wait until all messages have been sent



271
272
273
274
275
276
277
278
# File 'lib/mqtt/client.rb', line 271

# Block until the outgoing queue has been worked through.
#
# A fresh queue is appended to the write queue as a marker and the caller
# waits until something is pushed onto it (presumably by the write thread
# once it reaches the marker - the consumer is not visible here).
#
# Raises NotConnectedException when the client is not connected.
def flush
  raise NotConnectedException unless connected?

  marker = Queue.new
  @write_queue.push(marker)
  marker.pop
  nil
end

#getObject

Return the next message received from the MQTT server.

The method either returns the Publish packet:

packet = client.get

Or can be used with a block to keep processing messages:

client.get do |packet|
  # Do stuff here
end


458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
# File 'lib/mqtt/client.rb', line 458

# Return the next message received from the MQTT server, or - with a block
# - keep yielding messages as they arrive.
#
# Entries on the read queue are either packets or [exception, timestamp]
# pairs. A pair stamped at or after the start of this call is raised in the
# caller's thread; older pairs and anything that is not a Packet are
# skipped. Packets with a QoS above zero are acknowledged through
# puback_packet: before returning in the single-shot form, after the block
# has run in the block form.
#
# Raises NotConnectedException when the client is not connected.
def get
  raise NotConnectedException unless connected?

  started_at = current_time
  loop do
    item = @read_queue.pop

    if item.is_a?(Array)
      # Errors queued before this call started are not ours to report
      next unless item.last >= started_at

      error = item.first
      trace = (error.backtrace || []) + ["<from MQTT worker thread>"] + caller
      error.set_backtrace(trace)
      raise error
    end

    next unless item.is_a?(Packet)

    if block_given?
      yield item
      puback_packet(item) if item.qos.positive?
    else
      puback_packet(item) if item.qos.positive?
      return item
    end
  end
end

#key=(*args) ⇒ Object

Set to a PEM-format client private key



218
219
220
221
# File 'lib/mqtt/client.rb', line 218

# Install a PEM-format client private key on the SSL context.
#
# Accepts either the key text alone or a [key, passphrase] pair; the
# passphrase is passed to OpenSSL::PKey.read and is nil when omitted.
def key=(*args)
  pem, passphrase = args.flatten.first(2)
  context = ssl_context
  context.key = OpenSSL::PKey.read(pem, passphrase)
end

#key_file=(*args) ⇒ Object

Set a path to a file containing a PEM-format client private key



212
213
214
215
# File 'lib/mqtt/client.rb', line 212

# Load a PEM-format client private key from a file and install it on the
# SSL context.
#
# Accepts either the path alone or a [path, passphrase] pair; the file is
# read in binary mode and parsed with OpenSSL::PKey.read.
def key_file=(*args)
  path, passphrase = args.flatten.first(2)
  context = ssl_context
  pem = File.binread(path)
  context.key = OpenSSL::PKey.read(pem, passphrase)
end

#on_reconnect(&block) ⇒ Object

registers a callback to be called when a connection is re-established

can be used to re-subscribe (if you’re not using persistent sessions) to topics, and/or re-publish aliveness (if you set a Will)



331
332
333
# File 'lib/mqtt/client.rb', line 331

# Store the given block as the callback for re-established connections.
# Calling it again replaces the previous callback; calling it without a
# block stores nil.
def on_reconnect(&block) = (@on_reconnect = block)

#publish(topics, payload = nil, retain: false, qos: 0) ⇒ Object

Publish a message on a particular topic to the MQTT server.



363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
# File 'lib/mqtt/client.rb', line 363

# Publish a message on a particular topic to the MQTT server.
#
# topics is either a single topic name (with payload given separately) or a
# Hash of topic => payload pairs, in which case payload must be left nil.
# Inside #batch_publish, messages with a non-zero QoS are only recorded for
# the batch; QoS 0 messages are always sent straight away.
#
# For a non-zero QoS every packet is registered for acknowledgement before
# it is sent, and the call waits for all acknowledgements once everything
# has been sent.
#
# Raises ArgumentError for a Hash combined with a payload and for nil or
# empty topic names, and NotConnectedException when not connected.
# Returns nil.
def publish(topics, payload = nil, retain: false, qos: 0)
  by_topic = topics.is_a?(Hash)
  if by_topic && !payload.nil?
    raise ArgumentError, "Payload cannot be passed if passing a hash for topics and payloads"
  end
  raise NotConnectedException unless connected?

  if @batch_publish && qos != 0
    # Messages are grouped by their options so each group can be flushed
    # with a single publish call
    queued = (@batch_publish[{ retain: retain, qos: qos }] ||= {})
    if by_topic
      queued.merge!(topics)
    else
      queued[topics] = payload
    end
    return
  end

  messages = by_topic ? topics : { topics => payload }
  tokens = []

  messages.each do |topic, body|
    raise ArgumentError, "Topic name cannot be nil" if topic.nil?
    raise ArgumentError, "Topic name cannot be empty" if topic.empty?

    packet = MQTT::Packet::Publish.new(
      id: next_packet_id,
      qos: qos,
      retain: retain,
      topic: topic,
      payload: body
    )

    # Register before sending so the acknowledgement cannot be missed
    tokens << register_for_ack(packet) unless qos.zero?

    send_packet(packet)
  end

  tokens.each { |token| wait_for_ack(token) } unless qos.zero?
  nil
end

#queue_empty?Boolean

Returns true if the incoming message queue is empty.

Returns:

  • (Boolean)


482
483
484
# File 'lib/mqtt/client.rb', line 482

# True when nothing is waiting in the incoming message queue.
def queue_empty? = @read_queue.empty?

#queue_lengthObject

Returns the length of the incoming message queue.



487
488
489
# File 'lib/mqtt/client.rb', line 487

# Number of entries waiting in the incoming message queue.
def queue_length = @read_queue.length

#set_will(topic, payload, retain: false, qos: 0) ⇒ Object

Set the Will for the client

The will is a message that will be delivered by the server when the client dies. The Will must be set before establishing a connection to the server



233
234
235
236
237
238
# File 'lib/mqtt/client.rb', line 233

# Configure the Will in one call by assigning the four will_* attributes
# (topic, payload, retain, qos - in that order). Per the method's
# documentation this has to happen before the connection is established.
def set_will(topic, payload, retain: false, qos: 0)
  self.will_topic, self.will_payload, self.will_retain = topic, payload, retain
  self.will_qos = qos
end

#ssl_contextObject

Get the OpenSSL context, that is used if SSL/TLS is enabled



197
198
199
# File 'lib/mqtt/client.rb', line 197

# The OpenSSL context used when SSL/TLS is enabled. It is created on first
# use and the same object is returned afterwards.
def ssl_context
  return @ssl_context if @ssl_context

  @ssl_context = OpenSSL::SSL::SSLContext.new
end

#subscribe(*topics, wait_for_ack: false) ⇒ Object

Send a subscribe message for one or more topics on the MQTT server. The topics parameter should be one of the following:

  • String: subscribe to one topic with QoS 0

  • Array: subscribe to multiple topics with QoS 0

  • Hash: subscribe to multiple topics where the key is the topic and the value is the QoS level

For example:

client.subscribe( 'a/b' )
client.subscribe( 'a/b', 'c/d' )
client.subscribe( ['a/b',0], ['c/d',1] )
client.subscribe( { 'a/b' => 0, 'c/d' => 1 } )


421
422
423
424
425
426
427
428
429
430
431
# File 'lib/mqtt/client.rb', line 421

# Send a subscribe message for one or more topics on the MQTT server.
#
# All positional arguments are passed as-is to MQTT::Packet::Subscribe.
# With wait_for_ack: true the packet is registered for acknowledgement
# before it is sent and the call blocks until the acknowledgement arrives.
#
# Raises NotConnectedException when the client is not connected.
def subscribe(*topics, wait_for_ack: false)
  raise NotConnectedException unless connected?

  packet = MQTT::Packet::Subscribe.new(id: next_packet_id, topics: topics)

  unless wait_for_ack
    send_packet(packet)
    return
  end

  token = register_for_ack(packet)
  send_packet(packet)
  # The parenthesised call reaches the wait_for_ack method, not the keyword
  # argument of the same name
  wait_for_ack(token)
end

#unsubscribe(*topics, wait_for_ack: false) ⇒ Object

Send an unsubscribe message for one or more topics on the MQTT server



434
435
436
437
438
439
440
441
442
443
444
445
446
# File 'lib/mqtt/client.rb', line 434

# Send an unsubscribe message for one or more topics on the MQTT server.
#
# Topics may be given as separate arguments or as a single Array. With
# wait_for_ack: true the packet is registered for acknowledgement before it
# is sent and the call blocks until the acknowledgement arrives.
#
# Raises NotConnectedException when the client is not connected.
def unsubscribe(*topics, wait_for_ack: false)
  raise NotConnectedException unless connected?

  # Unwrap a lone argument so unsubscribe(["a", "b"]) and
  # unsubscribe("a", "b") build the same packet. The size is compared
  # directly: `one?` counts truthy elements, so a list such as ["a", nil]
  # would be collapsed to "a".
  topics = topics.first if topics.size == 1

  packet = MQTT::Packet::Unsubscribe.new(
    topics: topics,
    id: next_packet_id
  )
  token = register_for_ack(packet) if wait_for_ack
  send_packet(packet)
  # The parenthesised call reaches the wait_for_ack method, not the keyword
  # argument of the same name
  wait_for_ack(token) if wait_for_ack
end