Class: RelpServer
Constant Summary
Constants inherited from Relp
Relp::RelpSoftware, Relp::RelpVersion
Instance Method Summary collapse
- #accept ⇒ Object
- #ack(socket, txnr) ⇒ Object
-
#initialize(host, port, required_commands = []) ⇒ RelpServer
constructor
A new instance of RelpServer.
- #serverclose(socket) ⇒ Object
- #shutdown ⇒ Object
-
#syslog_read(socket) ⇒ Object
Reads a frame from the socket without acknowledging it; use #ack to acknowledge the frame.
Methods inherited from Relp
#frame_read, #frame_write, #server?, #valid_command?
Constructor Details
#initialize(host, port, required_commands = []) ⇒ RelpServer
Returns a new instance of RelpServer.
100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 |
# File 'lib/logstash/util/relp.rb', line 100

# Sets up the server state and binds a TCPServer to the given host and port.
#
# @param host bind address, handed to TCPServer.new
# @param port port to listen on, handed to TCPServer.new
# @param required_commands [Array] extra commands a client has to offer;
#   connections that do not offer them are refused during #accept
# @raise [Errno::EADDRINUSE] logged and re-raised when the address is taken
def initialize(host, port, required_commands = [])
  @logger = Cabin::Channel.get(LogStash)
  @server = true

  # Commands that belong to the basic protocol but are only valid in one
  # direction (rsp, close etc.)
  @basic_relp_commands = ['close'] # TODO: check for others

  # Extra commands that we require, otherwise the connection is refused
  @required_relp_commands = required_commands

  begin
    @server = TCPServer.new(host, port)
  rescue Errno::EADDRINUSE
    @logger.error("Could not start RELP server: Address in use", host: host, port: port)
    raise
  end

  @logger.info("Started RELP Server", host: host, port: port) if @logger.info?
end
Instance Method Details
#accept ⇒ Object
121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 |
# File 'lib/logstash/util/relp.rb', line 121

# Accepts one client connection and performs the RELP 'open' handshake.
#
# Blocks on @server.accept, reads the first frame and, when it is an 'open'
# offer that names a relp_version and every required command, answers with
# a '200 OK' rsp frame listing this server's version, software and commands.
# In every failure case the connection is closed via #serverclose before the
# error is raised.
#
# @return [Array] two elements: self and the connected client socket
# @raise [RelpError] if the offer carries no relp_version
# @raise [InsufficientCommands] if the offer lacks a required command
# @raise [InappropriateCommand] if the first frame is not an 'open' command
def accept
  socket = @server.accept
  frame = frame_read(socket)

  unless frame['command'] == 'open'
    serverclose(socket)
    # Interpolation keeps this working even when the frame has no command.
    raise InappropriateCommand, "#{frame['command']} expecting open"
  end

  # Each "key=value" line of the message becomes one entry of the offer.
  offer = Hash[frame['message'].scan(/^(.*)=(.*)$/)]

  if offer['relp_version'].nil?
    @logger.warn("No relp version specified")
    # If no version is specified, the relp spec says we must close the connection
    serverclose(socket)
    raise RelpError, 'No relp_version specified'
  end

  # An offer without a "commands" line offers no commands at all; treating it
  # as an empty list lets the normal required-commands check handle it
  # instead of crashing on nil and leaving the connection open.
  offered = offer['commands'].to_s.split(',')
  # Subtracting the arrays leaves the required commands that were not offered.
  missing = @required_relp_commands - offered

  unless missing.empty?
    @logger.warn("Not all required commands are available", :required => @required_relp_commands, :offer => offer['commands'])
    # Tell them why we're closing the connection
    refusal = {
      'txnr' => frame['txnr'],
      'command' => 'rsp',
      'message' => "500 Required command(s) #{missing.join(',')} not offered"
    }
    frame_write(socket, refusal)
    serverclose(socket)
    raise InsufficientCommands, "#{offer['commands']} offered, require #{@required_relp_commands.join(',')}"
  end

  # Attempt to set up the connection
  response = {
    'txnr' => frame['txnr'],
    'command' => 'rsp',
    'message' => "200 OK relp_version=#{RelpVersion}\n" \
                 "relp_software=#{RelpSoftware}\n" \
                 "commands=#{@required_relp_commands.join(',')}" # TODO: optional ones
  }
  frame_write(socket, response)
  [self, socket]
end
#ack(socket, txnr) ⇒ Object
200 201 202 203 204 205 206 |
# File 'lib/logstash/util/relp.rb', line 200

# Writes an rsp frame with the message '200 OK' for the given transaction.
#
# @param socket connection the frame is written to via frame_write
# @param txnr transaction number copied into the response frame
# @return whatever frame_write returns
def ack(socket, txnr)
  response = {
    'txnr' => txnr,
    'command' => 'rsp',
    'message' => '200 OK'
  }
  frame_write(socket, response)
end
#serverclose(socket) ⇒ Object
184 185 186 187 188 189 190 191 192 193 |
# File 'lib/logstash/util/relp.rb', line 184

# Sends a 'serverclose' frame (txnr 0) to the client and closes the socket.
#
# The socket is closed even when the frame cannot be written, so the local
# descriptor is never left open after a failed notification.
#
# @param socket connection to notify and close
# @return [nil]
def serverclose(socket)
  frame = { 'txnr' => 0, 'command' => 'serverclose' }
  begin
    frame_write(socket, frame)
  rescue ConnectionClosed
    # The peer is already gone, so there is nobody left to notify.
  ensure
    # Runs on success, on ConnectionClosed and on any other write error.
    socket.close
  end
  nil
end
#shutdown ⇒ Object
195 196 197 198 |
# File 'lib/logstash/util/relp.rb', line 195

# Closes the listening server, ignoring ordinary errors from the close call.
#
# Only StandardError is rescued, so interrupts, exit requests and other
# non-standard exceptions still propagate to the caller.
#
# @return the result of @server.close, or nil when the close raised
def shutdown
  @server.close
rescue StandardError
  # @server might already be down
end
#syslog_read(socket) ⇒ Object
Reads a frame from the socket without acknowledging it; use #ack to acknowledge the frame.
165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 |
# File 'lib/logstash/util/relp.rb', line 165

# Reads the next frame from the socket and returns it when it is a 'syslog'
# frame. The frame is not acknowledged here.
#
# @param socket connection to read from
# @return the frame produced by frame_read when its command is 'syslog'
# @raise [ConnectionClosed] when the client sent 'close'; the close is
#   answered with an rsp frame and the connection is shut via #serverclose
# @raise [InappropriateCommand] for any other command, after the connection
#   has been shut via #serverclose
def syslog_read(socket)
  frame = frame_read(socket)
  command = frame['command']
  return frame if command == 'syslog'

  if command == 'close'
    # The client is closing the connection: acknowledge the close and act on it.
    reply = { 'txnr' => frame['txnr'], 'command' => 'rsp' }
    frame_write(socket, reply)
    serverclose(socket)
    raise ConnectionClosed
  end

  # The client is trying to do something unexpected.
  serverclose(socket)
  raise InappropriateCommand, command + ' expecting syslog'
end