Class: Paho::Proxy

Inherits:
Object
  • Object
show all
Includes:
Singleton
Defined in:
lib/paho/proxy.rb

Constant Summary collapse

QOS =
2
LOCAL_MQTT =
'tcp://localhost:1883'

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeProxy

Returns a new instance of Proxy


34
35
36
# File 'lib/paho/proxy.rb', line 34

def initialize
  @clients = []
end

Instance Attribute Details

#clientsObject (readonly)

Returns the value of attribute clients


14
15
16
# File 'lib/paho/proxy.rb', line 14

def clients
  @clients
end

#processorObject

Returns the value of attribute processor


12
13
14
# File 'lib/paho/proxy.rb', line 12

def processor
  @processor
end

#qosObject

Returns the value of attribute qos


11
12
13
# File 'lib/paho/proxy.rb', line 11

def qos
  @qos
end

#topicObject

Returns the value of attribute topic


10
11
12
# File 'lib/paho/proxy.rb', line 10

def topic
  @topic
end

Class Method Details

.disconnect!Object


29
30
31
# File 'lib/paho/proxy.rb', line 29

def disconnect!
  self.instance.disconnect!
end

.publish(topic, payload) ⇒ Object


25
26
27
# File 'lib/paho/proxy.rb', line 25

def publish(topic, payload)
  self.instance.publish(topic, payload)
end

.publisherObject


21
22
23
# File 'lib/paho/proxy.rb', line 21

def publisher
  self.instance
end

.subscribe(processor, opts = {}) ⇒ Object


17
18
19
# File 'lib/paho/proxy.rb', line 17

def subscribe(processor, opts = {})
  self.instance.subscribe(processor, opts)
end

Instance Method Details

#clientObject


90
91
92
# File 'lib/paho/proxy.rb', line 90

def client
  @client ||= Paho::MqttClient.new("tcp://localhost:1883", id)
end

#connect!(opts = {}) ⇒ Object


38
39
40
41
42
43
44
# File 'lib/paho/proxy.rb', line 38

def connect!(opts = {})
  if client.isConnected
    true
  else
    client.connect(connection_options(opts))
  end
end

#disconnect!Object


94
95
96
97
# File 'lib/paho/proxy.rb', line 94

def disconnect!
  client.disconnect if client.isConnected
  @clients.each { |c| c.disconnect if c.isConnected }
end

#publish(topic, payload) ⇒ Object

Publish an MQTT message on a topic

Parameters:

  • topic

    String The topic to publish to

  • payload

    String The contents of the message as a plain string


77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/paho/proxy.rb', line 77

def publish(topic, payload)
  connect!

  message = Paho::MqttMessage.new
  message.setQos 2
  message.setPayload payload.to_s.to_java_bytes
  message.setMutable false

  log "Publishing to #{topic} #{payload}"
  client.publish topic, message
  log "Published to #{topic} #{payload}"
end

#subscribe(processor, opts = {}) ⇒ Object

Subscribe to a topic.

The required parameter has to respond to the messageArrived method.

Parameters:

  • processor

    Paho::Processor An object that responds to the `topic` and `messageArrived` methods


51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/paho/proxy.rb', line 51

def subscribe(processor, opts = {})
  topic = processor.topic
  client_id = id(topic)

  qos = opts.delete(:qos) { QOS }
  uri = opts.delete(:uri) { LOCAL_MQTT }

  log "Creating new subscription for #{topic} with id #{client_id}"

  c = Paho::MqttClient.new(uri, client_id, nil)
  c.setCallback(processor)
  c.connect(connection_options(opts))
  c.subscribe(topic, qos)

  log "Created new subscription for #{topic} with id #{client_id}"
  @clients << c

  true
rescue Java::OrgEclipsePahoClientMqttv3::MqttException => e
  fail ArgumentError, e.message
end