Class: EventMachine::MQTT::ClientConnection
- Inherits:
-
Connection
- Object
- Connection
- Connection
- EventMachine::MQTT::ClientConnection
- Includes:
- Deferrable
- Defined in:
- lib/em/mqtt/client_connection.rb
Instance Attribute Summary collapse
-
#ack_timeout ⇒ Object
readonly
Returns the value of attribute ack_timeout.
-
#clean_session ⇒ Object
readonly
Returns the value of attribute clean_session.
-
#client_id ⇒ Object
readonly
Returns the value of attribute client_id.
-
#keep_alive ⇒ Object
readonly
Returns the value of attribute keep_alive.
-
#packet_id ⇒ Object
readonly
Returns the value of attribute packet_id.
-
#password ⇒ Object
readonly
Returns the value of attribute password.
-
#timer ⇒ Object
readonly
Returns the value of attribute timer.
-
#username ⇒ Object
readonly
Returns the value of attribute username.
Attributes inherited from Connection
#last_received, #last_sent, #state
Class Method Summary collapse
-
.connect(*args, &blk) ⇒ Object
Connect to an MQTT server.
Instance Method Summary collapse
- #connection_completed ⇒ Object
-
#disconnect(send_msg = true) ⇒ Object
Disconnect from the MQTT broker.
-
#initialize(args = {}) ⇒ ClientConnection
constructor
Initialize connection.
- #post_init ⇒ Object
-
#publish(topic, payload, retain = false, qos = 0) ⇒ Object
Publish a message on a particular topic to the MQTT broker.
- #receive_callback(&block) ⇒ Object
- #receive_msg(packet) ⇒ Object
-
#subscribe(*topics) ⇒ Object
Send a subscribe message for one or more topics on the MQTT broker.
- #unbind ⇒ Object
-
#unsubscribe(*topics) ⇒ Object
Send an unsubscribe message for one or more topics on the MQTT broker.
Methods inherited from Connection
#connected?, #receive_data, #send_packet
Constructor Details
#initialize(args = {}) ⇒ ClientConnection
Initialize connection
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 |
# File 'lib/em/mqtt/client_connection.rb', line 50
# Initialize connection.
#
# Every setting is first given its default value; afterwards each pair in
# +args+ is written to the instance variable named after its key, so a
# caller-supplied value replaces the default.
#
# @param args [Hash] setting name => value. Anything that is not a Hash is
#   ignored and the defaults are kept.
def initialize(args={})
  @client_id = MQTT::Client.generate_client_id
  @keep_alive, @clean_session, @packet_id, @ack_timeout = 10, true, 0, 5
  @username = @password = @timer = nil

  # Only a Hash can carry overrides.
  return unless args.is_a?(Hash)

  args.each_pair { |name, value| instance_variable_set("@#{name}", value) }
end
Instance Attribute Details
#ack_timeout ⇒ Object (readonly)
Returns the value of attribute ack_timeout.
11 12 13 |
# File 'lib/em/mqtt/client_connection.rb', line 11
# Returns the value of attribute ack_timeout.
#
# @return [Object] the current content of @ack_timeout
def ack_timeout
  @ack_timeout
end
#clean_session ⇒ Object (readonly)
Returns the value of attribute clean_session.
7 8 9 |
# File 'lib/em/mqtt/client_connection.rb', line 7
# Returns the value of attribute clean_session.
#
# @return [Object] the current content of @clean_session
def clean_session
  @clean_session
end
#client_id ⇒ Object (readonly)
Returns the value of attribute client_id.
5 6 7 |
# File 'lib/em/mqtt/client_connection.rb', line 5
# Returns the value of attribute client_id.
#
# @return [Object] the current content of @client_id
def client_id
  @client_id
end
#keep_alive ⇒ Object (readonly)
Returns the value of attribute keep_alive.
6 7 8 |
# File 'lib/em/mqtt/client_connection.rb', line 6
# Returns the value of attribute keep_alive.
#
# @return [Object] the current content of @keep_alive
def keep_alive
  @keep_alive
end
#packet_id ⇒ Object (readonly)
Returns the value of attribute packet_id.
10 11 12 |
# File 'lib/em/mqtt/client_connection.rb', line 10
# Returns the value of attribute packet_id.
#
# @return [Object] the current content of @packet_id
def packet_id
  @packet_id
end
#password ⇒ Object (readonly)
Returns the value of attribute password.
9 10 11 |
# File 'lib/em/mqtt/client_connection.rb', line 9
# Returns the value of attribute password.
#
# @return [Object] the current content of @password
def password
  @password
end
#timer ⇒ Object (readonly)
Returns the value of attribute timer.
12 13 14 |
# File 'lib/em/mqtt/client_connection.rb', line 12
# Returns the value of attribute timer.
#
# @return [Object] the current content of @timer
def timer
  @timer
end
#username ⇒ Object (readonly)
Returns the value of attribute username.
8 9 10 |
# File 'lib/em/mqtt/client_connection.rb', line 8
# Returns the value of attribute username.
#
# @return [Object] the current content of @username
def username
  @username
end
Class Method Details
.connect(*args, &blk) ⇒ Object
Connect to an MQTT server
Examples:
ClientConnection.connect('localhost', 1883)
ClientConnection.connect(:host => 'localhost', :username => 'user', :password => 'pass')
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/em/mqtt/client_connection.rb', line 20
# Connect to an MQTT server.
#
# Arguments are processed left to right. A Hash argument is merged into the
# options; the first non-Hash argument becomes the host and the second the
# port. Any further non-Hash arguments are ignored. Because processing is
# sequential, a later argument overrides an earlier one for the same setting.
#
# The host and port are removed from the options and handed to
# ::EventMachine.connect together with this handler and the remaining
# options.
#
# @param args [Array] optional host, optional port and/or option Hashes
# @param blk [Proc] block forwarded to ::EventMachine.connect
# @return [Object] whatever ::EventMachine.connect returns
def self.connect(*args, &blk)
  options = { :host => 'localhost', :port => MQTT::DEFAULT_PORT }

  # Positional slots still waiting to be filled, in order.
  open_slots = [:host, :port]

  args.each do |arg|
    if arg.is_a?(Hash)
      options.merge!(arg)
    else
      slot = open_slots.shift
      options[slot] = arg if slot
    end
  end

  host = options.delete(:host)
  port = options.delete(:port)
  ::EventMachine.connect(host, port, self, options, &blk)
end
Instance Method Details
#connection_completed ⇒ Object
72 73 74 75 76 77 78 79 80 81 82 83 84 85 |
# File 'lib/em/mqtt/client_connection.rb', line 72
# TCP socket established: build a Connect packet from the stored client id,
# session flag, keep-alive and credentials, send it, and record that the
# Connect packet has been sent.
def connection_completed
  send_packet(
    MQTT::Packet::Connect.new(
      :client_id => @client_id,
      :clean_session => @clean_session,
      :keep_alive => @keep_alive,
      :username => @username,
      :password => @password
    )
  )

  @state = :connect_sent
end
#disconnect(send_msg = true) ⇒ Object
Disconnect from the MQTT broker. If you don’t want to say goodbye to the broker, set send_msg to false.
89 90 91 92 93 94 95 |
# File 'lib/em/mqtt/client_connection.rb', line 89
# Disconnect from the MQTT broker.
#
# A Disconnect packet is sent only when the connection reports itself as
# connected and +send_msg+ is truthy. The state is set to :disconnecting in
# every case.
#
# @param send_msg [Boolean] pass false to skip the Disconnect packet
def disconnect(send_msg=true)
  # FIXME: only close if we aren't waiting for any acknowledgements
  send_packet(MQTT::Packet::Disconnect.new) if connected? && send_msg

  @state = :disconnecting
end
#post_init ⇒ Object
67 68 69 70 |
# File 'lib/em/mqtt/client_connection.rb', line 67
# Runs the parent class's post_init first, then marks this connection as
# :connecting.
def post_init
  super

  @state = :connecting
end
#publish(topic, payload, retain = false, qos = 0) ⇒ Object
Publish a message on a particular topic to the MQTT broker.
116 117 118 119 120 121 122 123 124 125 126 127 128 129 |
# File 'lib/em/mqtt/client_connection.rb', line 116
# Publish a message on a particular topic to the MQTT broker.
#
# The packet is built and sent inside a +callback+ block, so nothing is
# sent (and no packet id is taken) until that block runs.
#
# @param topic [Object] value passed as the packet's :topic
# @param payload [Object] value passed as the packet's :payload
# @param retain [Object] value passed as the packet's :retain
# @param qos [Object] value passed as the packet's :qos
def publish(topic, payload, retain=false, qos=0)
  # Defer publishing until we are connected
  callback do
    message = MQTT::Packet::Publish.new(
      :id => next_packet_id,
      :qos => qos,
      :retain => retain,
      :topic => topic,
      :payload => payload
    )
    send_packet(message)
  end
end
#receive_callback(&block) ⇒ Object
97 98 99 |
# File 'lib/em/mqtt/client_connection.rb', line 97
# Stores the given block in @receive_callback, replacing any block stored
# earlier. Calling it without a block stores nil.
#
# @return [Proc, nil] the block that was stored
def receive_callback(&block)
  @receive_callback = block
end
#receive_msg(packet) ⇒ Object
101 102 103 104 |
# File 'lib/em/mqtt/client_connection.rb', line 101
# Passes +packet+ to the block held in @receive_callback. Does nothing when
# no block is stored.
#
# @param packet [Object] the value handed to the stored block
# @return [Object, nil] the block's result, or nil when no block is stored
def receive_msg(packet)
  # Alternatively, subclass this method
  return if @receive_callback.nil?

  @receive_callback.call(packet)
end
#subscribe(*topics) ⇒ Object
Send a subscribe message for one or more topics on the MQTT broker.
132 133 134 135 136 137 138 139 140 141 142 |
# File 'lib/em/mqtt/client_connection.rb', line 132
# Send a subscribe message for one or more topics on the MQTT broker.
#
# The packet is built and sent inside a +callback+ block, so nothing is
# sent (and no packet id is taken) until that block runs.
#
# @param topics [Array] the arguments, passed as the packet's :topics
def subscribe(*topics)
  # Defer subscribing until we are connected
  callback do
    request = MQTT::Packet::Subscribe.new(
      :id => next_packet_id,
      :topics => topics
    )
    send_packet(request)
  end
end
#unbind ⇒ Object
106 107 108 109 110 111 112 113 |
# File 'lib/em/mqtt/client_connection.rb', line 106
# Cancels the timer if there is one, then checks the state.
#
# Unless the state is :disconnecting, this raises: the exception currently
# held in $! when there is one, otherwise a new MQTT::NotConnectedException.
# When the state is :disconnecting, it is set to :disconnected.
#
# @raise [Exception] the exception held in $!, if any
# @raise [MQTT::NotConnectedException] when no exception is held in $!
def unbind
  timer.cancel if timer

  if state != :disconnecting
    # Re-throw any exceptions (if present) to avoid swallowed errors.
    pending_error = $!
    raise pending_error if pending_error
    raise MQTT::NotConnectedException.new("Connection to server lost")
  end

  @state = :disconnected
end
#unsubscribe(*topics) ⇒ Object
Send an unsubscribe message for one or more topics on the MQTT broker
145 146 147 148 149 150 151 152 153 154 155 |
# File 'lib/em/mqtt/client_connection.rb', line 145
# Send an unsubscribe message for one or more topics on the MQTT broker.
#
# The packet is built and sent inside a +callback+ block, so nothing is
# sent (and no packet id is taken) until that block runs.
#
# @param topics [Array] the arguments, passed as the packet's :topics
def unsubscribe(*topics)
  # Defer unsubscribing until we are connected
  callback do
    request = MQTT::Packet::Unsubscribe.new(
      :id => next_packet_id,
      :topics => topics
    )
    send_packet(request)
  end
end