Class: MQTT::SubHandler
- Inherits: BaseHandler (MQTT::SubHandler < BaseHandler < Object)
- Defined in: lib/mqtt/sub_handler.rb
Direct Known Subclasses
Instance Attribute Summary
- #jsonifyHashes ⇒ Object
  Whether or not hashes and arrays should be converted to JSON when sending.
Subscribing
- #subscribe_to(topic, qos: 1) {|data, topicList| ... } ⇒ MQTT::Subscriptions::CallbackSubscription (also: #subscribeTo)
  Attach a callback to an MQTT topic or wildcard.
- #track(topic, qos: 1) {|data| ... } ⇒ MQTT::Subscriptions::ValueTrackerSubscription (also: #on_change)
  Track data changes for a topic in the background.
- #wait_for(topic, qos: 1, timeout: nil) {|data, topicList| ... } ⇒ Boolean
  Synchronously wait for data.
Instance Method Summary
- #initialize(mqttClient, jsonify: true, **extra_opts) ⇒ SubHandler (constructor)
  A new instance of SubHandler.
- #lockAndListen ⇒ Object (also: #lock_and_listen)
  Pause the main thread and wait for messages.
- #publish_to(topic, data, qos: 1, retain: false) ⇒ Object (also: #publishTo)
  Publish a message to topic.
Methods inherited from BaseHandler
#attempt_packet_publish, #destroy!, getTopicMatch, get_topic_split, #register_subscription, #unregister_subscription
Constructor Details
#initialize(mqttClient, jsonify: true, **extra_opts) ⇒ SubHandler
Returns a new instance of SubHandler.
    # File 'lib/mqtt/sub_handler.rb', line 140
    def initialize(mqttClient, jsonify: true, **extra_opts)
      super(mqttClient, **extra_opts);
      @jsonifyHashes = jsonify;
    end
Instance Attribute Details
#jsonifyHashes ⇒ Object
Whether or not hashes and arrays should be converted to JSON when sending.
    # File 'lib/mqtt/sub_handler.rb', line 20
    def jsonifyHashes
      @jsonifyHashes
    end
Instance Method Details
#lockAndListen ⇒ Object
Also known as: lock_and_listen
Pause the main thread and wait for messages. This is mainly useful once the code has finished its setup and the program should keep running instead of exiting. "INT" is trapped, ensuring a clean exit on Ctrl-C.
    # File 'lib/mqtt/sub_handler.rb', line 130
    def lockAndListen()
      Signal.trap("INT") {
        exit 0
      }
      x_logi("Main thread paused.")
      Thread.stop();
    end
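The parking behaviour of `Thread.stop` can be illustrated without a broker. The following is a minimal sketch, not part of MQTT::SubHandler: it uses a worker thread in place of the main thread (an assumption made so the script can terminate), and omits the "INT" trap that the real method installs.

```ruby
# Hypothetical demonstration of Thread.stop; not library code.
# A worker thread stands in for the main thread so the script can finish.
worker = Thread.new do
  Thread.stop   # park this thread, as lockAndListen does for the main thread
  :resumed      # only reached once another thread wakes this one up
end

# Wait until the worker has actually parked itself.
sleep 0.01 until worker.status == "sleep"
parked = worker.alive?

worker.run             # wake the parked thread
result = worker.value  # joins the thread and returns the block's value
```

In a real script the main thread stays parked while the handler's background threads keep delivering messages, and Ctrl-C ends the process through the trap.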
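#publish_to(topic, data, qos: 1, retain: false) ⇒ Object
Also known as: publishTo
Publish a message to topic. Raises ArgumentError if the topic contains a wildcard character (# or +). Hashes and arrays are converted to JSON when jsonifyHashes is set. QoS values above 1 are not supported yet and are reduced to 1, with a warning logged the first time this happens.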
    # File 'lib/mqtt/sub_handler.rb', line 110
    def publish_to(topic, data, qos: 1, retain: false)
      raise ArgumentError, "Wrong symbol in topic: #{topic}" if topic =~ /[#\+]/

      if(@jsonifyHashes and (data.is_a? Array or data.is_a? Hash))
        data = data.to_json
      end

      if(qos > 1)
        qos = 1
        x_logw("push with QOS > 1 was attempted, this is not supported yet!") unless $MQTTPubQOSWarned
        $MQTTPubQOSWarned = true;
      end

      queue_packet({type: :pub, topic: topic, data: data, qos: qos, retain: retain});
    end
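To make the pre-publish checks easier to follow, here is a stand-alone sketch that mirrors them. `prepare_publish` and its `jsonify:` keyword are hypothetical names introduced for illustration; the real method queues the packet through the handler instead of returning it.

```ruby
require 'json'

# Hypothetical helper mirroring the checks in publish_to; not library code.
def prepare_publish(topic, data, qos: 1, retain: false, jsonify: true)
  # Wildcards are only meaningful when subscribing, so they are rejected here.
  raise ArgumentError, "Wrong symbol in topic: #{topic}" if topic =~ /[#+]/

  # Hashes and arrays are serialized to JSON; other payloads pass through.
  data = data.to_json if jsonify && (data.is_a?(Array) || data.is_a?(Hash))

  # QoS levels above 1 are reduced to 1.
  qos = 1 if qos > 1

  { type: :pub, topic: topic, data: data, qos: qos, retain: retain }
end

packet = prepare_publish("home/livingroom/temp", { celsius: 21.5 }, qos: 2)
puts packet[:data]  # the hash, serialized to JSON
puts packet[:qos]   # the reduced QoS level
```

With the actual handler, the equivalent call would presumably be `publish_to("home/livingroom/temp", { celsius: 21.5 })` on a SubHandler instance.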
#subscribe_to(topic, qos: 1) {|data, topicList| ... } ⇒ MQTT::Subscriptions::CallbackSubscription
Also known as: subscribeTo
Attach a callback to an MQTT topic or wildcard. The callback is saved and executed asynchronously whenever a message is received on a matching topic (including wildcard matches).
    # File 'lib/mqtt/sub_handler.rb', line 95
    def subscribe_to(topic, qos: 1, &callback)
      subObject = MQTT::Subscriptions::CallbackSubscription.new(topic, qos, callback);
      register_subscription(subObject);
      return subObject;
    end
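Which topics a wildcard subscription receives follows the standard MQTT filter rules: `+` matches exactly one level and `#` matches all remaining levels. The sketch below illustrates those rules only; `topic_matches?` is a hypothetical helper and is not the library's own getTopicMatch, whose return value is not documented here.

```ruby
# Hypothetical matcher for MQTT topic filters; not library code.
def topic_matches?(filter, topic)
  filter_parts = filter.split("/", -1)
  topic_parts  = topic.split("/", -1)

  filter_parts.each_with_index do |part, i|
    return true if part == "#"                # multi-level wildcard: rest matches
    return false if i >= topic_parts.length   # topic has fewer levels than filter
    next if part == "+"                       # single-level wildcard
    return false unless part == topic_parts[i]
  end

  # Without a trailing "#", both must have the same number of levels.
  filter_parts.length == topic_parts.length
end

topics = ["home/kitchen/temp", "home/kitchen/humidity", "home/hall/temp"]
temps  = topics.select { |t| topic_matches?("home/+/temp", t) }
puts temps.inspect
```

A callback attached with `subscribe_to("home/+/temp")` would be expected to fire for the two temperature topics and not for the humidity topic.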
#track(topic, qos: 1) {|data| ... } ⇒ MQTT::Subscriptions::ValueTrackerSubscription
Also known as: on_change
Track data changes for a topic in the background. With no callback given, the returned object can be used to get the last received raw data string. With a callback given, the callback will be called whenever a change in data is detected.
    # File 'lib/mqtt/sub_handler.rb', line 70
    def track(topic, qos: 1, &callback)
      unless(@trackerHash.has_key? topic)
        subObject = MQTT::Subscriptions::ValueTrackerSubscription.new(topic, qos);
        register_subscription(subObject);

        @trackerHash[topic] = subObject;
      end

      @trackerHash[topic].attach(callback) if(callback)

      return @trackerHash[topic];
    end
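The change-detection idea can be sketched in a few lines. `ValueTracker`, its `value` reader and its `update` method are hypothetical and only illustrate the behaviour described above; the internals of ValueTrackerSubscription are not shown in this page and may differ.

```ruby
# Hypothetical change tracker; not the library's ValueTrackerSubscription.
class ValueTracker
  attr_reader :value  # last payload seen, or nil before the first message

  def initialize
    @value = nil
    @callbacks = []
  end

  # Register a callback to run whenever the tracked value changes.
  def attach(callback = nil, &block)
    @callbacks << (callback || block)
    self
  end

  # Feed in a new payload; callbacks fire only if it differs from the last one.
  def update(new_value)
    return false if new_value == @value

    @value = new_value
    @callbacks.each { |cb| cb.call(new_value) }
    true
  end
end

changes = []
tracker = ValueTracker.new
tracker.attach { |data| changes << data }
["on", "on", "off"].each { |payload| tracker.update(payload) }
puts changes.inspect
```

The repeated "on" payload does not trigger the callback a second time, which is the point of tracking changes instead of subscribing to every message.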
#wait_for(topic, qos: 1, timeout: nil) {|data, topicList| ... } ⇒ Boolean
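Synchronously wait for data. It waits for a message on `topic` and passes each received message to the given block to check the data for validity, returning true as soon as the block returns a truthy value. A block is required; an ArgumentError is raised without one. If `timeout` is given and expires first, the method returns false.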
    # File 'lib/mqtt/sub_handler.rb', line 34
    def wait_for(topic, qos: 1, timeout: nil)
      unless block_given?
        raise ArgumentError, "A block for data-processing needs to be passed!"
      end

      subObject = MQTT::Subscriptions::WaitpointSubscription.new(topic, qos);
      register_subscription(subObject);

      begin
        Timeout.timeout(timeout) do
          loop do
            return_data = subObject.waitpoint.wait()[1];
            if yield(return_data[0], return_data[1])
              return true;
            end
          end
        end
      rescue Timeout::Error
        return false;
      ensure
        unregister_subscription(subObject);
      end
    end
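The wait loop above can be tried without a broker by swapping the subscription for a plain `Queue`. This is a sketch under that assumption: `wait_for_message` is a hypothetical stand-in, and the queue replaces the WaitpointSubscription used by the real method.

```ruby
require 'timeout'

# Hypothetical stand-in for wait_for, reading from a Queue instead of MQTT.
def wait_for_message(queue, timeout: nil)
  raise ArgumentError, "A block for data-processing needs to be passed!" unless block_given?

  Timeout.timeout(timeout) do
    loop do
      data = queue.pop             # blocks until a message arrives
      return true if yield(data)   # stop as soon as the block accepts the data
    end
  end
rescue Timeout::Error
  false                            # nothing acceptable arrived in time
end

inbox = Queue.new
producer = Thread.new { ["booting", "ready"].each { |msg| inbox << msg } }

got_ready = wait_for_message(inbox, timeout: 1) { |data| data == "ready" }
producer.join

# An empty queue never delivers anything, so this call runs into the timeout.
timed_out = wait_for_message(Queue.new, timeout: 0.05) { |_data| true }
```

The first call skips "booting" and returns once "ready" arrives; the second shows the false return value used to signal a timeout.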