Class: MQTT::Proxy
- Inherits: Object
  - Object
  - MQTT::Proxy
- Defined in: lib/mqtt/proxy.rb
Overview
Class for implementing a proxy to filter/mangle MQTT packets.
Instance Attribute Summary
- #client_filter ⇒ Object (writeonly)
  A filter Proc for packets coming from the client (to the server).
- #local_host ⇒ Object (readonly)
  Address to bind listening socket to.
- #local_port ⇒ Object (readonly)
  Port to bind listening socket to.
- #logger ⇒ Object (readonly)
  Ruby Logger object to send informational messages to.
- #select_timeout ⇒ Object (readonly)
  Time in seconds before disconnecting an idle connection.
- #server_filter ⇒ Object (writeonly)
  A filter Proc for packets coming from the server (to the client).
- #server_host ⇒ Object (readonly)
  Address of upstream server to send packets upstream to.
- #server_port ⇒ Object (readonly)
  Port of upstream server to send packets upstream to.
Instance Method Summary
- #initialize(args = {}) ⇒ Proxy (constructor)
  Create a new MQTT Proxy instance.
- #run ⇒ Object
  Start accepting connections and processing packets.
Constructor Details
#initialize(args = {}) ⇒ Proxy
Create a new MQTT Proxy instance.
Possible argument keys:
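  :local_host      Address to bind listening socket to (default: '0.0.0.0').
  :local_port      Port to bind listening socket to (default: MQTT::DEFAULT_PORT).
  :server_host     Address of the upstream server to forward packets to (no default).
  :server_port     Port of the upstream server to forward packets to (default: 18830).
  :select_timeout  Time in seconds before disconnecting an idle connection (default: 60).
  :logger          Ruby Logger object to send informational messages to (default: a Logger on STDOUT at INFO level).
NOTE: be careful not to connect to yourself! Make sure :server_host and :server_port do not point back at the proxy's own listening address.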
# File 'lib/mqtt/proxy.rb', line 40

def initialize(args = {})
  @local_host = args[:local_host] || '0.0.0.0'
  @local_port = args[:local_port] || MQTT::DEFAULT_PORT
  @server_host = args[:server_host]
  @server_port = args[:server_port] || 18_830
  @select_timeout = args[:select_timeout] || 60

  # Setup a logger
  @logger = args[:logger]
  if @logger.nil?
    @logger = Logger.new(STDOUT)
    @logger.level = Logger::INFO
  end

  # Default is not to have any filters
  @client_filter = nil
  @server_filter = nil

  # Create TCP server socket
  @server = TCPServer.open(@local_host, @local_port)
  @logger.info "MQTT::Proxy listening on #{@local_host}:#{@local_port}"
end
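The defaulting rules in the constructor, and the warning about connecting to yourself, can be sketched outside the library. This is an illustration only: `resolve_proxy_options`, `connects_to_self?`, `ASSUMED_DEFAULT_PORT` and `LOCAL_ADDRESSES` are hypothetical names, and the value 1883 for `MQTT::DEFAULT_PORT` is an assumption (it is the standard MQTT port) rather than something read from the library.

```ruby
# Assumed value of MQTT::DEFAULT_PORT (the standard MQTT port); hypothetical constant.
ASSUMED_DEFAULT_PORT = 1883

# Mirrors the `||` defaulting shown in the constructor listing.
# Note that `||` also replaces an explicit nil or false with the default.
def resolve_proxy_options(args = {})
  {
    local_host: args[:local_host] || '0.0.0.0',
    local_port: args[:local_port] || ASSUMED_DEFAULT_PORT,
    server_host: args[:server_host], # no default: the caller must supply it
    server_port: args[:server_port] || 18_830,
    select_timeout: args[:select_timeout] || 60
  }
end

# Addresses treated as "this machine" by the heuristic below (hypothetical list).
LOCAL_ADDRESSES = %w[0.0.0.0 127.0.0.1 localhost ::1].freeze

# Heuristic guard: true when the upstream port equals the listening port
# and the upstream host is a loopback or wildcard address.
def connects_to_self?(opts)
  opts[:server_port] == opts[:local_port] &&
    LOCAL_ADDRESSES.include?(opts[:server_host].to_s)
end

safe = resolve_proxy_options(server_host: 'broker.example')
looped = resolve_proxy_options(server_host: 'localhost', server_port: 1883)

puts connects_to_self?(safe)
puts connects_to_self?(looped)
```

The guard is deliberately simple and would miss a proxy reached through the machine's external address; it is meant to show the kind of check the note hints at, not to be exhaustive.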
Instance Attribute Details
#client_filter=(value) ⇒ Object (writeonly)
A filter Proc for packets coming from the client (to the server).
# File 'lib/mqtt/proxy.rb', line 23

def client_filter=(value)
  @client_filter = value
end
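A filter is an ordinary Proc. The sketch below assumes the proxy calls the Proc with each packet and forwards whatever it returns, with `nil` meaning "drop this packet"; the packet-processing code is not shown on this page, so treat that contract as an assumption. `FakePacket` is a hypothetical stand-in for the library's packet objects.

```ruby
# Hypothetical stand-in for a packet; the real proxy passes its own packet class.
FakePacket = Struct.new(:type_name, :topic, :payload)

# Assumed contract: return the packet to forward it, or nil to drop it.
drop_secret_topics = proc do |packet|
  packet.topic.to_s.start_with?('secret/') ? nil : packet
end

# A filter may also modify a packet before handing it back.
redact_payload = proc do |packet|
  packet.payload = '[redacted]' if packet.type_name == 'PUBLISH'
  packet
end

public_packet = FakePacket.new('PUBLISH', 'sensors/temp', '21.5')
secret_packet = FakePacket.new('PUBLISH', 'secret/keys', 'abc123')

kept = drop_secret_topics.call(public_packet)
dropped = drop_secret_topics.call(secret_packet)
redacted = redact_payload.call(public_packet)
```

Under the same assumption, a Proc like this would be installed with `proxy.client_filter = drop_secret_topics` on an existing proxy instance.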
#local_host ⇒ Object (readonly)
Address to bind listening socket to.

# File 'lib/mqtt/proxy.rb', line 5

def local_host
  @local_host
end
#local_port ⇒ Object (readonly)
Port to bind listening socket to.

# File 'lib/mqtt/proxy.rb', line 8

def local_port
  @local_port
end
#logger ⇒ Object (readonly)
Ruby Logger object to send informational messages to.

# File 'lib/mqtt/proxy.rb', line 20

def logger
  @logger
end
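When no `:logger` is passed, the constructor builds a Logger on STDOUT at INFO level. A caller wanting a different destination or verbosity can build one up front, as in this sketch. `build_proxy_logger` is a hypothetical helper name; only `Logger.new` and `Logger#level=` from Ruby's standard library are used.

```ruby
require 'logger'
require 'stringio'

# Hypothetical helper: builds a Logger suitable for passing as the :logger argument.
def build_proxy_logger(io = $stdout, level = Logger::INFO)
  logger = Logger.new(io)
  logger.level = level
  logger
end

# Capture debug output in memory instead of printing it.
buffer = StringIO.new
debug_logger = build_proxy_logger(buffer, Logger::DEBUG)
debug_logger.debug 'client -> server'

# At the default INFO level, debug messages are suppressed.
quiet_buffer = StringIO.new
info_logger = build_proxy_logger(quiet_buffer)
info_logger.debug 'this line is not written'
```

The resulting object would then be supplied as `logger: debug_logger` in the constructor arguments.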
#select_timeout ⇒ Object (readonly)
Time in seconds before disconnecting an idle connection.

# File 'lib/mqtt/proxy.rb', line 17

def select_timeout
  @select_timeout
end
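The attribute name suggests the timeout is handed to `IO.select`, which returns `nil` when nothing becomes readable in time. The packet loop is not shown on this page, so this is an assumption; the sketch below only demonstrates how such an idle check behaves, using a pipe in place of real sockets. `wait_for_activity` is a hypothetical helper.

```ruby
# Hypothetical helper: returns :readable if any IO has data within
# `timeout` seconds, otherwise :idle.
def wait_for_activity(ios, timeout)
  ready = IO.select(ios, nil, nil, timeout)
  ready.nil? ? :idle : :readable
end

reader, writer = IO.pipe

# Nothing has been written yet, so the wait times out.
first = wait_for_activity([reader], 0.05)

# Once data is pending, select returns immediately.
writer.write('x')
second = wait_for_activity([reader], 1)

reader.close
writer.close
```

A loop built on this pattern would treat `:idle` as the signal to disconnect.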
#server_filter=(value) ⇒ Object (writeonly)
A filter Proc for packets coming from the server (to the client).
# File 'lib/mqtt/proxy.rb', line 26

def server_filter=(value)
  @server_filter = value
end
#server_host ⇒ Object (readonly)
Address of upstream server to send packets upstream to.

# File 'lib/mqtt/proxy.rb', line 11

def server_host
  @server_host
end
#server_port ⇒ Object (readonly)
Port of upstream server to send packets upstream to.
# File 'lib/mqtt/proxy.rb', line 14

def server_port
  @server_port
end
Instance Method Details
#run ⇒ Object
Start accepting connections and processing packets.
# File 'lib/mqtt/proxy.rb', line 64

def run
  loop do
    # Wait for a client to connect and then create a thread for it
    Thread.new(@server.accept) do |client_socket|
      logger.info "Accepted client: #{client_socket.peeraddr.join(':')}"
      server_socket = TCPSocket.new(@server_host, @server_port)
      begin
        process_packets(client_socket, server_socket)
      rescue Exception => exp
        logger.error exp.to_s
      end
      logger.info "Disconnected: #{client_socket.peeraddr.join(':')}"
      server_socket.close
      client_socket.close
    end
  end
end
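The thread-per-connection pattern used by `run` can be tried in isolation with Ruby's standard `socket` library. The sketch below is not the library's code: `accept_loop` is a hypothetical helper, and an upcasing echo handler stands in for the packet processing. It also differs from the listing in two deliberate ways, rescuing `StandardError` rather than `Exception` and closing the client socket in an `ensure` clause.

```ruby
require 'socket'

# Hypothetical helper (not part of MQTT::Proxy): accept clients forever
# and hand each one to `handler` in its own thread.
def accept_loop(server, &handler)
  loop do
    Thread.new(server.accept) do |client|
      begin
        handler.call(client)
      rescue StandardError => e
        warn "connection error: #{e.message}"
      ensure
        # Runs whether the handler returned or raised.
        client.close unless client.closed?
      end
    end
  end
rescue IOError, Errno::EBADF
  # The listening socket was closed from another thread; stop accepting.
  nil
end

# Demo: an upcasing echo server on an ephemeral loopback port.
server = TCPServer.new('127.0.0.1', 0)
acceptor = Thread.new do
  accept_loop(server) { |client| client.write(client.gets.to_s.upcase) }
end

socket = TCPSocket.new('127.0.0.1', server.addr[1])
socket.puts 'ping'
reply = socket.gets
socket.close

acceptor.kill
server.close
puts reply
```

Binding to port 0 lets the operating system pick a free port, which keeps the demo from colliding with a broker already running on the machine.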