Class: MQTT::BaseHandler

Inherits:
Object
  • Object
show all
Includes:
XasLogger::Mix
Defined in:
lib/mqtt/base_handler.rb

Direct Known Subclasses

SubHandler

Custom subscription handling collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(mqttClient, logger: nil, **extra_opts) ⇒ BaseHandler

Initialize a new MQTT::SubHandler The handler immediately connects to the server, and begins receciving and sending.

Examples:

Starting the handler

mqtt = MQTT::SubHandler.new('mqtt.eclipse.org');
mqtt = MQTT::SubHandler.new(MQTT::Client.new("Your.Client.Opts"))

Parameters:

  • mqttClient (String, MQTT::Client)

    Either a URI to connect to, or a MQTT::Client The URI can be of the form “mqtts://Password@User:URL:port”. The MQTT client instance can be fully configured, as specified by the MQTT Gem. It must not already be connected!

  • jsonify (Boolean)

    Should Hashes and Arrays input into publish_to be converted to JSON? This can be useful to have one less .to_json call. Default is true.



300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
# File 'lib/mqtt/base_handler.rb', line 300

def initialize(mqttClient, logger: nil, **extra_opts)
	@callbackList = Array.new();
	if mqttClient.is_a? String
		@mqtt = MQTT::Client.new(mqttClient);
		@mqtt.clean_session = false unless extra_opts[:client_id].nil?
	else
		@mqtt = mqttClient;
	end

	init_x_log("MQTT #{@mqtt.host}", logger);
	self.log_level = Logger::INFO;

	@conChangeMutex = Mutex.new();
	@connected 		 = false;
	@reconnectCount = 0;

	@mqtt.client_id ||= extra_opts[:client_id] || MQTT::Client.generate_client_id("MQTT_Sub_", 8);

	@packetQueue = Array.new();
	@packetQueueMutex = Mutex.new();

	@publisherThreadWaiting = false;

	@subscribedTopics 	= Hash.new();

	@trackerHash = Hash.new();

	@listenerThread = Thread.new do
		ensure_clean_start();
		mqtt_resub_thread();
	end
	@listenerThread.abort_on_exception = true;

	begin
		Timeout.timeout(5) {
			until(@connected)
				sleep 0.1;
			end
		}
	rescue Timeout::Error
		x_loge("Broker did not connect!");
	end

	@publisherThread = Thread.new do
		mqtt_push_thread();
	end
	@publisherThread.abort_on_exception = true;

	at_exit {
		destroy!()
	}
end

Class Method Details

.get_topic_split(topicName) ⇒ Array<String>

Note:

This function is mainly used for background processing.

Split a Topic into a Topic-Array

Parameters:

  • topicName (String)

    The string topic which to split

Returns:

  • (Array<String>)

    A list of individual topic-branches



16
17
18
# File 'lib/mqtt/base_handler.rb', line 16

def self.get_topic_split(topicName)
	return topicName.scan(/[^\/]+/);
end

.getTopicMatch(receivedTopicString, topicPattern) ⇒ nil, Array<String>

Note:

This function is mainly used for background processing.

Match a topic string to a topic pattern

Parameters:

  • receivedTopicString (String)

    The string (as returned by MQTT.get) to compare

  • topicPattern (Array<String>)

    The Topic-Array (as returned by .get_topic_split) to compare against

Returns:

  • (nil, Array<String>)

    Nil if no match was found. An Array of matched wildcard topic branches (can be empty) when successfully matched



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/mqtt/base_handler.rb', line 29

def self.getTopicMatch(receivedTopicString, topicPattern)
	receivedTopicList = get_topic_split receivedTopicString;

	outputTopicList = Array.new();

	return nil unless receivedTopicList.length >= topicPattern.length;

	topicPattern.each_index do |i|
		if(topicPattern[i] == "+")
			outputTopicList << receivedTopicList[i];

		elsif(topicPattern[i] == "#")
			outputTopicList.concat receivedTopicList[i..-1];
			return outputTopicList;

		elsif topicPattern[i] != receivedTopicList[i];
			return nil;

		end
	end

	return outputTopicList if topicPattern.length == receivedTopicList.length;
	return nil;
end

Instance Method Details

#attempt_packet_publishObject



164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
# File 'lib/mqtt/base_handler.rb', line 164

def attempt_packet_publish()
	until @packetQueue.empty? do
		h = nil;
		@packetQueueMutex.synchronize {
			h = @packetQueue[0];
		}
		Timeout.timeout(3) {
			if(h[:type] == :sub)
				@mqtt.subscribe(h[:topic] => h[:qos]);
			elsif(h[:type] == :pub)
				@mqtt.publish(h[:topic], h[:data], h[:retain], h[:qos]);
			end
		}
		@packetQueueMutex.synchronize {
			@packetQueue.shift();
		}
	end
end

#destroy!Object



259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
# File 'lib/mqtt/base_handler.rb', line 259

def destroy!()
	return if @destroying
	@destroying = true;

	unless @packetQueue.empty?
		x_logd "Finishing sending of MQTT messages ... "
		@publisherThread.run() if @publisherThreadWaiting
		begin
			Timeout.timeout(4) {
				until @packetQueue.empty? do
					sleep 0.05;
				end
			}
		rescue Timeout::Error
			x_logw "Not all messages were published";
		else
			x_logd "Publish clean finished"
		end
	end

	@publisherThread.run();
	@publisherThread.join();
	@listenerThread.kill();

	@mqtt.disconnect() if @connected

	ensure_clean_exit();

	x_logi("Fully disconnected!");
end

#register_subscription(subObject) ⇒ Object

Register a custom subscription, and send a subscription message to the server.

Parameters:

Returns:

  • void

Raises:

  • (ArgumentError)


113
114
115
116
117
118
119
# File 'lib/mqtt/base_handler.rb', line 113

def register_subscription(subObject)
	raise ArgumentError, "Object is not a subscription!" unless subObject.is_a? MQTT::Subscriptions::Subscription
	return if @callbackList.include? subObject;

	@callbackList << subObject;
	queue_packet({type: :sub, topic: subObject.topic, qos: subObject.qos});
end

#unregister_subscription(subObject) ⇒ Object

Unregister a subscription. Removes it from the callback list and unsubscribes from the topic if no other subscriptions for it are present.

Parameters:

Returns:

  • void

Raises:

  • (ArgumentError)


102
103
104
105
106
107
108
# File 'lib/mqtt/base_handler.rb', line 102

def unregister_subscription(subObject)
	raise ArgumentError, "Object is not a subscription!" unless subObject.is_a? MQTT::Subscriptions::Subscription
	return unless @callbackList.include? subObject;

	queue_packet({type: :unsub, topic: subObject.topic});
	@callbackList.delete(subObject);
end