Class: MQTT::SubHandler

Inherits:
BaseHandler show all
Defined in:
lib/mqtt/sub_handler.rb

Direct Known Subclasses

Testing::SubHandler

Instance Attribute Summary collapse

Subscribing collapse

Instance Method Summary collapse

Methods inherited from BaseHandler

#attempt_packet_publish, #destroy!, getTopicMatch, get_topic_split, #register_subscription, #unregister_subscription

Constructor Details

#initialize(mqttClient, jsonify: true, **extra_opts) ⇒ SubHandler

Returns a new instance of SubHandler.



140
141
142
143
144
# File 'lib/mqtt/sub_handler.rb', line 140

def initialize(mqttClient, jsonify: true, **extra_opts)
	super(mqttClient, **extra_opts);

	@jsonifyHashes = jsonify;
end

Instance Attribute Details

#jsonifyHashesObject

Whether or not hashes and arrays should be converted to JSON when sending



20
21
22
# File 'lib/mqtt/sub_handler.rb', line 20

def jsonifyHashes
  @jsonifyHashes
end

Instance Method Details

#lockAndListenObject Also known as: lock_and_listen

Pause the main thread and wait for messages. This is mainly useful when the code has set everything up, but doesn’t just want to end. “INT” is trapped, ensuring a smooth exit on Ctrl-C



130
131
132
133
134
135
136
137
# File 'lib/mqtt/sub_handler.rb', line 130

def lockAndListen()
	Signal.trap("INT") {
		exit 0
	}

	x_logi("Main thread paused.")
	Thread.stop();
end

#publish_to(topic, data, qos: 1, retain: false) ⇒ Object Also known as: publishTo

Publish a message to topic.

Parameters:

  • topic (String)

    The topic to push to.

  • data (String)

    The data to be transmitted.

  • qos (nil, Numeric) (defaults to: 1)

    QoS for the publish. Currently not fully supported by the mqtt gem.

  • retain (nil, Boolean) (defaults to: false)

    retain-flag for the publish.

Raises:

  • (ArgumentError)


110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# File 'lib/mqtt/sub_handler.rb', line 110

def publish_to(topic, data, qos: 1, retain: false)
	raise ArgumentError, "Wrong symbol in topic: #{topic}" if topic =~ /[#\+]/

	if(@jsonifyHashes and (data.is_a? Array or data.is_a? Hash))
		data = data.to_json
	end

	if(qos > 1)
		qos = 1
		x_logw("push with QOS > 1 was attempted, this is not supported yet!") unless $MQTTPubQOSWarned
		$MQTTPubQOSWarned = true;
	end

	queue_packet({type: :pub, topic: topic, data: data, qos: qos, retain: retain});
end

#subscribe_to(topic, qos: 1) {|data, topicList| ... } ⇒ MQTT::Subscriptions::CallbackSubscription Also known as: subscribeTo

Attach a callback to a MQTT Topic or wildcard. The callback will be saved, and asynchronously executed whenever a message from a matching topic (including wildcards) is received.

Parameters:

  • topic (String)

    The MQTT-Topic to subscribe to. Can be a Wildcard.

  • qos (nil, Integer) (defaults to: 1)

    The QoS for the subscription. Currently not used!

Yield Parameters:

  • data (String)

    The raw MQTT data received from the MQTT server

  • topicList (Array<String>)

    An array of topic-branches corresponding to wildcard matches. Can be empty if no wildcard was used!

Yield Returns:

  • (void)

Returns:



95
96
97
98
99
100
# File 'lib/mqtt/sub_handler.rb', line 95

def subscribe_to(topic, qos: 1, &callback)
	subObject = MQTT::Subscriptions::CallbackSubscription.new(topic, qos, callback);
	register_subscription(subObject);

	return subObject;
end

#track(topic, qos: 1) {|data| ... } ⇒ MQTT::Subscriptions::ValueTrackerSubscription Also known as: on_change

Track data changes for a topic in the background. With no callback given, the returned object can be used to get the last received raw data string. With a callback given, the callback will be called whenever a change in data is detected.

Parameters:

  • topic (String)

    The MQTT-Topic to track for data. Can be a Wildcard.

  • qos (nil, Integer) (defaults to: 1)

    The QoS to use for the subscription

Yield Parameters:

  • data (String)

    The new (changed) data received from MQTT.

Yield Returns:

  • (void)

Returns:



70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/mqtt/sub_handler.rb', line 70

def track(topic, qos: 1, &callback)
	unless(@trackerHash.has_key? topic)
		subObject = MQTT::Subscriptions::ValueTrackerSubscription.new(topic, qos);
		register_subscription(subObject);

		@trackerHash[topic] = subObject;
	end

	@trackerHash[topic].attach(callback) if(callback)

	return @trackerHash[topic];
end

#wait_for(topic, qos: 1, timeout: nil) {|data, topicList| ... } ⇒ Boolean

Synchronously wait for data. It waits for a message on ‘topic`, optionally letting a block check the data for validity, and optionally aborting after a timeout

Parameters:

  • topic (String)

    The MQTT-Topic to wait for

  • timeout (nil, Integer) (defaults to: nil)

    The optional timeout after which to abort

  • qos (nil, Integer) (defaults to: 1)

    The QoS for this subscription

Yield Parameters:

  • data (String)

    The data received via MQTT

  • topicList (Array<String>)

    The wildcard topic branches matched.

Yield Returns:

  • (Boolean)

    Whether or not the data was sufficient, and capture should be stopped.

Returns:

  • (Boolean)

    True if the block returned true, False if the code timed-out



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/mqtt/sub_handler.rb', line 34

def wait_for(topic, qos: 1, timeout: nil)
	unless block_given?
		raise ArgumentError, "A block for data-processing needs to be passed!"
	end

	subObject = MQTT::Subscriptions::WaitpointSubscription.new(topic, qos);
	register_subscription(subObject);

	begin
	Timeout.timeout(timeout) do
		loop do
			return_data = subObject.waitpoint.wait()[1];
			if yield(return_data[0], return_data[1])
				return true;
			end
		end
	end
	rescue Timeout::Error
		return false;
	ensure
		unregister_subscription(subObject);
	end
end