Class: MQTT::BaseHandler
- Inherits:
-
Object
- Object
- MQTT::BaseHandler
- Includes:
- XasLogger::Mix
- Defined in:
- lib/mqtt/base_handler.rb
Direct Known Subclasses
Custom subscription handling collapse
-
#register_subscription(subObject) ⇒ Object
Register a custom subscription, and send a subscription message to the server.
-
#unregister_subscription(subObject) ⇒ Object
Unregister a subscription.
Class Method Summary collapse
-
.get_topic_split(topicName) ⇒ Array<String>
Split a Topic into a Topic-Array.
-
.getTopicMatch(receivedTopicString, topicPattern) ⇒ nil, Array<String>
Match a topic string to a topic pattern.
Instance Method Summary collapse
- #attempt_packet_publish ⇒ Object
- #destroy! ⇒ Object
-
#initialize(mqttClient, logger: nil, **extra_opts) ⇒ BaseHandler
constructor
Initialize a new MQTT::SubHandler The handler immediately connects to the server, and begins receciving and sending.
Constructor Details
#initialize(mqttClient, logger: nil, **extra_opts) ⇒ BaseHandler
Initialize a new MQTT::SubHandler The handler immediately connects to the server, and begins receciving and sending.
300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 |
# File 'lib/mqtt/base_handler.rb', line 300 def initialize(mqttClient, logger: nil, **extra_opts) @callbackList = Array.new(); if mqttClient.is_a? String @mqtt = MQTT::Client.new(mqttClient); @mqtt.clean_session = false unless extra_opts[:client_id].nil? else @mqtt = mqttClient; end init_x_log("MQTT #{@mqtt.host}", logger); self.log_level = Logger::INFO; @conChangeMutex = Mutex.new(); @connected = false; @reconnectCount = 0; @mqtt.client_id ||= extra_opts[:client_id] || MQTT::Client.generate_client_id("MQTT_Sub_", 8); @packetQueue = Array.new(); @packetQueueMutex = Mutex.new(); @publisherThreadWaiting = false; @subscribedTopics = Hash.new(); @trackerHash = Hash.new(); @listenerThread = Thread.new do ensure_clean_start(); mqtt_resub_thread(); end @listenerThread.abort_on_exception = true; begin Timeout.timeout(5) { until(@connected) sleep 0.1; end } rescue Timeout::Error x_loge("Broker did not connect!"); end @publisherThread = Thread.new do mqtt_push_thread(); end @publisherThread.abort_on_exception = true; at_exit { destroy!() } end |
Class Method Details
.get_topic_split(topicName) ⇒ Array<String>
This function is mainly used for background processing.
Split a Topic into a Topic-Array
16 17 18 |
# File 'lib/mqtt/base_handler.rb', line 16 def self.get_topic_split(topicName) return topicName.scan(/[^\/]+/); end |
.getTopicMatch(receivedTopicString, topicPattern) ⇒ nil, Array<String>
This function is mainly used for background processing.
Match a topic string to a topic pattern
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
# File 'lib/mqtt/base_handler.rb', line 29 def self.getTopicMatch(receivedTopicString, topicPattern) receivedTopicList = get_topic_split receivedTopicString; outputTopicList = Array.new(); return nil unless receivedTopicList.length >= topicPattern.length; topicPattern.each_index do |i| if(topicPattern[i] == "+") outputTopicList << receivedTopicList[i]; elsif(topicPattern[i] == "#") outputTopicList.concat receivedTopicList[i..-1]; return outputTopicList; elsif topicPattern[i] != receivedTopicList[i]; return nil; end end return outputTopicList if topicPattern.length == receivedTopicList.length; return nil; end |
Instance Method Details
#attempt_packet_publish ⇒ Object
164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 |
# File 'lib/mqtt/base_handler.rb', line 164 def attempt_packet_publish() until @packetQueue.empty? do h = nil; @packetQueueMutex.synchronize { h = @packetQueue[0]; } Timeout.timeout(3) { if(h[:type] == :sub) @mqtt.subscribe(h[:topic] => h[:qos]); elsif(h[:type] == :pub) @mqtt.publish(h[:topic], h[:data], h[:retain], h[:qos]); end } @packetQueueMutex.synchronize { @packetQueue.shift(); } end end |
#destroy! ⇒ Object
259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 |
# File 'lib/mqtt/base_handler.rb', line 259 def destroy!() return if @destroying @destroying = true; unless @packetQueue.empty? x_logd "Finishing sending of MQTT messages ... " @publisherThread.run() if @publisherThreadWaiting begin Timeout.timeout(4) { until @packetQueue.empty? do sleep 0.05; end } rescue Timeout::Error x_logw "Not all messages were published"; else x_logd "Publish clean finished" end end @publisherThread.run(); @publisherThread.join(); @listenerThread.kill(); @mqtt.disconnect() if @connected ensure_clean_exit(); x_logi("Fully disconnected!"); end |
#register_subscription(subObject) ⇒ Object
Register a custom subscription, and send a subscription message to the server.
113 114 115 116 117 118 119 |
# File 'lib/mqtt/base_handler.rb', line 113 def register_subscription(subObject) raise ArgumentError, "Object is not a subscription!" unless subObject.is_a? MQTT::Subscriptions::Subscription return if @callbackList.include? subObject; @callbackList << subObject; queue_packet({type: :sub, topic: subObject.topic, qos: subObject.qos}); end |
#unregister_subscription(subObject) ⇒ Object
Unregister a subscription. Removes it from the callback list and unsubscribes from the topic if no other subscriptions for it are present.
102 103 104 105 106 107 108 |
# File 'lib/mqtt/base_handler.rb', line 102 def unregister_subscription(subObject) raise ArgumentError, "Object is not a subscription!" unless subObject.is_a? MQTT::Subscriptions::Subscription return unless @callbackList.include? subObject; queue_packet({type: :unsub, topic: subObject.topic}); @callbackList.delete(subObject); end |